From f8d04ccb4528da8aad8b8d1e0af82b727bc5aa31 Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Mon, 10 Jun 2024 14:25:32 -0400 Subject: [PATCH] stats/opentelemetry: Add e2e testing for CSM Observability (#7279) --- stats/opentelemetry/client_metrics.go | 15 +- stats/opentelemetry/csm/observability_test.go | 611 ++++++++++++++ stats/opentelemetry/e2e_test.go | 426 ++-------- stats/opentelemetry/example_test.go | 5 - .../internal/testutils/testutils.go | 796 ++++++++++++++++++ stats/opentelemetry/opentelemetry.go | 6 - 6 files changed, 1475 insertions(+), 384 deletions(-) create mode 100644 stats/opentelemetry/csm/observability_test.go create mode 100644 stats/opentelemetry/internal/testutils/testutils.go diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index cbc4e36fdcbe..8654132153fd 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -63,7 +63,7 @@ func (h *clientStatsHandler) initializeMetrics() { func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ci := &callInfo{ - target: h.determineTarget(cc), + target: cc.CanonicalTarget(), method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) @@ -83,17 +83,6 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string return err } -// determineTarget determines the target to record attributes with. This will be -// "other" if target filter is set and specifies, the target name as is -// otherwise. -func (h *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string { - target := cc.CanonicalTarget() - if f := h.options.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) { - target = "other" - } - return target -} - // determineMethod determines the method to record attributes with. 
This will be // "other" if StaticMethod isn't specified or if method filter is set and // specifies, the method name as is otherwise. @@ -108,7 +97,7 @@ func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOpt func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { ci := &callInfo{ - target: h.determineTarget(cc), + target: cc.CanonicalTarget(), method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go new file mode 100644 index 000000000000..efa97199f530 --- /dev/null +++ b/stats/opentelemetry/csm/observability_test.go @@ -0,0 +1,611 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package csm + +import ( + "context" + "errors" + "io" + "os" + "testing" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + istats "google.golang.org/grpc/internal/stats" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/xds/bootstrap" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats/opentelemetry" + "google.golang.org/grpc/stats/opentelemetry/internal/testutils" +) + +// setupEnv configures the environment for CSM Observability Testing. It sets +// the bootstrap env var to a bootstrap file with a nodeID provided. It sets CSM +// Env Vars as well, and mocks the resource detector's returned attribute set to +// simulate the environment. It registers a cleanup function on the provided t +// to restore the environment to its original state. +func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID, csmCanonicalServiceName, csmWorkloadName string) { + cleanup, err := bootstrap.CreateFile(bootstrap.Options{ + NodeID: nodeID, + ServerURI: "xds_server_uri", + }) + if err != nil { + t.Fatalf("Failed to create bootstrap: %v", err) + } + oldCSMCanonicalServiceName, csmCanonicalServiceNamePresent := os.LookupEnv("CSM_CANONICAL_SERVICE_NAME") + oldCSMWorkloadName, csmWorkloadNamePresent := os.LookupEnv("CSM_WORKLOAD_NAME") + os.Setenv("CSM_CANONICAL_SERVICE_NAME", csmCanonicalServiceName) + os.Setenv("CSM_WORKLOAD_NAME", csmWorkloadName) + + var attributes []attribute.KeyValue + for k, v := range resourceDetectorEmissions { + attributes = append(attributes, attribute.String(k, v)) + } + // Return the attributes configured as part of the test in place + // of reading from resource.
+ attrSet := attribute.NewSet(attributes...) + origGetAttrSet := getAttrSetFromResourceDetector + getAttrSetFromResourceDetector = func(context.Context) *attribute.Set { + return &attrSet + } + t.Cleanup(func() { + cleanup() + if csmCanonicalServiceNamePresent { + os.Setenv("CSM_CANONICAL_SERVICE_NAME", oldCSMCanonicalServiceName) + } else { + os.Unsetenv("CSM_CANONICAL_SERVICE_NAME") + } + if csmWorkloadNamePresent { + os.Setenv("CSM_WORKLOAD_NAME", oldCSMWorkloadName) + } else { + os.Unsetenv("CSM_WORKLOAD_NAME") + } + + getAttrSetFromResourceDetector = origGetAttrSet + }) +} + +// TestCSMPluginOptionUnary tests the CSM Plugin Option and labels. It +// configures the environment for the CSM Plugin Option to read from. It then +// configures a system with a gRPC Client and gRPC server with the OpenTelemetry +// Dial and Server Option configured with a CSM Plugin Option with a certain +// unary handler set to induce different ways of setting metadata exchange +// labels, and makes a Unary RPC. This RPC should cause certain recording for +// each registered metric observed through a Manual Metrics Reader on the +// provided OpenTelemetry SDK's Meter Provider. The CSM Labels emitted from the +// plugin option should be attached to the relevant metrics. 
+func (s) TestCSMPluginOptionUnary(t *testing.T) { + resourceDetectorEmissions := map[string]string{ + "cloud.platform": "gcp_kubernetes_engine", + "cloud.region": "cloud_region_val", // availability_zone isn't present, so this should become location + "cloud.account.id": "cloud_account_id_val", + "k8s.namespace.name": "k8s_namespace_name_val", + "k8s.cluster.name": "k8s_cluster_name_val", + } + const nodeID = "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" + const csmCanonicalServiceName = "csm_canonical_service_name" + const csmWorkloadName = "csm_workload_name" + setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) + + attributesWant := map[string]string{ + "csm.workload_canonical_service": csmCanonicalServiceName, // from env + "csm.mesh_id": "mesh_id", // from bootstrap env var + + // No xDS Labels - this happens in a test below. + + "csm.remote_workload_type": "gcp_kubernetes_engine", + "csm.remote_workload_canonical_service": csmCanonicalServiceName, + "csm.remote_workload_project_id": "cloud_account_id_val", + "csm.remote_workload_cluster_name": "k8s_cluster_name_val", + "csm.remote_workload_namespace_name": "k8s_namespace_name_val", + "csm.remote_workload_location": "cloud_region_val", + "csm.remote_workload_name": csmWorkloadName, + } + + var csmLabels []attribute.KeyValue + for k, v := range attributesWant { + csmLabels = append(csmLabels, attribute.String(k, v)) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + tests := []struct { + name string + // To test the different operations for Unary RPC's from the interceptor + // level that can plumb metadata exchange header in. 
+ unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + opts testutils.MetricDataOptions + }{ + { + name: "normal-flow", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, len(in.GetPayload().GetBody())), + }}, nil + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), + }, + }, + { + name: "trailers-only", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return nil, errors.New("some error") // return an error and no message - this triggers trailers only - no messages or headers sent + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCallFailed: true, + }, + }, + { + name: "set-header", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + grpc.SetHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, len(in.GetPayload().GetBody())), + }}, nil + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), + }, + }, + { + name: "send-header", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + grpc.SendHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, len(in.GetPayload().GetBody())), + }}, nil + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), + }, + }, + { + name: "send-msg", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + 
Body: make([]byte, len(in.GetPayload().GetBody())), + }}, nil + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + ss := &stubserver.StubServer{UnaryCallF: test.unaryCallFunc} + po := newPluginOption(ctx) + sopts := []grpc.ServerOption{ + serverOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + }}, po), + } + dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, + }, + }, po)} + if err := ss.Start(sopts, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + var request *testpb.SimpleRequest + if test.opts.UnaryCompressedMessageSize != 0 { + request = &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }} + } + // Make a Unary RPC. These should cause certain metrics to be + // emitted, which should be able to be observed through the Metric + // Reader. + ss.Client.UnaryCall(ctx, request, grpc.UseCompressor(gzip.Name)) + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + opts := test.opts + opts.Target = ss.Target + wantMetrics := testutils.MetricDataUnary(opts) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + }) + } +} + +// TestCSMPluginOptionStreaming tests the CSM Plugin Option and labels. 
It +// configures the environment for the CSM Plugin Option to read from. It then +// configures a system with a gRPC Client and gRPC server with the OpenTelemetry +// Dial and Server Option configured with a CSM Plugin Option with a certain +// streaming handler set to induce different ways of setting metadata exchange +// labels, and makes a Streaming RPC. This RPC should cause certain recording +// for each registered metric observed through a Manual Metrics Reader on the +// provided OpenTelemetry SDK's Meter Provider. The CSM Labels emitted from the +// plugin option should be attached to the relevant metrics. +func (s) TestCSMPluginOptionStreaming(t *testing.T) { + resourceDetectorEmissions := map[string]string{ + "cloud.platform": "gcp_kubernetes_engine", + "cloud.region": "cloud_region_val", // availability_zone isn't present, so this should become location + "cloud.account.id": "cloud_account_id_val", + "k8s.namespace.name": "k8s_namespace_name_val", + "k8s.cluster.name": "k8s_cluster_name_val", + } + const nodeID = "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" + const csmCanonicalServiceName = "csm_canonical_service_name" + const csmWorkloadName = "csm_workload_name" + setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) + + attributesWant := map[string]string{ + "csm.workload_canonical_service": csmCanonicalServiceName, // from env + "csm.mesh_id": "mesh_id", // from bootstrap env var + + // No xDS Labels - this happens in a test below. 
+ + "csm.remote_workload_type": "gcp_kubernetes_engine", + "csm.remote_workload_canonical_service": csmCanonicalServiceName, + "csm.remote_workload_project_id": "cloud_account_id_val", + "csm.remote_workload_cluster_name": "k8s_cluster_name_val", + "csm.remote_workload_namespace_name": "k8s_namespace_name_val", + "csm.remote_workload_location": "cloud_region_val", + "csm.remote_workload_name": csmWorkloadName, + } + + var csmLabels []attribute.KeyValue + for k, v := range attributesWant { + csmLabels = append(csmLabels, attribute.String(k, v)) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + tests := []struct { + name string + // To test the different operations for Streaming RPC's from the + // interceptor level that can plumb metadata exchange header in. + streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error + opts testutils.MetricDataOptions + }{ + { + name: "trailers-only", + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { + for { + if _, err := stream.Recv(); err == io.EOF { + return nil + } + } + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + }, + }, + { + name: "set-header", + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { + stream.SetHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + for { + if _, err := stream.Recv(); err == io.EOF { + return nil + } + } + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + }, + }, + { + name: "send-header", + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { + stream.SendHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + for { + if _, err := stream.Recv(); err == io.EOF { + return nil + } + } + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + }, + }, + { + name: "send-msg", + streamingCallFunc: func(stream 
testgrpc.TestService_FullDuplexCallServer) error { + stream.Send(&testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}) + for { + if _, err := stream.Recv(); err == io.EOF { + return nil + } + } + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + StreamingCompressedMessageSize: float64(57), + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + ss := &stubserver.StubServer{FullDuplexCallF: test.streamingCallFunc} + po := newPluginOption(ctx) + sopts := []grpc.ServerOption{ + serverOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + }}, po), + } + dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, + }, + }, po)} + if err := ss.Start(sopts, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + stream, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name)) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + if test.opts.StreamingCompressedMessageSize != 0 { + if err := stream.Send(&testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("stream.Send failed") + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("stream.Recv failed with error: %v", err) + } + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } + + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + + 
 gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + opts := test.opts + opts.Target = ss.Target + wantMetrics := testutils.MetricDataStreaming(opts) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + }) + } +} + +func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ctx = istats.SetLabels(ctx, &istats.Labels{ + TelemetryLabels: map[string]string{ + // mock what the cluster impl would write here ("csm." xDS Labels) + "csm.service_name": "service_name_val", + "csm.service_namespace_name": "service_namespace_val", + }, + }) + + // TagRPC will just see this in the context and set its xDS Labels to point + // to this map on the heap. + return invoker(ctx, method, req, reply, cc, opts...) +} + +// TestXDSLabels tests that xDS Labels get emitted from OpenTelemetry metrics. +// This test configures OpenTelemetry with the CSM Plugin Option, and xDS +// Optional Labels turned on. It then configures an interceptor to attach +// labels, representing the cluster_impl picker. It then makes a unary RPC, and +// expects xDS labels to be attached to emitted relevant metrics. Full +// xDS System alongside OpenTelemetry will be tested with interop. (there is +// a test for xDS -> Stats handler and this tests -> OTel -> emission).
+func (s) TestXDSLabels(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, len(in.GetPayload().GetBody())), + }}, nil + }, + } + + po := newPluginOption(ctx) + dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, + }, + }, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)} + if err := ss.Start(nil, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + + defer ss.Stop() + ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, grpc.UseCompressor(gzip.Name)) + + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") + targetAttr := attribute.String("grpc.target", ss.Target) + unaryStatusAttr := attribute.String("grpc.status", "OK") + + serviceNameAttr := attribute.String("csm.service_name", "service_name_val") + serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val") + meshIDAttr := attribute.String("csm.mesh_id", "unknown") + workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown") + remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", 
"unknown") + remoteWorkloadCanonicalServiceAttr := attribute.String("csm.remote_workload_canonical_service", "unknown") + + unaryMethodClientSideEnd := []attribute.KeyValue{ + unaryMethodAttr, + targetAttr, + unaryStatusAttr, + serviceNameAttr, + serviceNamespaceAttr, + meshIDAttr, + workloadCanonicalServiceAttr, + remoteWorkloadTypeAttr, + remoteWorkloadCanonicalServiceAttr, + } + + unaryCompressedBytesSentRecv := int64(57) // Fixed 10000 bytes with gzip assumption. + unaryBucketCounts := []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} + unaryExtrema := metricdata.NewExtrema(int64(57)) + wantMetrics := []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, // Doesn't have xDS Labels, CSM Labels start from header or trailer from server, whichever comes first, so this doesn't need it + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: testutils.DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: testutils.DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + 
Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: testutils.DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, unaryStatusAttr), + Count: 1, + Bounds: testutils.DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } + + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) +} + +// TestObservability tests that Observability global function compiles and runs +// without error. The actual functionality of this function will be verified in +// interop tests. +func (s) TestObservability(t *testing.T) { + cleanup := EnableObservability(context.Background(), opentelemetry.Options{}) + cleanup() +} diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index c0850d6219b7..aa4734741dd8 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -14,11 +14,10 @@ * limitations under the License. 
*/ -package opentelemetry +package opentelemetry_test import ( "context" - "fmt" "io" "testing" "time" @@ -29,6 +28,8 @@ import ( "google.golang.org/grpc/internal/stubserver" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/stats/opentelemetry" + "google.golang.org/grpc/stats/opentelemetry/internal/testutils" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" @@ -46,35 +47,10 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// waitForServerCompletedRPCs waits until the unary and streaming stats.End -// calls are finished processing. It does this by waiting for the expected -// metric triggered by stats.End to appear through the passed in metrics reader. -func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics, t *testing.T) (map[string]metricdata.Metrics, error) { - for ; ctx.Err() == nil; <-time.After(time.Millisecond) { - rm := &metricdata.ResourceMetrics{} - reader.Collect(ctx, rm) - gotMetrics := map[string]metricdata.Metrics{} - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - gotMetrics[m.Name] = m - } - } - val, ok := gotMetrics[wantMetric.Name] - if !ok { - continue - } - if !metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - continue - } - return gotMetrics, nil - } - return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) -} - // setup creates a stub server with OpenTelemetry component configured on client // and server side. It returns a reader for metrics emitted from OpenTelemetry // component and the server. 
-func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { +func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { reader := metric.NewManualReader() provider := metric.NewMeterProvider( metric.WithReader(reader), @@ -82,7 +58,7 @@ func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReade ss := &stubserver.StubServer{ UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), + Body: make([]byte, len(in.GetPayload().GetBody())), }}, nil }, FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { @@ -94,24 +70,16 @@ func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReade } }, } - var taf func(string) bool - if tafOn { - taf = func(str string) bool { - return str != ss.Target - } - } - if err := ss.Start([]grpc.ServerOption{ServerOption(Options{ - MetricsOptions: MetricsOptions{ - MeterProvider: provider, - Metrics: DefaultMetrics, - TargetAttributeFilter: taf, - MethodAttributeFilter: maf, - }})}, DialOption(Options{ - MetricsOptions: MetricsOptions{ + + if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: DefaultMetrics, - TargetAttributeFilter: taf, - MethodAttributeFilter: maf, + Metrics: opentelemetry.DefaultMetrics, + MethodAttributeFilter: methodAttributeFilter, + }})}, opentelemetry.DialOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, }, })); err != nil { t.Fatalf("Error starting endpoint server: %v", err) @@ -119,20 +87,19 @@ func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReade return reader, ss } -// 
TestMethodTargetAttributeFilter tests the method and target attribute filter. -// The method and target filter set should bucket the grpc.method/grpc.target -// attribute into "other" if filter specifies. -func (s) TestMethodTargetAttributeFilter(t *testing.T) { +// TestMethodAttributeFilter tests the method attribute filter. The method +// filter set should bucket the grpc.method attribute into "other" if the method +// attribute filter specifies. +func (s) TestMethodAttributeFilter(t *testing.T) { maf := func(str string) bool { // Will allow duplex/any other type of RPC. - return str != "/grpc.testing.TestService/UnaryCall" + return str != testpb.TestService_UnaryCall_FullMethodName } - // pull out setup into a helper - reader, ss := setup(t, true, maf) + reader, ss := setup(t, maf) defer ss.Stop() - // make a single RPC (unary rpc), and filter out the target and method - // that would correspond. + // Make a Unary and Streaming RPC. The Unary RPC should be filtered by the + // method attribute filter, and the Full Duplex (Streaming) RPC should not. 
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ @@ -147,10 +114,16 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { stream.CloseSend() if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } rm := &metricdata.ResourceMetrics{} reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } wantMetrics := []metricdata.Metrics{ { @@ -160,11 +133,11 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", "other")), + Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", ss.Target)), Value: 1, }, { - Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", "other")), + Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", ss.Target)), Value: 1, }, }, @@ -172,54 +145,29 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { IsMonotonic: true, }, }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { // Method should go to "other" due to the method attribute filter. 
+ Attributes: attribute.NewSet(attribute.String("grpc.method", "other"), attribute.String("grpc.status", "OK")), + Count: 1, + Bounds: testutils.DefaultLatencyBounds, + }, + { + Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.status", "OK")), + Count: 1, + Bounds: testutils.DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, } - gotMetrics := map[string]metricdata.Metrics{} - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - gotMetrics[m.Name] = m - } - } - - for _, metric := range wantMetrics { - val, ok := gotMetrics[metric.Name] - if !ok { - t.Fatalf("metric %v not present in recorded metrics", metric.Name) - } - if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) - } - } -} -// assertDataPointWithinFiveSeconds asserts the metric passed in contains -// a histogram with dataPoints that fall within buckets that are <=5. 
-func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error { - histo, ok := metric.Data.(metricdata.Histogram[float64]) - if !ok { - return fmt.Errorf("metric data is not histogram") - } - for _, dataPoint := range histo.DataPoints { - var boundWithFive int - for i, bucket := range dataPoint.Bounds { - if bucket >= 5 { - boundWithFive = i - } - } - foundPoint := false - for i, bucket := range dataPoint.BucketCounts { - if i >= boundWithFive { - return fmt.Errorf("data point not found in bucket <=5 seconds") - } - if bucket == 1 { - foundPoint = true - break - } - } - if !foundPoint { - return fmt.Errorf("no data point found for metric") - } - } - return nil + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) } // TestAllMetricsOneFunction tests emitted metrics from OpenTelemetry @@ -232,7 +180,7 @@ func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error { // on the Client (no StaticMethodCallOption set) and Server. The method // attribute on subsequent metrics should be bucketed in "other". 
func (s) TestAllMetricsOneFunction(t *testing.T) { - reader, ss := setup(t, false, nil) + reader, ss := setup(t, nil) defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -251,7 +199,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { stream.CloseSend() if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } rm := &metricdata.ResourceMetrics{} @@ -264,257 +212,11 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { } } - unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") - duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") - - targetAttr := attribute.String("grpc.target", ss.Target) - statusAttr := attribute.String("grpc.status", "OK") - - wantMetrics := []metricdata.Metrics{ - { - Name: "grpc.client.attempt.started", - Description: "Number of client call attempts started.", - Unit: "attempt", - Data: metricdata.Sum[int64]{ - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr), - Value: 1, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr), - Value: 1, - }, - }, - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - }, - }, - { - Name: "grpc.client.attempt.duration", - Description: "End-to-end time taken to complete a client call attempt.", - Unit: "s", - Data: metricdata.Histogram[float64]{ - DataPoints: []metricdata.HistogramDataPoint[float64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: 
"grpc.client.attempt.sent_total_compressed_message_size", - Description: "Compressed message bytes sent per client call attempt.", - Unit: "By", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.client.attempt.rcvd_total_compressed_message_size", - Description: "Compressed message bytes received per call attempt.", - Unit: "By", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.client.call.duration", - Description: "Time taken by gRPC to complete an RPC from application's 
perspective.", - Unit: "s", - Data: metricdata.Histogram[float64]{ - DataPoints: []metricdata.HistogramDataPoint[float64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.server.call.started", - Description: "Number of server calls started.", - Unit: "call", - Data: metricdata.Sum[int64]{ - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr), - Value: 1, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr), - Value: 1, - }, - }, - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - }, - }, - { - Name: "grpc.server.call.sent_total_compressed_message_size", - Unit: "By", - Description: "Compressed message bytes sent per server call.", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.server.call.rcvd_total_compressed_message_size", - Unit: "By", - Description: "Compressed message bytes received per server call.", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - 
{ - Attributes: attribute.NewSet(unaryMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.server.call.duration", - Description: "End-to-end time taken to complete a call from server transport's perspective.", - Unit: "s", - Data: metricdata.Histogram[float64]{ - DataPoints: []metricdata.HistogramDataPoint[float64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - } - - for _, metric := range wantMetrics { - if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" { - // Sync the metric reader to see the event because stats.End is - // handled async server side. Thus, poll until metrics created from - // stats.End show up. - if gotMetrics, err = waitForServerCompletedRPCs(ctx, reader, metric, t); err != nil { - t.Fatalf("error waiting for sent total compressed message size for metric: %v", metric.Name) - } - continue - } - - // If one of the duration metrics, ignore the bucket counts, and make - // sure it count falls within a bucket <= 5 seconds (maximum duration of - // test due to context). 
- val, ok := gotMetrics[metric.Name] - if !ok { - t.Fatalf("metric %v not present in recorded metrics", metric.Name) - } - if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" { - if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) - } - if err := assertDataPointWithinFiveSeconds(val); err != nil { - t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err) - } - continue - } - - if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) - } - } + wantMetrics := testutils.MetricData(testutils.MetricDataOptions{ + Target: ss.Target, + UnaryCompressedMessageSize: float64(57), + }) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) stream, err = ss.Client.FullDuplexCall(ctx) if err != nil { @@ -523,7 +225,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { stream.CloseSend() if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } // This Invoke doesn't pass the StaticMethodCallOption. Thus, the method // attribute should become "other" on client side metrics. 
Since it is also @@ -541,6 +243,10 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { gotMetrics[m.Name] = m } } + unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") + duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") + + targetAttr := attribute.String("grpc.target", ss.Target) otherMethodAttr := attribute.String("grpc.method", "other") wantMetrics = []metricdata.Metrics{ { @@ -593,10 +299,10 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { for _, metric := range wantMetrics { val, ok := gotMetrics[metric.Name] if !ok { - t.Fatalf("metric %v not present in recorded metrics", metric.Name) + t.Fatalf("Metric %v not present in recorded metrics", metric.Name) } if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) } } } diff --git a/stats/opentelemetry/example_test.go b/stats/opentelemetry/example_test.go index 607bcf55e0a0..b4d6755b2299 100644 --- a/stats/opentelemetry/example_test.go +++ b/stats/opentelemetry/example_test.go @@ -17,8 +17,6 @@ package opentelemetry_test import ( - "strings" - "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/stats/opentelemetry" @@ -55,9 +53,6 @@ func Example_dialOption() { MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics, // equivalent to unset - distinct from empty - TargetAttributeFilter: func(str string) bool { - return !strings.HasPrefix(str, "dns") // Filter out DNS targets. 
- }, }, } do := opentelemetry.DialOption(opts) diff --git a/stats/opentelemetry/internal/testutils/testutils.go b/stats/opentelemetry/internal/testutils/testutils.go new file mode 100644 index 000000000000..10856409eeaf --- /dev/null +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -0,0 +1,796 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package testutils contains helpers for OpenTelemetry tests. +package testutils + +import ( + "context" + "fmt" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +// Redefine default bounds here to avoid a cyclic dependency with top level +// opentelemetry package. Could define once through internal, but would make +// external opentelemetry godoc less readable. +var ( + // DefaultLatencyBounds are the default bounds for latency metrics. + DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} + // DefaultSizeBounds are the default bounds for metrics which record size. 
+ DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296} +) + +// waitForServerCompletedRPCs waits until the unary and streaming stats.End +// calls are finished processing. It does this by waiting for the expected +// metric triggered by stats.End to appear through the passed in metrics reader. +// +// Returns a new gotMetrics map containing the metric data being polled for, or +// an error if failed to wait for metric. +func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric.Reader, wantMetric metricdata.Metrics) (map[string]metricdata.Metrics, error) { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + val, ok := gotMetrics[wantMetric.Name] + if !ok { + continue + } + metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + return gotMetrics, nil + } + return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) +} + +// checkDataPointWithinFiveSeconds checks if the metric passed in contains a +// histogram with dataPoints that fall within buckets that are <=5. Returns an +// error if check fails. 
+func checkDataPointWithinFiveSeconds(metric metricdata.Metrics) error { + histo, ok := metric.Data.(metricdata.Histogram[float64]) + if !ok { + return fmt.Errorf("metric data is not histogram") + } + for _, dataPoint := range histo.DataPoints { + var boundWithFive int + for i, bound := range dataPoint.Bounds { + if bound >= 5 { + boundWithFive = i + } + } + foundPoint := false + for i, count := range dataPoint.BucketCounts { + if i >= boundWithFive { + return fmt.Errorf("data point not found in bucket <=5 seconds") + } + if count == 1 { + foundPoint = true + break + } + } + if !foundPoint { + return fmt.Errorf("no data point found for metric") + } + } + return nil +} + +// MetricDataOptions are the options used to configure the metricData emissions +// of expected metrics data from MetricData. +type MetricDataOptions struct { + // CSMLabels are the csm labels to attach to metrics which receive csm + // labels (all A66 except client call and started RPC's client and server + // side). + CSMLabels []attribute.KeyValue + // Target is the target of the client and server. + Target string + // UnaryCallFailed is whether the Unary Call failed, which would trigger + // trailers only. + UnaryCallFailed bool + // UnaryCompressedMessageSize is the compressed message size of the Unary + // RPC. This assumes both client and server sent the same message size. + UnaryCompressedMessageSize float64 + // StreamingCompressedMessageSize is the compressed message size of the + // Streaming RPC. This assumes both client and server sent the same message + // size. + StreamingCompressedMessageSize float64 +} + +// createBucketCounts creates a list of bucket counts based off the +// recordingPoints and bounds. Both recordingPoints and bounds are assumed to be +// in order. 
+func createBucketCounts(recordingPoints []float64, bounds []float64) []uint64 { + var bucketCounts []uint64 + var recordingPointIndex int + for _, bound := range bounds { + var bucketCount uint64 + if recordingPointIndex >= len(recordingPoints) { + bucketCounts = append(bucketCounts, bucketCount) + continue + } + for recordingPoints[recordingPointIndex] <= bound { + bucketCount += 1 + recordingPointIndex += 1 + if recordingPointIndex >= len(recordingPoints) { + break + } + } + bucketCounts = append(bucketCounts, bucketCount) + } + // The rest of the recording points are last bound -> infinity. + bucketCounts = append(bucketCounts, uint64(len(recordingPoints)-recordingPointIndex)) + return bucketCounts +} + +// MetricDataUnary returns a list of expected metrics defined in A66 for a +// client and server for one unary RPC. +func MetricDataUnary(options MetricDataOptions) []metricdata.Metrics { + methodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") + targetAttr := attribute.String("grpc.target", options.Target) + statusAttr := attribute.String("grpc.status", "OK") + if options.UnaryCallFailed { + statusAttr = attribute.String("grpc.status", "UNKNOWN") + } + clientSideEnd := []attribute.KeyValue{ + methodAttr, + targetAttr, + statusAttr, + } + serverSideEnd := []attribute.KeyValue{ + methodAttr, + statusAttr, + } + clientSideEnd = append(clientSideEnd, options.CSMLabels...) + serverSideEnd = append(serverSideEnd, options.CSMLabels...) 
+ compressedBytesSentRecv := int64(options.UnaryCompressedMessageSize) + bucketCounts := createBucketCounts([]float64{options.UnaryCompressedMessageSize}, DefaultSizeBounds) + extrema := metricdata.NewExtrema(int64(options.UnaryCompressedMessageSize)) + return []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: 
metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr, statusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.started", + Description: "Number of server calls started.", + Unit: "call", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.server.call.sent_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes sent per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.rcvd_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes received per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: 
[]metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } +} + +// MetricDataStreaming returns a list of expected metrics defined in A66 for a +// client and server for one streaming RPC. +func MetricDataStreaming(options MetricDataOptions) []metricdata.Metrics { + methodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") + targetAttr := attribute.String("grpc.target", options.Target) + statusAttr := attribute.String("grpc.status", "OK") + clientSideEnd := []attribute.KeyValue{ + methodAttr, + targetAttr, + statusAttr, + } + serverSideEnd := []attribute.KeyValue{ + methodAttr, + statusAttr, + } + clientSideEnd = append(clientSideEnd, options.CSMLabels...) + serverSideEnd = append(serverSideEnd, options.CSMLabels...) + compressedBytesSentRecv := int64(options.StreamingCompressedMessageSize) + bucketCounts := createBucketCounts([]float64{options.StreamingCompressedMessageSize}, DefaultSizeBounds) + extrema := metricdata.NewExtrema(int64(options.StreamingCompressedMessageSize)) + return []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: 
"grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr, statusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.started", + Description: "Number of server calls started.", + Unit: "call", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.server.call.sent_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes sent per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + 
Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.rcvd_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes received per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } +} + +// MetricData returns a metricsDataSlice for A66 metrics for client and server +// with a unary RPC and streaming RPC with certain compression and message flow +// sent. If CSMLabels is set, those labels are attached to the corresponding +// metrics (all except client side call metrics, and started metrics on client +// and server side). 
+func MetricData(options MetricDataOptions) []metricdata.Metrics { + unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") + duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") + targetAttr := attribute.String("grpc.target", options.Target) + unaryStatusAttr := attribute.String("grpc.status", "OK") + streamingStatusAttr := attribute.String("grpc.status", "OK") + if options.UnaryCallFailed { + unaryStatusAttr = attribute.String("grpc.status", "UNKNOWN") + } + unaryMethodClientSideEnd := []attribute.KeyValue{ + unaryMethodAttr, + targetAttr, + unaryStatusAttr, + } + streamingMethodClientSideEnd := []attribute.KeyValue{ + duplexMethodAttr, + targetAttr, + streamingStatusAttr, + } + unaryMethodServerSideEnd := []attribute.KeyValue{ + unaryMethodAttr, + unaryStatusAttr, + } + streamingMethodServerSideEnd := []attribute.KeyValue{ + duplexMethodAttr, + streamingStatusAttr, + } + + unaryMethodClientSideEnd = append(unaryMethodClientSideEnd, options.CSMLabels...) + streamingMethodClientSideEnd = append(streamingMethodClientSideEnd, options.CSMLabels...) + unaryMethodServerSideEnd = append(unaryMethodServerSideEnd, options.CSMLabels...) + streamingMethodServerSideEnd = append(streamingMethodServerSideEnd, options.CSMLabels...) 
+ unaryCompressedBytesSentRecv := int64(options.UnaryCompressedMessageSize) + unaryBucketCounts := createBucketCounts([]float64{options.UnaryCompressedMessageSize}, DefaultSizeBounds) + unaryExtrema := metricdata.NewExtrema(int64(options.UnaryCompressedMessageSize)) + + streamingCompressedBytesSentRecv := int64(options.StreamingCompressedMessageSize) + streamingBucketCounts := createBucketCounts([]float64{options.StreamingCompressedMessageSize}, DefaultSizeBounds) + streamingExtrema := metricdata.NewExtrema(int64(options.StreamingCompressedMessageSize)) + + return []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + { + Attributes: attribute.NewSet(streamingMethodClientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: 
unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodClientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodClientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, unaryStatusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, streamingStatusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.started", + Description: "Number of server calls started.", + Unit: "call", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr), + 
Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.server.call.sent_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes sent per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.rcvd_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes received per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodServerSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, 
+ }, + { + Attributes: attribute.NewSet(streamingMethodServerSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } +} + +// CompareMetrics asserts wantMetrics are what we expect. It polls for eventual +// server metrics (not emitted synchronously with client side rpc returning), +// and for duration metrics makes sure the data point is within possible testing +// time (five seconds from context timeout). +func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) { + for _, metric := range wantMetrics { + if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" { + // Sync the metric reader to see the event because stats.End is + // handled async server side. Thus, poll until metrics created from + // stats.End show up. + var err error + if gotMetrics, err = waitForServerCompletedRPCs(ctx, t, mr, metric); err != nil { // move to shared helper + t.Fatal(err) + } + continue + } + + // If one of the duration metrics, ignore the bucket counts, and make + // sure its count falls within a bucket <= 5 seconds (maximum duration of + // test due to context). 
+ val, ok := gotMetrics[metric.Name] + if !ok { + t.Fatalf("Metric %v not present in recorded metrics", metric.Name) + } + if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" { + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) + } + if err := checkDataPointWithinFiveSeconds(val); err != nil { + t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err) + } + continue + } + + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) + } + } +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index 6a6bcd1627da..4bc195b4e12c 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -120,12 +120,6 @@ type MetricsOptions struct { // will be recorded. Metrics *Metrics - // TargetAttributeFilter is a callback that takes the target string of the - // channel and returns a bool representing whether to use target as a label - // value or use the string "other". If unset, will use the target string as - // is. This only applies for client side metrics. - TargetAttributeFilter func(string) bool - // MethodAttributeFilter is to record the method name of RPCs handled by // grpc.UnknownServiceHandler, but take care to limit the values allowed, as // allowing too many will increase cardinality and could cause severe memory