Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Kafka dissector. #2456

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Improved Kafka dissector.
 * detect more Kafka request packet's
 * requires less flow memory
 * same detection behavior as before e.g. no asym detection implemented
   (can be done by dissecting responses, requires more effort)

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
  • Loading branch information
utoni committed May 24, 2024
commit 51fdde70e05ec479c870d7abea366b18897ca681
6 changes: 0 additions & 6 deletions src/include/ndpi_typedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,6 @@ struct ndpi_flow_tcp_struct {
/* NDPI_PROTOCOL_SSH */
u_int32_t ssh_stage:3;

/* NDPI_PROTOCOL_KAFKA */
u_int32_t kafka_stage:1;

/* NDPI_PROTOCOL_VNC */
u_int32_t vnc_stage:2; // 0 - 3

Expand Down Expand Up @@ -891,9 +888,6 @@ struct ndpi_flow_tcp_struct {

/* NDPI_PROTOCOL_RADMIN */
u_int32_t radmin_stage:1;

/* NDPI_PROTOCOL_KAFKA */
u_int32_t kafka_correlation_id;
};

/* ************************************************** */
Expand Down
51 changes: 32 additions & 19 deletions src/lib/protocols/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
#include "ndpi_api.h"
#include "ndpi_private.h"

static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct,
struct ndpi_flow_struct *flow)
{
NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
}

static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
struct ndpi_flow_struct *flow)
{
Expand All @@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
* API keys: https://kafka.apache.org/protocol.html#protocol_api_keys
* API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs
*/
if (packet->payload_packet_len > 40 &&
ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4))
if (packet->payload_packet_len < 8 /* min. required packet length */ ||
ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4))
{
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

/* Request */
if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
{
/* Request */
if (!flow->l4.tcp.kafka_stage &&
current_pkt_from_client_to_server(ndpi_struct, flow) &&
ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
if (packet->payload_packet_len < 14)
{
flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8));
flow->l4.tcp.kafka_stage = 1;
return;
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

/* Response */
if (flow->l4.tcp.kafka_stage == 1 &&
current_pkt_from_server_to_client(ndpi_struct, flow))
const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12));
if (client_id_len + 12 + 2 > packet->payload_packet_len)
{
if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id)
{
NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}
}
if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0)
{
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

ndpi_int_kafka_add_connection(ndpi_struct, flow);
return;
}

NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
Expand Down
Binary file added tests/cfgs/default/pcap/kafka.pcap
Binary file not shown.
37 changes: 37 additions & 0 deletions tests/cfgs/default/result/kafka.pcap.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Guessed flow protos: 1

DPI Packets (TCP): 12 (1.50 pkts/flow)
Confidence Match by port : 1 (flows)
Confidence DPI : 7 (flows)
Num dissector calls: 221 (27.62 diss/flow)
LRU cache ookla: 0/0/0 (insert/search/found)
LRU cache bittorrent: 0/3/0 (insert/search/found)
LRU cache stun: 0/0/0 (insert/search/found)
LRU cache tls_cert: 0/0/0 (insert/search/found)
LRU cache mining: 0/1/0 (insert/search/found)
LRU cache msteams: 0/0/0 (insert/search/found)
LRU cache stun_zoom: 0/0/0 (insert/search/found)
Automa host: 0/0 (search/found)
Automa domain: 0/0 (search/found)
Automa tls cert: 0/0 (search/found)
Automa risk mask: 0/0 (search/found)
Automa common alpns: 0/0 (search/found)
Patricia risk mask: 14/0 (search/found)
Patricia risk mask IPv6: 0/0 (search/found)
Patricia risk: 0/0 (search/found)
Patricia risk IPv6: 0/0 (search/found)
Patricia protocols: 16/0 (search/found)
Patricia protocols IPv6: 0/0 (search/found)

Kafka 22 4830 8

Acceptable 22 4830 8

1 TCP 172.16.17.101:38176 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][3 pkts/1408 bytes <-> 2 pkts/254 bytes][Goodput ratio: 86/48][0.58 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (timestamp)][Plen Bins: 0,40,0,0,0,0,0,0,0,0,0,40,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
2 TCP 172.30.0.237:9092 <-> 172.16.17.101:58052 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: Match by port][DPI packets: 5][cat: RPC/16][4 pkts/974 bytes <-> 1 pkts/110 bytes][Goodput ratio: 73/40][599.70 sec][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,20,0,60,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
3 TCP 172.16.17.101:49280 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][2 pkts/201 bytes <-> 3 pkts/788 bytes][Goodput ratio: 34/75][899.84 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 20,20,0,40,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
4 TCP 172.16.17.101:56556 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes <-> 1 pkts/416 bytes][Goodput ratio: 27/84][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 50,0,0,0,0,0,0,0,0,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
5 TCP 172.16.17.101:40042 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes <-> 1 pkts/186 bytes][Goodput ratio: 40/64][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,50,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
6 TCP 172.16.17.101:53768 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes -> 0 pkts/0 bytes][Goodput ratio: 40/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 0,100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
7 TCP 172.16.17.101:53052 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
8 TCP 172.16.17.101:58300 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
6 changes: 3 additions & 3 deletions tests/cfgs/default/result/kafka.pcapng.out
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
DPI Packets (TCP): 6 (6.00 pkts/flow)
DPI Packets (TCP): 4 (4.00 pkts/flow)
Confidence DPI : 1 (flows)
Num dissector calls: 150 (150.00 diss/flow)
Num dissector calls: 1 (1.00 diss/flow)
LRU cache ookla: 0/0/0 (insert/search/found)
LRU cache bittorrent: 0/0/0 (insert/search/found)
LRU cache stun: 0/0/0 (insert/search/found)
Expand All @@ -24,4 +24,4 @@ Kafka 19 2237 1

Acceptable 19 2237 1

1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 6][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 4][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: TCP connection with unidirectional traffic][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
Loading