Skip to content

Commit

Permalink
feat(telem)_: track raw message by type on dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 11, 2024
1 parent 8b95c81 commit 8e26853
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 1 deletion.
6 changes: 6 additions & 0 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,11 +527,13 @@ func NewMessenger(
if c.telemetryServerURL != "" {
options := []telemetry.TelemetryClientOption{
telemetry.WithPeerID(peerId.String()),
telemetry.WithMessageChannel(sender.SubscribeToMessageEvents()),
}
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, options...)
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}

telemetryClient.Start(ctx)
}

Expand Down Expand Up @@ -705,6 +707,10 @@ func (m *Messenger) processSentMessage(id string) error {
return err
}

if m.telemetryClient != nil {
m.telemetryClient.PushRawMessageByType(context.Background(), rawMessage.MessageType.String(), uint32(len(rawMessage.Payload)))
}

return nil
}

Expand Down
63 changes: 62 additions & 1 deletion telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/types"
protocolCommon "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"

Expand Down Expand Up @@ -60,6 +61,8 @@ const (
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
// Total number and size of Waku messages sent by this node
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
// Size and type of raw message successfully returned by dispatchMessage
RawMessageByTypeMetric TelemetryType = "RawMessageByType"
)

const MaxRetryCache = 5000
Expand Down Expand Up @@ -151,6 +154,10 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) {
c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize})
}

func (c *Client) PushRawMessageByType(ctx context.Context, messageType string, messageSize uint32) {
c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: messageType, Size: messageSize})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -206,6 +213,11 @@ type SentMessageTotal struct {
Size uint32
}

type RawMessageByType struct {
MessageType string
Size uint32
}

type Client struct {
serverURL string
httpClient *http.Client
Expand All @@ -225,6 +237,7 @@ type Client struct {
lastPeerCountTime time.Time
lastPeerConnFailures map[string]int
deviceType string
messageCh <-chan *protocolCommon.MessageEvent
}

type TelemetryClientOption func(*Client)
Expand All @@ -241,6 +254,12 @@ func WithPeerID(peerId string) TelemetryClientOption {
}
}

func WithMessageChannel(messageCh <-chan *protocolCommon.MessageEvent) TelemetryClientOption {
return func(c *Client) {
c.messageCh = messageCh
}
}

func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
serverURL = strings.TrimRight(serverURL, "/")
client := &Client{
Expand Down Expand Up @@ -273,6 +292,21 @@ func (c *Client) SetDeviceType(deviceType string) {
c.deviceType = deviceType
}

func (c *Client) handleMessageEvent(ctx context.Context, event *protocolCommon.MessageEvent) {
if event == nil {
return
}

switch event.Type {
case protocolCommon.MessageSent:
if event.RawMessage != nil {
c.PushRawMessageByType(ctx, event.RawMessage.MessageType.String(), uint32(len(event.RawMessage.Payload)))
}
case protocolCommon.MessageScheduled:
return
}
}

func (c *Client) Start(ctx context.Context) {
go func() {
defer common.LogOnPanic()
Expand All @@ -287,6 +321,7 @@ func (c *Client) Start(ctx context.Context) {
}
}
}()

go func() {
defer common.LogOnPanic()
sendPeriod := c.sendPeriod
Expand Down Expand Up @@ -317,8 +352,21 @@ func (c *Client) Start(ctx context.Context) {
return
}
}

}()

if c.messageCh != nil {
go func() {
defer common.LogOnPanic()
for {
select {
case <-ctx.Done():
return
case event := <-c.messageCh:
c.handleMessageEvent(ctx, event)
}
}
}()
}
}

func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) {
Expand Down Expand Up @@ -408,6 +456,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: SentMessageTotalMetric,
TelemetryData: c.ProcessSentMessageTotal(v),
}
case RawMessageByType:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: RawMessageByTypeMetric,
TelemetryData: c.ProcessRawMessageByType(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -589,6 +643,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso
return c.marshalPostBody(postBody)
}

func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageType"] = rawMessageByType.MessageType
postBody["size"] = rawMessageByType.Size
return c.marshalPostBody(postBody)
}

// Helper function to marshal post body and handle errors
func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage {
body, err := json.Marshal(postBody)
Expand Down
30 changes: 30 additions & 0 deletions telemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/status-im/status-go/eth-node/types"
protocolCommon "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/tt"
v1protocol "github.com/status-im/status-go/protocol/v1"
Expand Down Expand Up @@ -97,6 +98,19 @@ func createClient(t *testing.T, mockServerURL string) *Client {
return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond), WithPeerID("16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm"))
}

func createClientWithMessageChannel(t *testing.T, mockServerURL string, messageCh <-chan *protocolCommon.MessageEvent) *Client {
config := zap.NewDevelopmentConfig()
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
logger, err := config.Build()
if err != nil {
t.Fatalf("Failed to create logger: %v", err)
}
return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0",
WithSendPeriod(100*time.Millisecond),
WithPeerID("16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm"),
WithMessageChannel(messageCh))
}

type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)

func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) {
Expand Down Expand Up @@ -611,3 +625,19 @@ func TestProcessSentMessageTotal(t *testing.T) {
}
runTestCase(t, tc)
}

func TestProcessRawMessageByType(t *testing.T) {
tc := testCase{
name: "RawMessageByType",
input: RawMessageByType{
MessageType: "test-message-type",
Size: 1234,
},
expectedType: RawMessageByTypeMetric,
expectedFields: map[string]interface{}{
"messageType": "test-message-type",
"size": float64(1234),
},
}
runTestCase(t, tc)
}

0 comments on commit 8e26853

Please sign in to comment.