diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 20e9ab54461..2bbd7a9f2af 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -26,6 +26,10 @@ import ( v1protocol "github.com/status-im/status-go/protocol/v1" ) +type TelemetryService interface { + PushRawMessageByType(ctx context.Context, messageType string, size uint32) +} + // Whisper message properties. const ( whisperTTL = 15 @@ -88,6 +92,8 @@ type MessageSender struct { // handleSharedSecrets is a callback that is called every time a new shared secret is negotiated handleSharedSecrets func([]*sharedsecret.Secret) error + + telemetryClient TelemetryService } func NewMessageSender( @@ -113,6 +119,10 @@ func NewMessageSender( return p, nil } +func (s *MessageSender) WithTelemetryClient(client TelemetryService) { + s.telemetryClient = client +} + func (s *MessageSender) Stop() { s.messageEventsSubscriptionsMutex.Lock() defer s.messageEventsSubscriptionsMutex.Unlock() @@ -432,6 +442,9 @@ func (s *MessageSender) sendCommunity( zap.String("messageType", "community"), zap.Any("contentType", rawMessage.MessageType), zap.Strings("hashes", types.EncodeHexes(hashes))) + if s.telemetryClient != nil { + s.sendBandwidthMetric(ctx, rawMessage) + } s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -550,6 +563,10 @@ func (s *MessageSender) sendPrivate( s.transport.Track(messageID, hashes, newMessages) } + if s.telemetryClient != nil { + s.telemetryClient.PushRawMessageByType(ctx, rawMessage.MessageType.String(), uint32(len(rawMessage.Payload))) + } + return messageID, nil } @@ -578,6 +595,9 @@ func (s *MessageSender) SendPairInstallation( return nil, errors.Wrap(err, "failed to send a message spec") } + if s.telemetryClient != nil { + s.sendBandwidthMetric(ctx, &rawMessage) + } s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -808,6 +828,9 @@ func (s *MessageSender) SendPublic( zap.Any("contentType", rawMessage.MessageType), zap.String("messageType", "public"), zap.Strings("hashes", types.EncodeHexes(hashes))) + if s.telemetryClient != nil { + s.sendBandwidthMetric(ctx, &rawMessage) + } s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -1381,3 +1404,7 @@ func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error { return nil } + +func (s *MessageSender) sendBandwidthMetric(ctx context.Context, rawMessage *RawMessage) { + s.telemetryClient.PushRawMessageByType(ctx, rawMessage.MessageType.String(), uint32(len(rawMessage.Payload))) +} diff --git a/protocol/messenger.go b/protocol/messenger.go index 77f5f2c025d..f64549fea39 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -532,7 +532,10 @@ func NewMessenger( if c.wakuService != nil { c.wakuService.SetStatusTelemetryClient(telemetryClient) } + telemetryClient.Start(ctx) + + sender.WithTelemetryClient(telemetryClient) } messenger = &Messenger{ diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index b47149f364e..9ee52de7c1a 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -415,6 +415,9 @@ func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.P } m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes))) + if m.telemetryClient != nil { + m.telemetryClient.PushRawMessageByType(ctx, "DATASYNC", uint32(len(marshalledPayload))) + } m.transport.TrackMany(messageIDs, hashes, newMessages) return nil diff --git a/telemetry/client.go b/telemetry/client.go index 5ded3e7b748..c580a22525c 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -60,6 +60,8 @@ const ( MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" // Total number and size of Waku messages sent by this node SentMessageTotalMetric TelemetryType = "SentMessageTotal" + // Size and type of raw message successfully returned by dispatchMessage + RawMessageByTypeMetric TelemetryType = "RawMessageByType" ) const MaxRetryCache = 5000 @@ -151,6 +153,10 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) { c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize}) } +func (c *Client) PushRawMessageByType(ctx context.Context, messageType string, messageSize uint32) { + c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: messageType, Size: messageSize}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -206,6 +212,11 @@ type SentMessageTotal struct { Size uint32 } +type RawMessageByType struct { + MessageType string + Size uint32 +} + type Client struct { serverURL string httpClient *http.Client @@ -287,6 +298,7 @@ func (c *Client) Start(ctx context.Context) { } } }() + go func() { defer common.LogOnPanic() sendPeriod := c.sendPeriod @@ -317,7 +329,6 @@ func (c *Client) Start(ctx context.Context) { return } } - }() } @@ -408,6 +419,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: SentMessageTotalMetric, TelemetryData: c.ProcessSentMessageTotal(v), } + case RawMessageByType: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: RawMessageByTypeMetric, + TelemetryData: c.ProcessRawMessageByType(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -589,6 +606,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso return c.marshalPostBody(postBody) } +func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage { + postBody := c.commonPostBody() + postBody["messageType"] = rawMessageByType.MessageType + postBody["size"] = rawMessageByType.Size + return c.marshalPostBody(postBody) +} + // Helper function to marshal post body and handle errors func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage { body, err := json.Marshal(postBody) diff --git a/telemetry/client_test.go b/telemetry/client_test.go index 7c1e6e7848d..d6b3141d49b 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -611,3 +611,19 @@ func TestProcessSentMessageTotal(t *testing.T) { } runTestCase(t, tc) } + +func TestProcessRawMessageByType(t *testing.T) { + tc := testCase{ + name: "RawMessageByType", + input: RawMessageByType{ + MessageType: "test-message-type", + Size: 1234, + }, + expectedType: RawMessageByTypeMetric, + expectedFields: map[string]interface{}{ + "messageType": "test-message-type", + "size": float64(1234), + }, + } + runTestCase(t, tc) +}