Skip to content

Commit

Permalink
Merge pull request openzipkin#130 from ValeryPiashchynski/add_amqp_re…
Browse files Browse the repository at this point in the history
…porter_support

Add amqp reporter support
  • Loading branch information
jcchavezs authored Jun 18, 2019
2 parents b42a292 + e2f0d1b commit a035034
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
language: go

services:
- rabbitmq

sudo: false

matrix:
Expand Down
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ environment:
GOFLAGS: -mod=readonly

install:
- choco install rabbitmq --ignoredependencies -y
- echo %PATH%
- echo %GOPATH%
- set PATH=%GOPATH%\bin;c:\go\bin;%PATH%
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ require (
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
github.com/kisielk/gotool v1.0.0 // indirect
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
golang.org/x/net v0.0.0-20190311183353-d8887717615a
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f // indirect
google.golang.org/grpc v1.20.0
Expand Down
16 changes: 3 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
Expand All @@ -43,34 +42,26 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b h1:MQE+LT/ABUuuvEZ+YQAMSXindAdUh7slEmAkup74op4=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.20.0 h1:DlsSIrgEBuZAUFJcta2B5i/lzeHHbnfkNFAfFXLVFYQ=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand All @@ -81,5 +72,4 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
201 changes: 201 additions & 0 deletions reporter/amqp/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
Package amqp implements a RabbitMq reporter to send spans to a Rabbit server/cluster.
*/
package amqp

import (
"encoding/json"
"fmt"
"log"
"os"

"github.com/streadway/amqp"

"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
)

// defaultRmqRoutingKey/Exchange/Kind sets the standard RabbitMQ queue our Reporter will publish on.
const (
defaultRmqRoutingKey = "zipkin"
defaultRmqExchange = "zipkin"
defaultExchangeKind = "direct"
)

// rmqReporter implements Reporter by publishing spans to a RabbitMQ exchange
type rmqReporter struct {
e chan error
channel *amqp.Channel
conn *amqp.Connection
exchange string
queue string
logger *log.Logger
}

// ReporterOption sets a parameter for the rmqReporter
type ReporterOption func(c *rmqReporter)

// Logger sets the logger used to report errors in the collection
// process.
func Logger(logger *log.Logger) ReporterOption {
return func(c *rmqReporter) {
c.logger = logger
}
}

// Exchange sets the Exchange used to send messages (
// see https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/rabbitmq
// if want to change default routing key or exchange
func Exchange(exchange string) ReporterOption {
return func(c *rmqReporter) {
c.exchange = exchange
}
}

// Queue sets the Queue used to send messages
func Queue(queue string) ReporterOption {
return func(c *rmqReporter) {
c.queue = queue
}
}

// Channel sets the Channel used to send messages
func Channel(ch *amqp.Channel) ReporterOption {
return func(c *rmqReporter) {
c.channel = ch
}
}

// Connection sets the Connection used to send messages
func Connection(conn *amqp.Connection) ReporterOption {
return func(c *rmqReporter) {
c.conn = conn
}
}

// NewReporter returns a new RabbitMq-backed Reporter. address should be as described here: https://www.rabbitmq.com/uri-spec.html
func NewReporter(address string, options ...ReporterOption) (reporter.Reporter, error) {
r := &rmqReporter{
logger: log.New(os.Stderr, "", log.LstdFlags),
queue: defaultRmqRoutingKey,
exchange: defaultRmqExchange,
e: make(chan error),
}

for _, option := range options {
option(r)
}

checks := []func() error{
r.queueVerify,
r.exchangeVerify,
r.queueBindVerify,
}

var err error

if r.conn == nil {
r.conn, err = amqp.Dial(address)
if err != nil {
return nil, err
}
}

if r.channel == nil {
r.channel, err = r.conn.Channel()
if err != nil {
return nil, err
}
}

for i := 0; i < len(checks); i++ {
if err := checks[i](); err != nil {
return nil, err
}
}

go r.logErrors()

return r, nil
}

func (r *rmqReporter) logErrors() {
for err := range r.e {
r.logger.Print("msg", err.Error())
}
}

func (r *rmqReporter) Send(s model.SpanModel) {
// Zipkin expects the message to be wrapped in an array
ss := []model.SpanModel{s}
m, err := json.Marshal(ss)
if err != nil {
r.e <- fmt.Errorf("failed when marshalling the span: %s\n", err.Error())
return
}

msg := amqp.Publishing{
Body: m,
}

err = r.channel.Publish(defaultRmqExchange, defaultRmqRoutingKey, false, false, msg)
if err != nil {
r.e <- fmt.Errorf("failed when publishing the span: %s\n", err.Error())
}
}

func (r *rmqReporter) queueBindVerify() error {
return r.channel.QueueBind(
defaultRmqRoutingKey,
defaultRmqRoutingKey,
defaultRmqExchange,
false,
nil)
}

func (r *rmqReporter) exchangeVerify() error {
err := r.channel.ExchangeDeclare(
defaultRmqExchange,
defaultExchangeKind,
true,
false,
false,
false,
nil,
)

if err != nil {
return err
}

return nil
}

func (r *rmqReporter) queueVerify() error {
_, err := r.channel.QueueDeclare(
defaultRmqExchange,
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}

return nil
}

func (r *rmqReporter) Close() error {
err := r.channel.Close()
if err != nil {
return err
}

err = r.conn.Close()
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit a035034

Please sign in to comment.