Skip to content

Commit

Permalink
feat(telem)_: track raw message by type on dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 12, 2024
1 parent 8b95c81 commit 81f0d97
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 1 deletion.
27 changes: 27 additions & 0 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
v1protocol "github.com/status-im/status-go/protocol/v1"
)

type TelemetryService interface {
PushRawMessageByType(ctx context.Context, messageType string, size uint32)
}

// Whisper message properties.
const (
whisperTTL = 15
Expand Down Expand Up @@ -88,6 +92,8 @@ type MessageSender struct {

// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
handleSharedSecrets func([]*sharedsecret.Secret) error

telemetryClient TelemetryService
}

func NewMessageSender(
Expand All @@ -113,6 +119,10 @@ func NewMessageSender(
return p, nil
}

func (s *MessageSender) WithTelemetryClient(client TelemetryService) {
s.telemetryClient = client
}

func (s *MessageSender) Stop() {
s.messageEventsSubscriptionsMutex.Lock()
defer s.messageEventsSubscriptionsMutex.Unlock()
Expand Down Expand Up @@ -432,6 +442,9 @@ func (s *MessageSender) sendCommunity(
zap.String("messageType", "community"),
zap.Any("contentType", rawMessage.MessageType),
zap.Strings("hashes", types.EncodeHexes(hashes)))
if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -550,6 +563,10 @@ func (s *MessageSender) sendPrivate(
s.transport.Track(messageID, hashes, newMessages)
}

if s.telemetryClient != nil {
s.telemetryClient.PushRawMessageByType(ctx, rawMessage.MessageType.String(), uint32(len(rawMessage.Payload)))
}

return messageID, nil
}

Expand Down Expand Up @@ -578,6 +595,9 @@ func (s *MessageSender) SendPairInstallation(
return nil, errors.Wrap(err, "failed to send a message spec")
}

if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, &rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -808,6 +828,9 @@ func (s *MessageSender) SendPublic(
zap.Any("contentType", rawMessage.MessageType),
zap.String("messageType", "public"),
zap.Strings("hashes", types.EncodeHexes(hashes)))
if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, &rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -1381,3 +1404,7 @@ func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error {

return nil
}

func (s *MessageSender) sendBandwidthMetric(ctx context.Context, rawMessage *RawMessage) {
s.telemetryClient.PushRawMessageByType(ctx, rawMessage.MessageType.String(), uint32(len(rawMessage.Payload)))
}
3 changes: 3 additions & 0 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,10 @@ func NewMessenger(
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}

telemetryClient.Start(ctx)

sender.WithTelemetryClient(telemetryClient)
}

messenger = &Messenger{
Expand Down
3 changes: 3 additions & 0 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,9 @@ func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.P
}

m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes)))
if m.telemetryClient != nil {
m.telemetryClient.PushRawMessageByType(ctx, "DATASYNC", uint32(len(marshalledPayload)))
}
m.transport.TrackMany(messageIDs, hashes, newMessages)

return nil
Expand Down
26 changes: 25 additions & 1 deletion telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
// Total number and size of Waku messages sent by this node
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
// Size and type of raw message successfully returned by dispatchMessage
RawMessageByTypeMetric TelemetryType = "RawMessageByType"
)

const MaxRetryCache = 5000
Expand Down Expand Up @@ -151,6 +153,10 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) {
c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize})
}

func (c *Client) PushRawMessageByType(ctx context.Context, messageType string, messageSize uint32) {
c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: messageType, Size: messageSize})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -206,6 +212,11 @@ type SentMessageTotal struct {
Size uint32
}

type RawMessageByType struct {
MessageType string
Size uint32
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -287,6 +298,7 @@ func (c *Client) Start(ctx context.Context) {
}
}
}()

go func() {
defer common.LogOnPanic()
sendPeriod := c.sendPeriod
Expand Down Expand Up @@ -317,7 +329,6 @@ func (c *Client) Start(ctx context.Context) {
return
}
}

}()
}

Expand Down Expand Up @@ -408,6 +419,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: SentMessageTotalMetric,
TelemetryData: c.ProcessSentMessageTotal(v),
}
case RawMessageByType:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: RawMessageByTypeMetric,
TelemetryData: c.ProcessRawMessageByType(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -589,6 +606,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso
return c.marshalPostBody(postBody)
}

func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageType"] = rawMessageByType.MessageType
postBody["size"] = rawMessageByType.Size
return c.marshalPostBody(postBody)
}

// Helper function to marshal post body and handle errors
func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage {
body, err := json.Marshal(postBody)
Expand Down
16 changes: 16 additions & 0 deletions telemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,19 @@ func TestProcessSentMessageTotal(t *testing.T) {
}
runTestCase(t, tc)
}

func TestProcessRawMessageByType(t *testing.T) {
tc := testCase{
name: "RawMessageByType",
input: RawMessageByType{
MessageType: "test-message-type",
Size: 1234,
},
expectedType: RawMessageByTypeMetric,
expectedFields: map[string]interface{}{
"messageType": "test-message-type",
"size": float64(1234),
},
}
runTestCase(t, tc)
}

0 comments on commit 81f0d97

Please sign in to comment.