Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(grpc): adds support for grpc parsing. #177

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(grpc): adds support for grpc parsing.
  • Loading branch information
jcchavezs committed Sep 9, 2020
commit 1c762e0846bddf86a9b5d0531dfa855ea7456182
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
Expand Down Expand Up @@ -88,6 +89,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
27 changes: 26 additions & 1 deletion middleware/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type clientHandler struct {
tracer *zipkin.Tracer
remoteServiceName string
handleRPCParser handleRPCParser
}

// A ClientOption can be passed to NewClientHandler to customize the returned handler.
Expand All @@ -41,6 +42,30 @@ func WithRemoteServiceName(name string) ClientOption {
}
}

// WithClientInPayloadParser adds a parser for the stats.InPayload to be able to access
// the request payload
func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithClientInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the request trailer
func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inTrailer = parser
}
}

// WithClientInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request payload
func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inHeader = parser
}
}

// NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
// tracing to a gRPC client. The gRPC method name is used as the span name and by default the only
// tags are the gRPC status code if the call fails.
Expand All @@ -67,7 +92,7 @@ func (c *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con

// HandleRPC implements per-RPC tracing and stats instrumentation.
func (c *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
handleRPC(ctx, rs, c.handleRPCParser)
}

// TagRPC implements per-RPC context management.
Expand Down
38 changes: 37 additions & 1 deletion middleware/grpc/grpc_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package grpc_test
import (
"context"
"errors"
"log"
"net"
"testing"

Expand All @@ -26,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
Expand Down Expand Up @@ -121,7 +123,7 @@ func (g *sequentialIdGenerator) reset() {
g.nextSpanId = g.start
}

type TestHelloService struct{
type TestHelloService struct {
service.UnimplementedHelloServiceServer
}

Expand Down Expand Up @@ -158,3 +160,37 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest)

return resp, nil
}

func initListener(s *grpc.Server) func(context.Context, string) (net.Conn, error) {
const bufSize = 1024 * 1024

listener := bufconn.Listen(bufSize)
bufDialer := func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}

go func() {
if err := s.Serve(listener); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()

return bufDialer
}

func createTracer(joinSpans bool) (*zipkin.Tracer, func() []model.SpanModel) {
recorder := recorder.NewReporter()
ep, _ := zipkin.NewEndpoint("grpc-server", "")

serverIdGenerator = newSequentialIdGenerator(0x1000000)

tracer, _ := zipkin.NewTracer(
recorder,
zipkin.WithLocalEndpoint(ep),
zipkin.WithSharedSpans(joinSpans),
zipkin.WithIDGenerator(serverIdGenerator),
)
return tracer, func() []model.SpanModel {
return recorder.Flush()
}
}
31 changes: 28 additions & 3 deletions middleware/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
)

type serverHandler struct {
tracer *zipkin.Tracer
defaultTags map[string]string
tracer *zipkin.Tracer
defaultTags map[string]string
handleRPCParser handleRPCParser
}

// A ServerOption can be passed to NewServerHandler to customize the returned handler.
Expand All @@ -39,6 +40,30 @@ func ServerTags(tags map[string]string) ServerOption {
}
}

// WithServerInPayloadParser adds a parser for the stats.InPayload to be able to access
// the request payload
func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the request trailer
func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inTrailer = parser
}
}

// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request payload
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inHeader = parser
}
}

// NewServerHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
// tracing to a gRPC server. The gRPC method name is used as the span name and by default the only
// tags are the gRPC status code if the call fails. Use ServerTags to add additional tags that
Expand Down Expand Up @@ -66,7 +91,7 @@ func (s *serverHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con

// HandleRPC implements per-RPC tracing and stats instrumentation.
func (s *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
handleRPC(ctx, rs, s.handleRPCParser)
}

// TagRPC implements per-RPC context management.
Expand Down
138 changes: 138 additions & 0 deletions middleware/grpc/server_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2019 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_test

import (
"context"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
"github.com/openzipkin/zipkin-go/model"
service "github.com/openzipkin/zipkin-go/proto/testing"
)

func TestGRPCServerCreatesASpanAndContext(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer(
grpc.StatsHandler(
zipkingrpc.NewServerHandler(
tracer,
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
),
),
)
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{})

dialer := initListener(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithInsecure(),
)
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := service.NewHelloServiceClient(conn)

_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

spans := flusher()
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := model.Server, span.Kind; want != have {
t.Errorf("unexpected kind, want %q, have %q", want, have)
}
}

func TestGRPCServerCanAccessToHeaders(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer(
grpc.StatsHandler(
zipkingrpc.NewServerHandler(
tracer,
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) {
if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
}
}),
zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) {
if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
}
}),
),
),
)
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{})

dialer := initListener(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithInsecure(),
)
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := service.NewHelloServiceClient(conn)

ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value")
_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

spans := flusher()
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := model.Server, span.Kind; want != have {
t.Errorf("unexpected kind, want %q, have %q", want, have)
}
}
20 changes: 19 additions & 1 deletion middleware/grpc/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
"github.com/openzipkin/zipkin-go/model"
)

type handleRPCParser struct {
inPayload func(*stats.InPayload, zipkin.Span)
inTrailer func(*stats.InTrailer, zipkin.Span)
inHeader func(*stats.InHeader, zipkin.Span)
}

// A RPCHandler can be registered using WithClientRPCHandler or WithServerRPCHandler to intercept calls to HandleRPC of
// a handler for additional span customization.
type RPCHandler func(span zipkin.Span, rpcStats stats.RPCStats)
Expand All @@ -37,10 +43,22 @@ func spanName(rti *stats.RPCTagInfo) string {
return name
}

func handleRPC(ctx context.Context, rs stats.RPCStats) {
func handleRPC(ctx context.Context, rs stats.RPCStats, h handleRPCParser) {
span := zipkin.SpanFromContext(ctx)

switch rs := rs.(type) {
case *stats.InPayload:
if h.inPayload != nil {
h.inPayload(rs, span)
}
case *stats.InHeader:
if h.inHeader != nil {
h.inHeader(rs, span)
}
case *stats.InTrailer:
if h.inTrailer != nil {
h.inTrailer(rs, span)
}
case *stats.End:
s, ok := status.FromError(rs.Error)
// rs.Error should always be convertable to a status, this is just a defensive check.
Expand Down