diff --git a/README.md b/README.md
index db621d6..3fcdee2 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@

- Live Demo •
+ Live Demo • Documentation • Deployment • Discord
diff --git a/aggregator/data.go b/aggregator/data.go
index 6303a7b..7191b53 100644
--- a/aggregator/data.go
+++ b/aggregator/data.go
@@ -941,6 +941,13 @@ func (a *Aggregator) decodeKafkaPayload(d *l7_req.L7Event) ([]*KafkaMessage, err
     // var message protocol.Message
     // var err error
+    defer func() {
+        if r := recover(); r != nil {
+            log.Logger.Debug().Any("r", r).
+                Msg("recovered from kafka event, probably slice out of bounds") // since we read 1024 bytes at most from ebpf, slice out of bounds can occur
+        }
+    }()
+
     result := make([]*KafkaMessage, 0)
     if d.Method == l7_req.KAFKA_PRODUCE_REQUEST {
@@ -1241,6 +1248,42 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {
 }
+func (a *Aggregator) processMongoEvent(ctx context.Context, d *l7_req.L7Event) {
+    query, err := a.parseMongoEvent(d)
+    if err != nil {
+        log.Logger.Error().AnErr("err", err)
+        return
+    }
+    addrPair := extractAddressPair(d)
+
+    reqDto := &datastore.Request{
+        StartTime:  int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
+        Latency:    d.Duration,
+        FromIP:     addrPair.Saddr,
+        ToIP:       addrPair.Daddr,
+        Protocol:   d.Protocol,
+        Tls:        d.Tls,
+        Completed:  true,
+        StatusCode: d.Status,
+        FailReason: "",
+        Method:     d.Method,
+        Path:       query,
+        // dist tracing disabled by default temporarily
+        // Tid: d.Tid,
+        // Seq: d.Seq,
+    }
+
+    err = a.setFromToV2(addrPair, d, reqDto, "")
+    if err != nil {
+        return
+    }
+
+    err = a.ds.PersistRequest(reqDto)
+    if err != nil {
+        log.Logger.Error().Err(err).Msg("error persisting request")
+    }
+}
+
 func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) {
     query, err := a.parseMySQLCommand(d)
     if err != nil {
@@ -1271,7 +1314,6 @@ func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) {
         return
     }
-    log.Logger.Debug().Any("event", reqDto).Msg("persisting mysql-event")
     err = a.ds.PersistRequest(reqDto)
     if err != nil {
         log.Logger.Error().Err(err).Msg("error persisting request")
@@ -1335,6 +1377,8 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
         a.processKafkaEvent(ctx, d)
     case l7_req.L7_PROTOCOL_MYSQL:
         a.processMySQLEvent(ctx, d)
+    case l7_req.L7_PROTOCOL_MONGO:
+        a.processMongoEvent(ctx, d)
     }
 }
@@ -1511,6 +1555,56 @@ func (a *Aggregator) parsePostgresCommand(d *l7_req.L7Event) (string, error) {
     return sqlCommand, nil
 }
+func (a *Aggregator) parseMongoEvent(d *l7_req.L7Event) (string, error) {
+    defer func() {
+        if r := recover(); r != nil {
+            log.Logger.Debug().Any("r", r).
+ Msg("recovered from mongo event,probably slice out of bounds") + } + }() + + payload := d.Payload[:d.PayloadSize] + + // cut mongo header, 4 bytes MessageLength, 4 bytes RequestID, 4 bytes ResponseTo, 4 bytes Opcode, 4 bytes MessageFlags + payload = payload[20:] + + kind := payload[0] + payload = payload[1:] // cut kind + if kind == 0 { // body + docLenBytes := payload[:4] // document length + docLen := binary.LittleEndian.Uint32(docLenBytes) + payload = payload[4:docLen] // cut docLen + // parse Element + type_ := payload[0] // 2 means string + if type_ != 2 { + return "", fmt.Errorf("document element not a string") + } + payload = payload[1:] // cut type + + // read until NULL + element := []uint8{} + for _, p := range payload { + if p == 0 { + break + } + element = append(element, p) + } + + // 1 byte NULL, 4 bytes len + elementLenBytes := payload[len(element)+1 : len(element)+1+4] + elementLength := binary.LittleEndian.Uint32(elementLenBytes) + + payload = payload[len(element)+5:] // cut element + null + len + elementValue := payload[:elementLength-1] // myCollection, last byte is null + + result := fmt.Sprintf("%s %s", string(element), string(elementValue)) + log.Logger.Debug().Str("result", result).Msg("mongo-elem-result") + return result, nil + } + + return "", fmt.Errorf("could not parse mongo event") +} + func (a *Aggregator) getPgStmtKey(pid uint32, fd uint64, stmtName string) string { return fmt.Sprintf("%s-%s", a.getConnKey(pid, fd), stmtName) } diff --git a/ebpf/c/bpf.c b/ebpf/c/bpf.c index 6d4de53..f715046 100644 --- a/ebpf/c/bpf.c +++ b/ebpf/c/bpf.c @@ -37,6 +37,7 @@ #include "redis.c" #include "kafka.c" #include "mysql.c" +#include "mongo.c" #include "openssl.c" #include "http2.c" #include "tcp_sock.c" diff --git a/ebpf/c/bpf_bpfeb.o b/ebpf/c/bpf_bpfeb.o index 3269dc4..4e77177 100644 Binary files a/ebpf/c/bpf_bpfeb.o and b/ebpf/c/bpf_bpfeb.o differ diff --git a/ebpf/c/bpf_bpfel.o b/ebpf/c/bpf_bpfel.o index edb47f6..72b3840 100644 Binary files a/ebpf/c/bpf_bpfel.o and b/ebpf/c/bpf_bpfel.o differ diff --git a/ebpf/c/l7.c b/ebpf/c/l7.c index 2097045..6aee153 100644 --- a/ebpf/c/l7.c +++ b/ebpf/c/l7.c @@ -7,6 +7,7 @@ #define PROTOCOL_REDIS 5 #define PROTOCOL_KAFKA 6 #define PROTOCOL_MYSQL 7 +#define PROTOCOL_MONGO 8 @@ -329,6 +330,8 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha return 0; } req->protocol = PROTOCOL_MYSQL; + }else if(is_mongo_request(buf,count)){ + req->protocol = PROTOCOL_MONGO; }else if (is_http2_frame(buf, count)){ struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero); if (!e) { @@ -875,11 +878,17 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 }else if(active_req->request_type == MYSQL_COM_QUERY){ e->method = METHOD_MYSQL_TEXT_QUERY; } + }else if (e->protocol == PROTOCOL_MONGO){ + e->status = is_mongo_reply(read_info->buf, ret); } }else{ bpf_map_delete_elem(&active_reads, &id); return 0; } + + if (e->status == 0){ + return 0; + } bpf_map_delete_elem(&active_reads, &id); bpf_map_delete_elem(&active_l7_requests, &k); diff --git a/ebpf/c/mongo.c b/ebpf/c/mongo.c new file mode 100644 index 0000000..93bcaa4 --- /dev/null +++ b/ebpf/c/mongo.c @@ -0,0 +1,93 @@ +//go:build ignore +// https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/ +// Mongo Request Query +// 4 bytes message len +// 4 bytes request id +// 4 bytes response to +// 4 bytes opcode (2004 for Query) +// 4 bytes query flags +// fullCollectionName : ? 
+// 4 bytes number to skip
+// 4 bytes number to return
+// 4 bytes Document Length
+// Elements
+
+// Extensible Message Format
+// 4 bytes len
+// 4 bytes request id
+// 4 bytes response to
+// 4 bytes opcode (2013 for extensible message format)
+// 4 bytes message flags
+// Section
+// 1 byte Kind (0 for body)
+// BodyDocument
+// 4 bytes document length
+// Elements
+// Section
+// Kind : Document Sequence (1)
+// SeqId: "documents"
+// DocumentSequence
+// Document
+// 4 bytes doc len
+
+// For response:
+// same as above
+
+#define MONGO_OP_COMPRESSED 2012 // Wraps other opcodes using compression
+#define MONGO_OP_MSG 2013        // Send a message using the standard format. Used for both client requests and database replies.
+
+// https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#standard-message-header
+struct mongo_header {
+    __s32 length;      // total message size, including this
+    __s32 request_id;  // identifier for this message
+    __s32 response_to; // requestID from the original request (used in responses from the database)
+    __s32 opcode;      // message type
+};
+
+struct mongo_header_wout_len {
+    // __s32 length;   // total message size, including this
+    __s32 request_id;  // identifier for this message
+    __s32 response_to; // requestID from the original request (used in responses from the database)
+    __s32 opcode;      // message type
+};
+
+static __always_inline
+int is_mongo_request(char *buf, __u64 buf_size) {
+    struct mongo_header h = {};
+    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (h.response_to == 0 && (h.opcode == MONGO_OP_MSG || h.opcode == MONGO_OP_COMPRESSED)) {
+        bpf_printk("this is a mongo_request\n");
+        return 1;
+    }
+    return 0;
+}
+
+// mongo replies read in 2 parts
+// [pid 286873] read(7, "\x2d\x00\x00\x00", 4) = 4 // these 4 bytes are length
+// [pid 286873] read(7, "\xe1\x0b\x00\x00 \x09\x00\x00\x00 \xdd\x07\x00\x00 // request_id - response_to - opcode
+// \x00\x00\x00\x00\x00\x18\x00\x00\x00\x10
+// \x6e\x00
+// \x01\x00\x00\x00\x01\x6f\x6b\x00"..., 41) = 41
+// (ok)
+static __always_inline
+int is_mongo_reply(char *buf, __u64 buf_size) {
+    struct mongo_header_wout_len h = {};
+    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+        bpf_printk("this is a mongo_reply_header_fail\n");
+        return 0;
+    }
+    if (h.response_to == 0) {
+        bpf_printk("this is a mongo_reply_response_to0, - %d\n",h.opcode);
+        return 0;
+    }
+    if (h.opcode == MONGO_OP_MSG || h.opcode == MONGO_OP_COMPRESSED) {
+        bpf_printk("this is a mongo_reply\n");
+        return 1;
+    }
+
+    bpf_printk("this is a mongo_reply-fail - %d\n",h.opcode);
+    return 0;
+}
+
diff --git a/ebpf/l7_req/l7.go b/ebpf/l7_req/l7.go
index c1b66a0..b5b3f7c 100644
--- a/ebpf/l7_req/l7.go
+++ b/ebpf/l7_req/l7.go
@@ -25,6 +25,7 @@ const (
     BPF_L7_PROTOCOL_REDIS
     BPF_L7_PROTOCOL_KAFKA
     BPF_L7_PROTOCOL_MYSQL
+    BPF_L7_PROTOCOL_MONGO
 )
 // for user space
@@ -36,6 +37,7 @@ const (
     L7_PROTOCOL_REDIS   = "REDIS"
     L7_PROTOCOL_KAFKA   = "KAFKA"
     L7_PROTOCOL_MYSQL   = "MYSQL"
+    L7_PROTOCOL_MONGO   = "MONGO"
     L7_PROTOCOL_UNKNOWN = "UNKNOWN"
 )
@@ -59,6 +61,8 @@ func (e L7ProtocolConversion) String() string {
         return L7_PROTOCOL_KAFKA
     case BPF_L7_PROTOCOL_MYSQL:
         return L7_PROTOCOL_MYSQL
+    case BPF_L7_PROTOCOL_MONGO:
+        return L7_PROTOCOL_MONGO
     case BPF_L7_PROTOCOL_UNKNOWN:
         return L7_PROTOCOL_UNKNOWN
     default:
@@ -723,6 +727,8 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
         case L7_PROTOCOL_MYSQL:
             method = MySQLMethodConversion(l7Event.Method).String()
         // no method set for kafka on kernel side
+        // no method set for mongo on kernel side
+
         default:
             method = "Unknown"
         }
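
Note on the kernel-side detection added above: is_mongo_request and is_mongo_reply only inspect the 16-byte standard message header, whose four fields (messageLength, requestID, responseTo, opCode) are little-endian int32s. A written payload with responseTo == 0 and opCode 2013 (OP_MSG) or 2012 (OP_COMPRESSED) is classified as a request; a read payload with a non-zero responseTo and one of those opcodes is classified as a reply. Because replies are often read in two parts (the 4 length bytes first, then the rest), the reply check starts at requestID, which is what mongo_header_wout_len models. The return value of is_mongo_reply is stored in e->status, so the new early return in l7.c when e->status == 0 appears to skip emitting the event while leaving the active request/read map entries in place, letting a later read on the same socket still complete it. The Go program below is a userspace re-statement of that classification for illustration only; looksLikeMongoRequest, looksLikeMongoReply, and the sample headers in main are invented for the example, not part of the patch.

package main

import (
	"encoding/binary"
	"fmt"
)

// Opcode values from the MongoDB wire protocol: 2013 is OP_MSG, 2012 is OP_COMPRESSED.
const (
	opCompressed = 2012
	opMsg        = 2013
)

// looksLikeMongoRequest mirrors is_mongo_request: the buffer starts at the
// 16-byte standard header, and a request has responseTo == 0 with a known opcode.
func looksLikeMongoRequest(buf []byte) bool {
	if len(buf) < 16 {
		return false
	}
	responseTo := int32(binary.LittleEndian.Uint32(buf[8:12]))
	opcode := int32(binary.LittleEndian.Uint32(buf[12:16]))
	return responseTo == 0 && (opcode == opMsg || opcode == opCompressed)
}

// looksLikeMongoReply mirrors is_mongo_reply: the 4 length bytes were consumed
// by an earlier read, so the buffer starts at requestID, and a reply has a
// non-zero responseTo.
func looksLikeMongoReply(buf []byte) bool {
	if len(buf) < 12 {
		return false
	}
	responseTo := int32(binary.LittleEndian.Uint32(buf[4:8]))
	opcode := int32(binary.LittleEndian.Uint32(buf[8:12]))
	return responseTo != 0 && (opcode == opMsg || opcode == opCompressed)
}

func main() {
	// sample request header: length=100, requestID=1, responseTo=0, opcode=OP_MSG
	req := make([]byte, 16)
	binary.LittleEndian.PutUint32(req[0:4], 100)
	binary.LittleEndian.PutUint32(req[4:8], 1)
	binary.LittleEndian.PutUint32(req[8:12], 0)
	binary.LittleEndian.PutUint32(req[12:16], opMsg)

	// sample reply header without the length field: requestID=2, responseTo=1, opcode=OP_MSG
	rep := make([]byte, 12)
	binary.LittleEndian.PutUint32(rep[0:4], 2)
	binary.LittleEndian.PutUint32(rep[4:8], 1)
	binary.LittleEndian.PutUint32(rep[8:12], opMsg)

	fmt.Println(looksLikeMongoRequest(req), looksLikeMongoReply(rep)) // true true
}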
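
On the userspace side, parseMongoEvent strips the 16-byte standard header plus the 4-byte flagBits (20 bytes total), expects a kind-0 body section, and extracts the first BSON element when it is a string (type 0x02), producing values like "insert myCollection" that end up in the Path field of the persisted request. The standalone sketch below walks the same byte layout over a hand-built section; parseFirstStringElement and the sample document in main are illustrative names and data, not code from the patch, and real payloads may be truncated at 1024 bytes (hence the recover guard in the actual function).

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseFirstStringElement takes an OP_MSG payload with the 16-byte standard
// header and the 4-byte flagBits already stripped (i.e. starting at the first
// section) and returns "<name> <value>" for the leading string element of a
// kind-0 body section, e.g. "insert myCollection".
func parseFirstStringElement(section []byte) (string, error) {
	if len(section) < 6 {
		return "", errors.New("section too short")
	}
	if section[0] != 0 { // kind 0 = body document
		return "", errors.New("not a body section")
	}
	doc := section[1:]
	docLen := int(binary.LittleEndian.Uint32(doc[:4]))
	if docLen < 5 || docLen > len(doc) {
		return "", errors.New("truncated document")
	}
	doc = doc[4:docLen] // element list plus the trailing 0x00 terminator
	if len(doc) == 0 || doc[0] != 0x02 { // 0x02 = BSON string element
		return "", errors.New("first element is not a string")
	}
	doc = doc[1:]

	// element name is a NULL-terminated cstring
	nameEnd := 0
	for nameEnd < len(doc) && doc[nameEnd] != 0 {
		nameEnd++
	}
	if nameEnd == len(doc) {
		return "", errors.New("unterminated element name")
	}
	name := string(doc[:nameEnd])
	doc = doc[nameEnd+1:] // skip the name and its NULL terminator

	// string value: 4-byte little-endian length (includes trailing NULL), then bytes
	if len(doc) < 4 {
		return "", errors.New("truncated string length")
	}
	valLen := int(binary.LittleEndian.Uint32(doc[:4]))
	doc = doc[4:]
	if valLen < 1 || valLen > len(doc) {
		return "", errors.New("truncated string value")
	}
	return fmt.Sprintf("%s %s", name, string(doc[:valLen-1])), nil
}

func main() {
	// hand-built body section containing only the parts we read: {"insert": "myCollection"}
	name := append([]byte("insert"), 0)
	value := append([]byte("myCollection"), 0)

	doc := []byte{0x02}                                              // element type: string
	doc = append(doc, name...)                                       // element name
	doc = binary.LittleEndian.AppendUint32(doc, uint32(len(value)))  // value length incl. NULL
	doc = append(doc, value...)                                      // value bytes
	doc = append(doc, 0)                                             // document terminator

	section := []byte{0} // kind 0: body
	section = binary.LittleEndian.AppendUint32(section, uint32(len(doc)+4))
	section = append(section, doc...)

	fmt.Println(parseFirstStringElement(section)) // insert myCollection <nil>
}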
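
Both decodeKafkaPayload and parseMongoEvent now install a deferred recover because the eBPF programs copy at most 1024 bytes of payload: length fields inside a large message can point past the captured bytes, and the resulting slice expressions panic rather than return an error. In the patch the recover only logs at debug level and the function returns its zero values; the minimal sketch below shows the same pattern but converts the panic into an error through a named return. parseTruncated and its fake length-prefixed input are hypothetical, only there to demonstrate the guard.

package main

import (
	"errors"
	"fmt"
)

// parseTruncated demonstrates the recover pattern used in decodeKafkaPayload
// and parseMongoEvent: a deferred recover turns an out-of-range slice access
// on a truncated payload into an error instead of crashing the process.
// The body is a placeholder parse step, not the real decoding logic.
func parseTruncated(payload []byte) (out string, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered while parsing truncated payload: %v", r)
		}
	}()
	if len(payload) == 0 {
		return "", errors.New("empty payload")
	}
	// placeholder: pretend the first byte is a length prefix and slice by it,
	// which panics when the payload was cut off before that many bytes arrived
	n := int(payload[0])
	return string(payload[1 : 1+n]), nil
}

func main() {
	fmt.Println(parseTruncated([]byte{5, 'h', 'e', 'l', 'l', 'o'})) // hello <nil>
	fmt.Println(parseTruncated([]byte{200, 'h', 'i'}))              // "" recovered ...
}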