Skip to content

Commit

Permalink
Ensure TC programs play nice with 3rdparty programs (#1462)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelroquetto authored Dec 14, 2024
1 parent cc8de1b commit 0d8cdc3
Show file tree
Hide file tree
Showing 94 changed files with 2,768 additions and 2,720 deletions.
3 changes: 1 addition & 2 deletions bpf/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
#define __FLOW_H__

#include "vmlinux.h"
#include "tc_act.h"

#define TC_ACT_OK 0
#define TC_ACT_SHOT 2
#define IP_MAX_LEN 16

#define ETH_ALEN 6 /* Octets in one ethernet addr */
Expand Down
10 changes: 5 additions & 5 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u
static inline int flow_monitor(struct __sk_buff *skb) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
Expand All @@ -157,7 +157,7 @@ static inline int flow_monitor(struct __sk_buff *skb) {
struct ethhdr *eth = (struct ethhdr *)data;
u16 flags = 0;
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}
id.if_index = skb->ifindex;

Expand Down Expand Up @@ -261,16 +261,16 @@ static inline int flow_monitor(struct __sk_buff *skb) {
if (flags & FIN_FLAG || flags & RST_FLAG || flags & FIN_ACK_FLAG || flags & RST_ACK_FLAG) {
bpf_map_delete_elem(&flow_directions, &id);
}
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

SEC("tc_ingress")
int ingress_flow_parse(struct __sk_buff *skb) {
int beyla_ingress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb);
}

SEC("tc_egress")
int egress_flow_parse(struct __sk_buff *skb) {
int beyla_egress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb);
}

Expand Down
10 changes: 5 additions & 5 deletions bpf/flows_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,22 @@ static __always_inline bool same_ip(const u8 *ip1, const u8 *ip2) {
}

SEC("socket/http_filter")
int socket__http_filter(struct __sk_buff *skb) {
int beyla_socket__http_filter(struct __sk_buff *skb) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

u16 flags = 0;
flow_id id;
__builtin_memset(&id, 0, sizeof(id));
if (!read_sk_buff(skb, &id, &flags)) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

// ignore traffic that's not egress or ingress
if (same_ip(id.src_ip.s6_addr, id.dst_ip.s6_addr)) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

u64 current_time = bpf_ktime_get_ns();
Expand Down Expand Up @@ -296,7 +296,7 @@ int socket__http_filter(struct __sk_buff *skb) {
if (flags & FIN_FLAG || flags & RST_FLAG) {
bpf_map_delete_elem(&flow_directions, &id);
}
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

// Force emitting structs into the ELF for automatic creation of Golang struct
Expand Down
32 changes: 16 additions & 16 deletions bpf/go_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ struct {
49 // 1 + 1 + 8 + 1 +~ 38 = type byte + hpack_len_as_byte("traceparent") + strlen(hpack("traceparent")) + len_as_byte(38) + hpack(generated tracepanent id)

SEC("uprobe/server_handleStream")
int uprobe_server_handleStream(struct pt_regs *ctx) {
int beyla_uprobe_server_handleStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream === ");
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);
Expand Down Expand Up @@ -130,7 +130,7 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {

// Handles finding the connection information for http2 servers in grpc
SEC("uprobe/http2Server_operateHeaders")
int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
int beyla_uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
void *tr = GO_PARAM1(ctx);
void *frame = GO_PARAM2(ctx);
Expand Down Expand Up @@ -167,7 +167,7 @@ int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {

// Handles finding the connection information for grpc ServeHTTP
SEC("uprobe/serverHandlerTransport_HandleStreams")
int uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
int beyla_uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
void *tr = GO_PARAM1(ctx);
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_printk("=== uprobe/serverHandlerTransport_HandleStreams tr %llx goroutine %lx === ",
Expand Down Expand Up @@ -198,7 +198,7 @@ int uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
}

SEC("uprobe/server_handleStream")
int uprobe_server_handleStream_return(struct pt_regs *ctx) {
int beyla_uprobe_server_handleStream_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream return === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -300,7 +300,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
}

SEC("uprobe/transport_writeStatus")
int uprobe_transport_writeStatus(struct pt_regs *ctx) {
int beyla_uprobe_transport_writeStatus(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/transport_writeStatus === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -373,7 +373,7 @@ static __always_inline void clientConnStart(
}

SEC("uprobe/ClientConn_Invoke")
int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Invoke === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand All @@ -391,7 +391,7 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {

// Same as ClientConn_Invoke, registers for the method are offset by one
SEC("uprobe/ClientConn_NewStream")
int uprobe_ClientConn_NewStream(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_NewStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.NewStream === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -475,7 +475,7 @@ static __always_inline int grpc_connect_done(struct pt_regs *ctx, void *err) {

// Same as ClientConn_Invoke, registers for the method are offset by one
SEC("uprobe/ClientConn_NewStream")
int uprobe_ClientConn_NewStream_return(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_NewStream_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.NewStream return === ");

void *stream = GO_PARAM1(ctx);
Expand All @@ -488,7 +488,7 @@ int uprobe_ClientConn_NewStream_return(struct pt_regs *ctx) {
}

SEC("uprobe/ClientConn_Close")
int uprobe_ClientConn_Close(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_Close(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Close === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand All @@ -502,7 +502,7 @@ int uprobe_ClientConn_Close(struct pt_regs *ctx) {
}

SEC("uprobe/ClientConn_Invoke")
int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Invoke return === ");

void *err = GO_PARAM1(ctx);
Expand All @@ -516,7 +516,7 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {

// google.golang.org/grpc.(*clientStream).RecvMsg
SEC("uprobe/clientStream_RecvMsg")
int uprobe_clientStream_RecvMsg_return(struct pt_regs *ctx) {
int beyla_uprobe_clientStream_RecvMsg_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc clientStream.RecvMsg return === ");
void *err = (void *)GO_PARAM1(ctx);
return grpc_connect_done(ctx, err);
Expand All @@ -525,7 +525,7 @@ int uprobe_clientStream_RecvMsg_return(struct pt_regs *ctx) {
// The gRPC client stream is written on another goroutine in transport loopyWriter (controlbuf.go).
// We extract the stream ID when it's just created and make a mapping of it to our goroutine that's executing ClientConn.Invoke.
SEC("uprobe/transport_http2Client_NewStream")
int uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
int beyla_uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc transport.(*http2Client).NewStream === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -619,7 +619,7 @@ struct {
} grpc_framer_invocation_map SEC(".maps");

SEC("uprobe/grpcFramerWriteHeaders")
int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders === ");

void *framer = GO_PARAM1(ctx);
Expand Down Expand Up @@ -682,7 +682,7 @@ int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
}
#else
SEC("uprobe/grpcFramerWriteHeaders")
int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
return 0;
}
#endif
Expand All @@ -692,7 +692,7 @@ int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
66 // 1 + 1 + 8 + 1 + 55 = type byte + hpack_len_as_byte("traceparent") + strlen(hpack("traceparent")) + len_as_byte(55) + generated traceparent id

SEC("uprobe/grpcFramerWriteHeaders_returns")
int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders returns === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -807,7 +807,7 @@ int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
}
#else
SEC("uprobe/grpcFramerWriteHeaders_returns")
int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
return 0;
}
#endif
18 changes: 9 additions & 9 deletions bpf/go_kafka_go.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct {

// Code for the produce messages path
SEC("uprobe/writer_write_messages")
int uprobe_writer_write_messages(struct pt_regs *ctx) {
int beyla_uprobe_writer_write_messages(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
void *w_ptr = (void *)GO_PARAM1(ctx);
bpf_dbg_printk(
Expand All @@ -81,7 +81,7 @@ int uprobe_writer_write_messages(struct pt_regs *ctx) {
}

SEC("uprobe/writer_produce")
int uprobe_writer_produce(struct pt_regs *ctx) {
int beyla_uprobe_writer_produce(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go writer_produce %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand Down Expand Up @@ -129,7 +129,7 @@ int uprobe_writer_produce(struct pt_regs *ctx) {
}

SEC("uprobe/client_roundTrip")
int uprobe_client_roundTrip(struct pt_regs *ctx) {
int beyla_uprobe_client_roundTrip(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go client_roundTrip %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand All @@ -154,7 +154,7 @@ int uprobe_client_roundTrip(struct pt_regs *ctx) {
}

SEC("uprobe/protocol_RoundTrip")
int uprobe_protocol_roundtrip(struct pt_regs *ctx) {
int beyla_uprobe_protocol_roundtrip(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/kafka-go protocol_RoundTrip === ");
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
void *rw_ptr = (void *)GO_PARAM2(ctx);
Expand Down Expand Up @@ -187,7 +187,7 @@ int uprobe_protocol_roundtrip(struct pt_regs *ctx) {
}

SEC("uprobe/protocol_RoundTrip_ret")
int uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {
int beyla_uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/protocol_RoundTrip ret %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand Down Expand Up @@ -241,7 +241,7 @@ int uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {

// Code for the fetch messages path
SEC("uprobe/reader_read")
int uprobe_reader_read(struct pt_regs *ctx) {
int beyla_uprobe_reader_read(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
void *r_ptr = (void *)GO_PARAM1(ctx);
void *conn = (void *)GO_PARAM5(ctx);
Expand Down Expand Up @@ -287,7 +287,7 @@ int uprobe_reader_read(struct pt_regs *ctx) {
}

SEC("uprobe/reader_send_message")
int uprobe_reader_send_message(struct pt_regs *ctx) {
int beyla_uprobe_reader_send_message(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go reader_send_message %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand All @@ -304,7 +304,7 @@ int uprobe_reader_send_message(struct pt_regs *ctx) {
}

SEC("uprobe/reader_read")
int uprobe_reader_read_ret(struct pt_regs *ctx) {
int beyla_uprobe_reader_read_ret(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go reader_read ret %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand All @@ -330,4 +330,4 @@ int uprobe_reader_read_ret(struct pt_regs *ctx) {
bpf_map_delete_elem(&fetch_requests, &g_key);

return 0;
}
}
Loading

0 comments on commit 0d8cdc3

Please sign in to comment.