stats/opentelemetry: CSM Observability client side component changes (#…
zasweq authored May 23, 2024
1 parent 092e793 commit f7d3d3e
Showing 3 changed files with 112 additions and 37 deletions.
106 changes: 85 additions & 21 deletions stats/opentelemetry/client_metrics.go
@@ -22,11 +22,13 @@ import (
"time"

"google.golang.org/grpc"
+istats "google.golang.org/grpc/internal/stats"
+"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)

type clientStatsHandler struct {
@@ -49,11 +51,11 @@ func (csh *clientStatsHandler) initializeMetrics() {

setOfMetrics := csh.o.MetricsOptions.Metrics.metrics

-csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, metric.WithUnit("attempt"), metric.WithDescription("Number of client call attempts started."))
-csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a client call attempt."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
-csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per client call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
-csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
-csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
+csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
+csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
+csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
}

func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -63,6 +65,15 @@ func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method stri
}
ctx = setCallInfo(ctx, ci)

+if csh.o.MetricsOptions.pluginOption != nil {
+md := csh.o.MetricsOptions.pluginOption.GetMetadata()
+for k, vs := range md {
+for _, v := range vs {
+ctx = metadata.AppendToOutgoingContext(ctx, k, v)
+}
+}
+}

startTime := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
csh.perCallMetrics(ctx, err, startTime, ci)
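
Both interceptors now inject the plugin option's metadata into the outgoing context before the RPC starts (the same block recurs in the stream interceptor below). The pluginOption value is the internal otelinternal.PluginOption interface; judging from the calls this commit makes, it supplies outgoing metadata via GetMetadata and decodes labels from incoming metadata via GetLabels. A minimal sketch of a conforming implementation — the type name, header key, and encoding here are illustrative, not part of this commit:

```go
// Hypothetical PluginOption implementation; only the two methods the
// interceptors and stats handler rely on are sketched.
package example

import (
	"strings"

	"google.golang.org/grpc/metadata"
)

type examplePluginOption struct{}

// GetMetadata mints the metadata appended to every outgoing RPC context,
// mirroring the loop in unaryInterceptor/streamInterceptor.
func (examplePluginOption) GetMetadata() metadata.MD {
	return metadata.Pairs("x-example-labels", "env=prod")
}

// GetLabels decodes that same header from incoming headers or trailers,
// mirroring setLabelsFromPluginOption further down.
func (examplePluginOption) GetLabels(md metadata.MD) map[string]string {
	labels := map[string]string{}
	for _, v := range md.Get("x-example-labels") {
		if i := strings.IndexByte(v, '='); i > 0 { // one "k=v" per value
			labels[v[:i]] = v[i+1:]
		}
	}
	return labels
}
```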
@@ -98,6 +109,16 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
method: csh.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)

+if csh.o.MetricsOptions.pluginOption != nil {
+md := csh.o.MetricsOptions.pluginOption.GetMetadata()
+for k, vs := range md {
+for _, v := range vs {
+ctx = metadata.AppendToOutgoingContext(ctx, k, v)
+}
+}
+}

startTime := time.Now()

callback := func(err error) {
@@ -110,7 +131,7 @@
func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
s := status.Convert(err)
callLatency := float64(time.Since(startTime)) / float64(time.Second)
-csh.clientMetrics.callDuration.Record(ctx, callLatency, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", canonicalString(s.Code()))))
+csh.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
}

// TagConn exists to satisfy stats.Handler.
@@ -123,11 +144,24 @@ func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

// TagRPC implements per RPC attempt context management.
func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
-mi := &metricsInfo{ // populates information about RPC start.
+// Numerous stats handlers can be used for the same channel. The
+// cluster_impl balancer that writes to this map only writes once, so have
+// this stats handler's per-attempt scoped context point to the same
+// optional labels map, if set.
+var labels *istats.Labels
+if labels = istats.GetLabels(ctx); labels == nil {
+labels = &istats.Labels{
+TelemetryLabels: make(map[string]string),
+}
+ctx = istats.SetLabels(ctx, labels)
+}
+ai := &attemptInfo{ // populates information about RPC start.
startTime: time.Now(),
+xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
}
ri := &rpcInfo{
-mi: mi,
+ai: ai,
}
return setRPCInfo(ctx, ri)
}
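
This labels plumbing is the crux of the client-side change: the first stats handler to tag an attempt installs one shared *istats.Labels in the context, and every other reader and writer for that attempt — including the xDS cluster_impl balancer that fills in telemetry labels — dereferences the same map. A conceptual sketch of that contract (istats is internal to grpc-go, so this is illustrative only, and the label key is hypothetical):

```go
package example

import (
	"context"

	istats "google.golang.org/grpc/internal/stats"
)

func tagAttempt(ctx context.Context) context.Context {
	// First stats handler on the channel: nothing installed yet, so it
	// creates the shared map exactly as TagRPC does above.
	if istats.GetLabels(ctx) == nil {
		ctx = istats.SetLabels(ctx, &istats.Labels{
			TelemetryLabels: map[string]string{},
		})
	}
	// A single writer (conceptually the cluster_impl balancer) fills the
	// shared map once.
	istats.GetLabels(ctx).TelemetryLabels["grpc.lb.locality"] = "region-a"
	// Any other handler tagging the same attempt sees the same map; no
	// per-handler copies, no double writes.
	return ctx
}
```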
@@ -138,10 +172,10 @@ func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats)
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
-csh.processRPCEvent(ctx, rs, ri.mi)
+csh.processRPCEvent(ctx, rs, ri.ai)
}

-func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
+func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) {
case *stats.Begin:
ci := getCallInfo(ctx)
@@ -150,34 +184,64 @@ func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCS
return
}

-csh.clientMetrics.attemptStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target)))
+csh.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
case *stats.OutPayload:
-atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
-atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength))
+case *stats.InHeader:
+csh.setLabelsFromPluginOption(ai, st.Header)
+case *stats.InTrailer:
+csh.setLabelsFromPluginOption(ai, st.Trailer)
case *stats.End:
-csh.processRPCEnd(ctx, mi, st)
+csh.processRPCEnd(ctx, ai, st)
default:
}
}

-func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
+func (csh *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
+if ai.pluginOptionLabels == nil && csh.o.MetricsOptions.pluginOption != nil {
+labels := csh.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
+if labels == nil {
+labels = map[string]string{} // GetLabels shouldn't return a nil map; make it empty so later calls for this attempt are skipped.
+}
+ai.pluginOptionLabels = labels
+}
+}
+
+func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
ci := getCallInfo(ctx)
if ci == nil {
logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
return
}
-latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+latency := float64(time.Since(ai.startTime)) / float64(time.Second)
st := "OK"
if e.Error != nil {
s, _ := status.FromError(e.Error)
st = canonicalString(s.Code())
}

-clientAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", st))
+attributes := []otelattribute.KeyValue{
+otelattribute.String("grpc.method", ci.method),
+otelattribute.String("grpc.target", ci.target),
+otelattribute.String("grpc.status", st),
+}
+
+for k, v := range ai.pluginOptionLabels {
+attributes = append(attributes, otelattribute.String(k, v))
+}
+
+for _, o := range csh.o.MetricsOptions.OptionalLabels {
+if val, ok := ai.xdsLabels[o]; ok {
+attributes = append(attributes, otelattribute.String(o, val))
+}
+}
+
+clientAttributeOption := otelmetric.WithAttributes(attributes...)
csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
-csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
-csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)
+csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), clientAttributeOption)
+csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), clientAttributeOption)
}
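
The net effect: attempt metrics now carry the three base attributes, plus whatever labels the plugin option decoded from headers or trailers, plus only those xDS telemetry labels whose keys the user allow-listed in OptionalLabels. The merge, restated as a standalone helper (the function name is ours, not the commit's):

```go
package example

import (
	otelattribute "go.opentelemetry.io/otel/attribute"
	otelmetric "go.opentelemetry.io/otel/metric"
)

// buildAttemptAttributes mirrors processRPCEnd: base attributes first, then
// all plugin-option labels, then only the allow-listed xDS labels.
func buildAttemptAttributes(method, target, status string, pluginLabels, xdsLabels map[string]string, optionalLabels []string) otelmetric.MeasurementOption {
	attrs := []otelattribute.KeyValue{
		otelattribute.String("grpc.method", method),
		otelattribute.String("grpc.target", target),
		otelattribute.String("grpc.status", status),
	}
	for k, v := range pluginLabels {
		attrs = append(attrs, otelattribute.String(k, v))
	}
	for _, k := range optionalLabels {
		if v, ok := xdsLabels[k]; ok {
			attrs = append(attrs, otelattribute.String(k, v))
		}
	}
	return otelmetric.WithAttributes(attrs...)
}
```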

const (
17 changes: 14 additions & 3 deletions stats/opentelemetry/opentelemetry.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
+otelinternal "google.golang.org/grpc/stats/opentelemetry/internal"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
@@ -126,6 +127,13 @@ type MetricsOptions struct {
// grpc.StaticMethodCallOption as a call option into Invoke or NewStream.
// This only applies for server side metrics.
MethodAttributeFilter func(string) bool

+// OptionalLabels are labels received from LB policies that this component
+// should add to metrics recorded after receiving incoming metadata.
+OptionalLabels []string

+// pluginOption is used to get labels to attach to certain metrics, if set.
+pluginOption otelinternal.PluginOption
}
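
For callers, the only new public surface is OptionalLabels. A usage sketch for a client that wants the xDS locality label on its attempt metrics (the label key and the bare MeterProvider are illustrative; pluginOption stays unexported and is wired up by the CSM plugin, not by users):

```go
package example

import (
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/stats/opentelemetry"
)

func dial(target string) (*grpc.ClientConn, error) {
	// Real code would configure an exporter/reader on the provider.
	provider := sdkmetric.NewMeterProvider()
	do := opentelemetry.DialOption(opentelemetry.Options{
		MetricsOptions: opentelemetry.MetricsOptions{
			MeterProvider:  provider,
			OptionalLabels: []string{"grpc.lb.locality"}, // key illustrative
		},
	})
	return grpc.NewClient(target, do, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
```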

// DialOption returns a dial option which enables OpenTelemetry instrumentation
@@ -187,7 +195,7 @@ func getCallInfo(ctx context.Context) *callInfo {
// rpcInfo is RPC information scoped to the RPC attempt life span client side,
// and the RPC life span server side.
type rpcInfo struct {
-mi *metricsInfo
+ai *attemptInfo
}

type rpcInfoKey struct{}
@@ -207,9 +215,9 @@ func removeLeadingSlash(mn string) string {
return strings.TrimLeft(mn, "/")
}

-// metricsInfo is RPC information scoped to the RPC attempt life span client
+// attemptInfo is RPC information scoped to the RPC attempt life span client
// side, and the RPC life span server side.
-type metricsInfo struct {
+type attemptInfo struct {
// access these counts atomically for hedging in the future:
// number of bytes after compression (within each message) from side (client
// || server).
@@ -220,6 +228,9 @@ type metricsInfo struct {

startTime time.Time
method string

+pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted
+xdsLabels map[string]string
}

type clientMetrics struct {
26 changes: 13 additions & 13 deletions stats/opentelemetry/server_metrics.go
@@ -82,12 +82,12 @@ func (ssh *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
}
}

-mi := &metricsInfo{
+ai := &attemptInfo{
startTime: time.Now(),
method: removeLeadingSlash(method),
}
ri := &rpcInfo{
-mi: mi,
+ai: ai,
}
return setRPCInfo(ctx, ri)
}
@@ -99,35 +99,35 @@ func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats)
logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
return
}
-ssh.processRPCData(ctx, rs, ri.mi)
+ssh.processRPCData(ctx, rs, ri.ai)
}

-func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
+func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) {
case *stats.InHeader:
-ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", mi.method)))
+ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ai.method)))
case *stats.OutPayload:
-atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
-atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength))
case *stats.End:
-ssh.processRPCEnd(ctx, mi, st)
+ssh.processRPCEnd(ctx, ai, st)
default:
}
}

-func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
-latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
+latency := float64(time.Since(ai.startTime)) / float64(time.Second)
st := "OK"
if e.Error != nil {
s, _ := status.FromError(e.Error)
st = canonicalString(s.Code())
}
-serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", mi.method), attribute.String("grpc.status", st))
+serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ai.method), attribute.String("grpc.status", st))

ssh.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption)
-ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), serverAttributeOption)
-ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), serverAttributeOption)
+ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), serverAttributeOption)
+ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), serverAttributeOption)
}

const (
