Skip to content

Commit

Permalink
fix(reporter/kafka): fixes the SpanSerializer option usage. (openzipk…
Browse files Browse the repository at this point in the history
  • Loading branch information
jcchavezs authored Nov 4, 2019
1 parent c29478e commit bafd576
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
5 changes: 2 additions & 3 deletions reporter/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ Package kafka implements a Kafka reporter to send spans to a Kafka server/cluste
package kafka

import (
"encoding/json"
"log"
"os"

Expand Down Expand Up @@ -109,8 +108,8 @@ func (r *kafkaReporter) logErrors() {

func (r *kafkaReporter) Send(s model.SpanModel) {
// Zipkin expects the message to be wrapped in an array
ss := []model.SpanModel{s}
m, err := json.Marshal(ss)
ss := []*model.SpanModel{&s}
m, err := r.serializer.Serialize(ss)
if err != nil {
r.logger.Printf("failed when marshalling the span: %s\n", err.Error())
return
Expand Down
23 changes: 16 additions & 7 deletions reporter/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ var spans = []*model.SpanModel{
makeNewSpan("div", 123, 101112, 456, true),
}

func jsonDeserializer(body []byte) ([]*model.SpanModel, error) {
spans := []*model.SpanModel{}
err := json.Unmarshal(body, &spans)
return spans, err
}

func protoDeserializer(body []byte) ([]*model.SpanModel, error) {
spans, err := zipkin_proto3.ParseSpans(body, true)
return spans, err
}

func TestKafkaProduce(t *testing.T) {
p := newStubProducer(false)
c, err := kafka.NewReporter(
Expand All @@ -76,7 +87,7 @@ func TestKafkaProduce(t *testing.T) {
for _, want := range spans {
m := sendSpan(t, c, p, *want)
testMetadata(t, m)
have := deserializeSpan(t, m.Value)
have := deserializeSpan(t, m.Value, jsonDeserializer)
testEqual(t, want, have)
}
}
Expand All @@ -95,7 +106,7 @@ func TestKafkaProduceProto(t *testing.T) {
for _, want := range spans {
m := sendSpan(t, c, p, *want)
testMetadata(t, m)
have := deserializeSpan(t, m.Value)
have := deserializeSpan(t, m.Value, protoDeserializer)
testEqual(t, want, have)
}
}
Expand Down Expand Up @@ -208,21 +219,19 @@ func testMetadata(t *testing.T, m *sarama.ProducerMessage) {
}
}

func deserializeSpan(t *testing.T, e sarama.Encoder) *model.SpanModel {
func deserializeSpan(t *testing.T, e sarama.Encoder, deserializer func([]byte) ([]*model.SpanModel, error)) *model.SpanModel {
bytes, err := e.Encode()
if err != nil {
t.Errorf("unexpected error in encoding: %v", err)
}

var s []model.SpanModel

err = json.Unmarshal(bytes, &s)
s, err := deserializer(bytes)
if err != nil {
t.Errorf("unexpected error in decoding: %v", err)
return nil
}

return &s[0]
return s[0]
}

func testEqual(t *testing.T, want *model.SpanModel, have *model.SpanModel) {
Expand Down

0 comments on commit bafd576

Please sign in to comment.