Skip to content

Commit

Permalink
Responded to Doug's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Jul 24, 2024
1 parent 5d559bb commit d508527
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
6 changes: 3 additions & 3 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
var labels *istats.Labels
if labels = istats.GetLabels(ctx); labels == nil {
labels = &istats.Labels{
// The defaults for all the per call optional labels that this
// OpenTelemetry component currently supports.
// The defaults for all the per call label from a plugin that
// executes on callpath that this OpenTelemetry component currently
// supports.
TelemetryLabels: map[string]string{
"grpc.lb.locality": "",
},
Expand Down Expand Up @@ -240,7 +241,6 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
// CSM Plugin Option layer by adding an optional labels API.
if val, ok := ai.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
continue
}
}

Expand Down
20 changes: 10 additions & 10 deletions test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/stats"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/types/known/structpb"
)

Expand All @@ -44,7 +45,7 @@ const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

const localityKey = "grpc.lb.locality"
const localityValue = "{\"region\":\"region-1\",\"zone\":\"zone-1\",\"subZone\":\"subzone-1\"}"
const localityValue = `{"region":"region-1","zone":"zone-1","subZone":"subzone-1"}`

// TestTelemetryLabels tests that telemetry labels from CDS make their way to
// the stats handler. The stats handler sets the mutable context value that the
Expand Down Expand Up @@ -129,14 +130,13 @@ func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
// aren't started. All of these should have access to the desired telemetry
// labels.
case *stats.OutPayload, *stats.InPayload, *stats.End:
if label, ok := fsh.labels.TelemetryLabels[serviceNameKeyCSM]; !ok || label != serviceNameValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKeyCSM, serviceNameValue, label)
}
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKeyCSM]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKeyCSM, serviceNamespaceValue, label)
want := map[string]string{
serviceNameKeyCSM: serviceNameValue,
serviceNamespaceKeyCSM: serviceNamespaceValue,
localityKey: localityValue,
}
if label, ok := fsh.labels.TelemetryLabels[localityKey]; !ok || label != localityValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", localityKey, localityValue, label)
if diff := cmp.Diff(fsh.labels.TelemetryLabels, want); diff != "" {
fsh.t.Fatalf("fsh.labels.TelemetryLabels (-got +want): %v", diff)
}
default:
// Nothing to assert for the other stats.Handler callouts.
Expand Down
27 changes: 18 additions & 9 deletions xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package clusterimpl

import (
"context"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -96,14 +98,23 @@ func (b *clusterImplBalancer) newPicker(config *dropConfigs) *picker {
}
}

func telemetryLabels(ctx context.Context) map[string]string {
if ctx == nil {
return nil
}
labels := stats.GetLabels(ctx)
if labels == nil {
return nil
}
return labels.TelemetryLabels
}

func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// Unconditionally set labels if present, even dropped or queued RPC's can
// use these labels.
if info.Ctx != nil {
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
for key, value := range d.telemetryLabels {
labels.TelemetryLabels[key] = value
}
if labels := telemetryLabels(info.Ctx); labels != nil {
for key, value := range d.telemetryLabels {
labels[key] = value
}
}

Expand Down Expand Up @@ -156,10 +167,8 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return pr, err
}

if info.Ctx != nil {
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
labels.TelemetryLabels["grpc.lb.locality"] = lIDStr
}
if labels := telemetryLabels(info.Ctx); labels != nil {
labels["grpc.lb.locality"] = lIDStr
}

if d.loadStore != nil {
Expand Down

0 comments on commit d508527

Please sign in to comment.