-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathsqs.go
121 lines (103 loc) · 3.5 KB
/
sqs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2021
package instaawssdk
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
instana "github.com/instana/go-sensor"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
)
// StartSQSSpan starts an "sqs" exit span for the given AWS SQS request, stores it in the
// request.Request context and injects its trace context into the request.
//
// Nothing is done for operations that extractSQSTags reports as not instrumented. Any other
// tag extraction error is logged as a warning and the span is still created.
func StartSQSSpan(req *request.Request, sensor instana.TracerLogger) {
	tags, err := extractSQSTags(req)
	switch {
	case err == nil:
		// tags extracted successfully, nothing to report
	case err == errMethodNotInstrumented:
		return
	default:
		sensor.Logger().Warn("failed to extract SQS tags: ", err)
	}

	spanOpts := make([]opentracing.StartSpanOption, 0, 4)
	spanOpts = append(spanOpts,
		ext.SpanKindProducer,
		opentracing.Tags{sqsSort: "exit"},
		tags,
	)

	// The parent is optional: when the request context carries no active span, the exit
	// span is started without one (whether such a span is forwarded is decided
	// elsewhere, presumably by a user opt-in — verify against the tracer settings).
	if parentSpan, hasParent := instana.SpanFromContext(req.Context()); hasParent {
		spanOpts = append(spanOpts, opentracing.ChildOf(parentSpan.Context()))
	}

	exitSpan := sensor.Tracer().StartSpan("sqs", spanOpts...)
	req.SetContext(instana.ContextWithSpan(req.Context(), exitSpan))

	injectTraceContext(exitSpan, req, sensor.Logger())
}
// FinalizeSQSSpan completes the span stored in the request.Request context, if there is one.
// Before finishing it records the request error (as a log record and as a tag) and, once
// the response data has been filled, tags taken from the response: the queue URL for
// GetQueueUrl and CreateQueue, and the number of received messages for ReceiveMessage.
func FinalizeSQSSpan(req *request.Request) {
	span, found := instana.SpanFromContext(req.Context())
	if !found {
		return
	}
	// Finish on every path below, after all tags have been set.
	defer span.Finish()

	if reqErr := req.Error; reqErr != nil {
		span.LogFields(otlog.Error(reqErr))
		span.SetTag(sqsError, reqErr.Error())
	}

	if !req.DataFilled() {
		return
	}

	switch out := req.Data.(type) {
	case *sqs.GetQueueUrlOutput:
		queueURL := aws.StringValue(out.QueueUrl)
		span.SetTag(sqsQueue, queueURL)
	case *sqs.CreateQueueOutput:
		queueURL := aws.StringValue(out.QueueUrl)
		span.SetTag(sqsQueue, queueURL)
	case *sqs.ReceiveMessageOutput:
		received := len(out.Messages)
		span.SetTag(sqsSize, received)
	}
}
// TraceSQSMessage creates and returns an entry span for an SQS message. The context of this span is
// injected into the message attributes, so that it can then be retrieved with
// instaawssdk.SpanContextFromSQSMessage() and used in the message handler method to continue the trace.
//
// The parent context is looked up with SpanContextFromSQSMessage() first. If that yields nothing,
// the message body is parsed as a JSON SNS notification and the trace context is extracted from its
// MessageAttributes. When neither lookup succeeds, the span is started without a parent.
//
// msg.MessageAttributes is modified in place and allocated if it is nil. A failure to inject the
// span context is logged as a warning through the sensor logger; the span is returned regardless.
func TraceSQSMessage(msg *sqs.Message, sensor instana.TracerLogger) opentracing.Span {
	opts := []opentracing.StartSpanOption{
		ext.SpanKindConsumer,
		opentracing.Tags{
			sqsSort: "entry",
		},
	}

	if spCtx, ok := SpanContextFromSQSMessage(msg, sensor); ok {
		opts = append(opts, opentracing.ChildOf(spCtx))
	} else {
		body := aws.StringValue(msg.Body)

		// In case the delivery has been created via a subscription to an SNS topic,
		// the message body will be a JSON document containing the SNS notification
		// along with message attributes.
		var payload struct {
			MessageAttributes snsMessageAttributes `json:"MessageAttributes"`
		}

		// Try to unmarshal the message attributes and extract the trace context from there.
		// Both failures are expected for messages that did not originate from SNS or carry
		// no trace context, so they are intentionally not reported.
		if err := json.Unmarshal([]byte(body), &payload); err == nil {
			if spCtx, err := sensor.Tracer().Extract(
				opentracing.TextMap,
				SNSMessageAttributesCarrier(map[string]*sns.MessageAttributeValue(payload.MessageAttributes)),
			); err == nil {
				opts = append(opts, opentracing.ChildOf(spCtx))
			}
		}
	}

	sp := sensor.Tracer().StartSpan("sqs", opts...)

	// The carrier writes into the map, so make sure there is one to write to.
	if msg.MessageAttributes == nil {
		msg.MessageAttributes = make(map[string]*sqs.MessageAttributeValue)
	}

	// Without the injected context the message handler cannot continue the trace,
	// so surface the failure instead of dropping it silently.
	if err := sp.Tracer().Inject(
		sp.Context(),
		opentracing.TextMap,
		SQSMessageAttributesCarrier(msg.MessageAttributes),
	); err != nil {
		sensor.Logger().Warn("failed to inject trace context into SQS message attributes: ", err)
	}

	return sp
}