diff --git a/README.md b/README.md
index db621d6..3fcdee2 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@
- Live Demo •
+ Live Demo •
Documentation •
Deployment •
Discord
diff --git a/aggregator/data.go b/aggregator/data.go
index 6303a7b..7191b53 100644
--- a/aggregator/data.go
+++ b/aggregator/data.go
@@ -941,6 +941,13 @@ func (a *Aggregator) decodeKafkaPayload(d *l7_req.L7Event) ([]*KafkaMessage, err
// var message protocol.Message
// var err error
+ defer func() {
+ if r := recover(); r != nil {
+ log.Logger.Debug().Any("r", r).
+ Msg("recovered from kafka event,probably slice out of bounds") // since we read 1024 bytes at most from ebpf, slice out of bounds can occur
+ }
+ }()
+
result := make([]*KafkaMessage, 0)
if d.Method == l7_req.KAFKA_PRODUCE_REQUEST {
@@ -1241,6 +1248,42 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {
}
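+// processMongoEvent builds a datastore.Request from a parsed mongo query and
+// persists it, mirroring the other process*Event handlers.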
+func (a *Aggregator) processMongoEvent(ctx context.Context, d *l7_req.L7Event) {
+ query, err := a.parseMongoEvent(d)
+ if err != nil {
+ log.Logger.Error().AnErr("err", err).Msg("error parsing mongo event")
+ return
+ }
+ addrPair := extractAddressPair(d)
+
+ reqDto := &datastore.Request{
+ StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
+ Latency: d.Duration,
+ FromIP: addrPair.Saddr,
+ ToIP: addrPair.Daddr,
+ Protocol: d.Protocol,
+ Tls: d.Tls,
+ Completed: true,
+ StatusCode: d.Status,
+ FailReason: "",
+ Method: d.Method,
+ Path: query,
+ // dist tracing disabled by default temporarily
+ // Tid: d.Tid,
+ // Seq: d.Seq,
+ }
+
+ err = a.setFromToV2(addrPair, d, reqDto, "")
+ if err != nil {
+ return
+ }
+
+ err = a.ds.PersistRequest(reqDto)
+ if err != nil {
+ log.Logger.Error().Err(err).Msg("error persisting request")
+ }
+}
+
func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) {
query, err := a.parseMySQLCommand(d)
if err != nil {
@@ -1271,7 +1314,6 @@ func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) {
return
}
- log.Logger.Debug().Any("event", reqDto).Msg("persisting mysql-event")
err = a.ds.PersistRequest(reqDto)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting request")
@@ -1335,6 +1377,8 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
a.processKafkaEvent(ctx, d)
case l7_req.L7_PROTOCOL_MYSQL:
a.processMySQLEvent(ctx, d)
+ case l7_req.L7_PROTOCOL_MONGO:
+ a.processMongoEvent(ctx, d)
}
}
@@ -1511,6 +1555,56 @@ func (a *Aggregator) parsePostgresCommand(d *l7_req.L7Event) (string, error) {
return sqlCommand, nil
}
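+// parseMongoEvent extracts the first element of the OP_MSG body section and
+// returns it as "<element name> <string value>", e.g. "find myCollection".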
+func (a *Aggregator) parseMongoEvent(d *l7_req.L7Event) (string, error) {
+ defer func() {
+ if r := recover(); r != nil {
+ log.Logger.Debug().Any("r", r).
+ Msg("recovered from mongo event,probably slice out of bounds")
+ }
+ }()
+
+ payload := d.Payload[:d.PayloadSize]
+
+ // cut mongo header, 4 bytes MessageLength, 4 bytes RequestID, 4 bytes ResponseTo, 4 bytes Opcode, 4 bytes MessageFlags
+ payload = payload[20:]
+
+ kind := payload[0]
+ payload = payload[1:] // cut kind
+ if kind == 0 { // body
+ docLenBytes := payload[:4] // document length
+ docLen := binary.LittleEndian.Uint32(docLenBytes)
+ payload = payload[4:docLen] // skip the 4-byte length; docLen includes those 4 bytes
+ // parse Element
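+ // a BSON element is: 1 byte type, a NULL-terminated name, then the value;
+ // for type 2 (string) the value is a 4-byte length followed by the bytes and a NULL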
+ type_ := payload[0] // 2 means string
+ if type_ != 2 {
+ return "", fmt.Errorf("document element not a string")
+ }
+ payload = payload[1:] // cut type
+
+ // read until NULL
+ element := []uint8{}
+ for _, p := range payload {
+ if p == 0 {
+ break
+ }
+ element = append(element, p)
+ }
+
+ // 1 byte NULL, 4 bytes len
+ elementLenBytes := payload[len(element)+1 : len(element)+1+4]
+ elementLength := binary.LittleEndian.Uint32(elementLenBytes)
+
+ payload = payload[len(element)+5:] // cut element + null + len
+ elementValue := payload[:elementLength-1] // string value, e.g. "myCollection"; elementLength includes the trailing null
+
+ result := fmt.Sprintf("%s %s", string(element), string(elementValue))
+ log.Logger.Debug().Str("result", result).Msg("mongo-elem-result")
+ return result, nil
+ }
+
+ return "", fmt.Errorf("could not parse mongo event")
+}
+
func (a *Aggregator) getPgStmtKey(pid uint32, fd uint64, stmtName string) string {
return fmt.Sprintf("%s-%s", a.getConnKey(pid, fd), stmtName)
}
diff --git a/ebpf/c/bpf.c b/ebpf/c/bpf.c
index 6d4de53..f715046 100644
--- a/ebpf/c/bpf.c
+++ b/ebpf/c/bpf.c
@@ -37,6 +37,7 @@
#include "redis.c"
#include "kafka.c"
#include "mysql.c"
+#include "mongo.c"
#include "openssl.c"
#include "http2.c"
#include "tcp_sock.c"
diff --git a/ebpf/c/bpf_bpfeb.o b/ebpf/c/bpf_bpfeb.o
index 3269dc4..4e77177 100644
Binary files a/ebpf/c/bpf_bpfeb.o and b/ebpf/c/bpf_bpfeb.o differ
diff --git a/ebpf/c/bpf_bpfel.o b/ebpf/c/bpf_bpfel.o
index edb47f6..72b3840 100644
Binary files a/ebpf/c/bpf_bpfel.o and b/ebpf/c/bpf_bpfel.o differ
diff --git a/ebpf/c/l7.c b/ebpf/c/l7.c
index 2097045..6aee153 100644
--- a/ebpf/c/l7.c
+++ b/ebpf/c/l7.c
@@ -7,6 +7,7 @@
#define PROTOCOL_REDIS 5
#define PROTOCOL_KAFKA 6
#define PROTOCOL_MYSQL 7
+#define PROTOCOL_MONGO 8
@@ -329,6 +330,8 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
return 0;
}
req->protocol = PROTOCOL_MYSQL;
+ }else if (is_mongo_request(buf, count)){
+ req->protocol = PROTOCOL_MONGO;
}else if (is_http2_frame(buf, count)){
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
@@ -875,11 +878,17 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
}else if(active_req->request_type == MYSQL_COM_QUERY){
e->method = METHOD_MYSQL_TEXT_QUERY;
}
+ }else if (e->protocol == PROTOCOL_MONGO){
+ e->status = is_mongo_reply(read_info->buf, ret);
}
}else{
bpf_map_delete_elem(&active_reads, &id);
return 0;
}
+
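+ // status 0 means no response could be parsed yet; return without deleting
+ // active_reads so a later read can complete the event (mongo replies may be
+ // split across reads, see mongo.c)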
+ if (e->status == 0){
+ return 0;
+ }
bpf_map_delete_elem(&active_reads, &id);
bpf_map_delete_elem(&active_l7_requests, &k);
diff --git a/ebpf/c/mongo.c b/ebpf/c/mongo.c
new file mode 100644
index 0000000..93bcaa4
--- /dev/null
+++ b/ebpf/c/mongo.c
@@ -0,0 +1,93 @@
+//go:build ignore
+// https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/
+// Mongo Request Query
+// 4 bytes message len
+// 4 bytes request id
+// 4 bytes response to
+// 4 bytes opcode (2004 for Query)
+// 4 bytes query flags
+// fullCollectionName : ?
+// 4 bytes number to skip
+// 4 bytes number to return
+// 4 bytes Document Length
+// Elements
+
+// Extensible Message Format
+// 4 bytes len
+// 4 bytes request id
+// 4 bytes response to
+// 4 bytes opcode (2013 for extensible message format)
+// 4 bytes message flags
+// Section
+// 1 byte Kind (0 for body)
+// BodyDocument
+// 4 bytes document length
+// Elements
+// Section
+// Kind : Document Sequence (1)
+// SeqId: "documents"
+// DocumentSequence
+// Document
+// 4 bytes doc len
+
+// For responses:
+// the same format as above
+
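+// Example layout of the first bytes of an OP_MSG request (little-endian; the
+// length and requestID values are hypothetical):
+//   4c 00 00 00   messageLength
+//   01 00 00 00   requestID
+//   00 00 00 00   responseTo (0 for requests)
+//   dd 07 00 00   opCode = 2013 (OP_MSG)
+//   00 00 00 00   flagBits
+//   00            section kind = 0 (body)
+//   ..            BSON body document, starting with its own 4-byte length
+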
+#define MONGO_OP_COMPRESSED 2012 // Wraps other opcodes using compression
+#define MONGO_OP_MSG 2013 // Send a message using the standard format. Used for both client requests and database replies.
+
+// https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#standard-message-header
+struct mongo_header {
+ __s32 length; // total message size, including this
+ __s32 request_id; // identifier for this message
+ __s32 response_to; // requestID from the original request (used in responses from the database)
+ __s32 opcode; // message type
+};
+
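+// Replies are often read in two syscalls: the 4-byte length first, then the rest
+// (see the strace sample below), so the reply check parses the header fields
+// without the length.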
+struct mongo_header_wout_len {
+ // __s32 length; // total message size, including this
+ __s32 request_id; // identifier for this message
+ __s32 response_to; // requestID from the original request (used in responses from the database)
+ __s32 opcode; // message type
+};
+
+static __always_inline
+int is_mongo_request(char *buf, __u64 buf_size) {
+ struct mongo_header h = {};
+ if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+ return 0;
+ }
+ if (h.response_to == 0 && (h.opcode == MONGO_OP_MSG || h.opcode == MONGO_OP_COMPRESSED)) {
+ bpf_printk("this is a mongo_request\n");
+ return 1;
+ }
+ return 0;
+}
+
+// mongo replies read in 2 parts
+// [pid 286873] read(7, "\x2d\x00\x00\x00", 4) = 4 // these 4 bytes are length
+// [pid 286873] read(7, "\xe1\x0b\x00\x00 \x09\x00\x00\x00 \xdd\x07\x00\x00 // request_id - response_to - opcode
+// \x00\x00\x00\x00\x00\x18\x00\x00\x00\x10
+// \x6e\x00
+// \x01\x00\x00\x00\x01\x6f\x6b\x00"..., 41) = 41  // "\x6f\x6b" is "ok"
+static __always_inline
+int is_mongo_reply(char *buf, __u64 buf_size) {
+ struct mongo_header_wout_len h = {};
+ if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+ bpf_printk("this is a mongo_reply_header_fail\n");
+ return 0;
+ }
+ if (h.response_to == 0) {
+ bpf_printk("this is a mongo_reply_response_to0, - %d\n",h.opcode);
+ return 0;
+ }
+ if (h.opcode == MONGO_OP_MSG || h.opcode == MONGO_OP_COMPRESSED) {
+ bpf_printk("this is a mongo_reply\n");
+ return 1;
+ }
+
+ bpf_printk("this is a mongo_reply-fail - %d\n",h.opcode);
+ return 0;
+}
+
diff --git a/ebpf/l7_req/l7.go b/ebpf/l7_req/l7.go
index c1b66a0..b5b3f7c 100644
--- a/ebpf/l7_req/l7.go
+++ b/ebpf/l7_req/l7.go
@@ -25,6 +25,7 @@ const (
BPF_L7_PROTOCOL_REDIS
BPF_L7_PROTOCOL_KAFKA
BPF_L7_PROTOCOL_MYSQL
+ BPF_L7_PROTOCOL_MONGO
)
// for user space
@@ -36,6 +37,7 @@ const (
L7_PROTOCOL_REDIS = "REDIS"
L7_PROTOCOL_KAFKA = "KAFKA"
L7_PROTOCOL_MYSQL = "MYSQL"
+ L7_PROTOCOL_MONGO = "MONGO"
L7_PROTOCOL_UNKNOWN = "UNKNOWN"
)
@@ -59,6 +61,8 @@ func (e L7ProtocolConversion) String() string {
return L7_PROTOCOL_KAFKA
case BPF_L7_PROTOCOL_MYSQL:
return L7_PROTOCOL_MYSQL
+ case BPF_L7_PROTOCOL_MONGO:
+ return L7_PROTOCOL_MONGO
case BPF_L7_PROTOCOL_UNKNOWN:
return L7_PROTOCOL_UNKNOWN
default:
@@ -723,6 +727,8 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
case L7_PROTOCOL_MYSQL:
method = MySQLMethodConversion(l7Event.Method).String()
// no method set for kafka on kernel side
+ // no method set for mongo on kernel side
+
default:
method = "Unknown"
}