Skip to content

Commit

Permalink
Move Aggregation/Temporality selection to the Exporter interface (ope…
Browse files Browse the repository at this point in the history
…n-telemetry#3260)

* Add Aggregation/Temporality to Exporter iface

* Use Exporter selectors in periodic reader

* Move selector opts to just manual reader

* Simplify periodic reader ref to Exporter selectors

* Fix the periodic reader tests

* Add Aggregation/Temporality method to stdoutmetric

* Add Temporality/Aggregation to otlpmetric exp

* Add Temporality/Aggregation to http/grpc otlp clients

* Add oconf tests for selector opts

* Add tests to stdoutmetric for opts

* Correct comment subject

* Add changes to changelog

* Fix otest test client
  • Loading branch information
MrAlias authored Nov 1, 2022
1 parent 8a390ac commit 48a0547
Show file tree
Hide file tree
Showing 19 changed files with 436 additions and 106 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`.
Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option.
The views registered with the `MeterProvider` apply to all `Reader`s. (#3387)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/sdk/metric".Exporter` interface. (#3260)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric".Client` interface. (#3260)
- The `WithTemporalitySelector` and `WithAggregationSelector` `ReaderOption`s have been changed to `ManualReaderOption`s in the `go.opentelemetry.io/otel/sdk/metric` package. (#3260)
- The periodic reader in the `go.opentelemetry.io/otel/sdk/metric` package now uses the temporality and aggregation selectors from its configured exporter instead of accepting them as options. (#3260)

### Fixed

Expand Down
9 changes: 9 additions & 0 deletions exporters/otlp/otlpmetric/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric
import (
"context"

"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// Client handles the transmission of OTLP data to an OTLP receiving endpoint.
type Client interface {
// Temporality returns the Temporality to use for an instrument kind.
Temporality(view.InstrumentKind) metricdata.Temporality

// Aggregation returns the Aggregation to use for an instrument kind.
Aggregation(view.InstrumentKind) aggregation.Aggregation

// UploadMetrics transmits metric data to an OTLP receiver.
//
// All retry logic must be handled by UploadMetrics alone, the Exporter
Expand Down
34 changes: 32 additions & 2 deletions exporters/otlp/otlpmetric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -34,6 +36,20 @@ type exporter struct {
shutdownOnce sync.Once
}

// Temporality returns the Temporality to use for an instrument kind.
func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Temporality(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Aggregation(k)
}

// Export transforms and transmits metric data to an OTLP receiver.
func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error {
otlpRm, err := transform.ResourceMetrics(rm)
Expand Down Expand Up @@ -68,7 +84,10 @@ func (e *exporter) Shutdown(ctx context.Context) error {
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.client = shutdownClient{
temporalitySelector: client.Temporality,
aggregationSelector: client.Aggregation,
}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
Expand All @@ -82,7 +101,10 @@ func New(client Client) metric.Exporter {
return &exporter{client: client}
}

type shutdownClient struct{}
type shutdownClient struct {
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}

func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
Expand All @@ -91,6 +113,14 @@ func (c shutdownClient) err(ctx context.Context) error {
return errShutdown
}

func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

func (c shutdownClient) UploadMetrics(ctx context.Context, _ *mpb.ResourceMetrics) error {
return c.err(ctx)
}
Expand Down
11 changes: 11 additions & 0 deletions exporters/otlp/otlpmetric/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -31,6 +34,14 @@ type client struct {
n int
}

func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}

func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}

func (c *client) UploadMetrics(context.Context, *mpb.ResourceMetrics) error {
c.n++
return nil
Expand Down
42 changes: 42 additions & 0 deletions exporters/otlp/otlpmetric/internal/oconf/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
Expand Down Expand Up @@ -57,6 +61,9 @@ type (

// gRPC configurations
GRPCCredentials credentials.TransportCredentials

TemporalitySelector metric.TemporalitySelector
AggregationSelector metric.AggregationSelector
}

Config struct {
Expand All @@ -82,6 +89,9 @@ func NewHTTPConfig(opts ...HTTPOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
}
Expand All @@ -102,6 +112,9 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
DialOptions: []grpc.DialOption{grpc.WithUserAgent(internal.GetUserAgentHeader())},
Expand Down Expand Up @@ -313,3 +326,32 @@ func WithTimeout(duration time.Duration) GenericOption {
return cfg
})
}

func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.TemporalitySelector = selector
return cfg
})
}

func WithAggregationSelector(selector metric.AggregationSelector) GenericOption {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = metric.DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}

return newGenericOption(func(cfg Config) Config {
cfg.Metrics.AggregationSelector = wrapped
return cfg
})
}
43 changes: 43 additions & 0 deletions exporters/otlp/otlpmetric/internal/oconf/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal/envconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
Expand Down Expand Up @@ -383,6 +386,38 @@ func TestConfigs(t *testing.T) {
assert.Equal(t, c.Metrics.Timeout, 5*time.Second)
},
},

// Temporality Selector Tests
{
name: "WithTemporalitySelector",
opts: []oconf.GenericOption{
oconf.WithTemporalitySelector(deltaSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a TemporalitySelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.TemporalitySelector
assert.Equal(t, metricdata.DeltaTemporality, got(undefinedKind))
},
},

// Aggregation Selector Tests
{
name: "WithAggregationSelector",
opts: []oconf.GenericOption{
oconf.WithAggregationSelector(dropSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a AggregationSelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.AggregationSelector
assert.Equal(t, aggregation.Drop{}, got(undefinedKind))
},
},
}

for _, tt := range tests {
Expand All @@ -406,6 +441,14 @@ func TestConfigs(t *testing.T) {
}
}

func dropSelector(view.InstrumentKind) aggregation.Aggregation {
return aggregation.Drop{}
}

func deltaSelector(view.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}

func asHTTPOptions(opts []oconf.GenericOption) []oconf.HTTPOption {
converted := make([]oconf.HTTPOption, len(opts))
for i, o := range opts {
Expand Down
12 changes: 12 additions & 0 deletions exporters/otlp/otlpmetric/internal/otest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"testing"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
cpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -27,6 +31,14 @@ type client struct {
storage *Storage
}

func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}

func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}

func (c *client) Collect() *Storage {
return c.storage
}
Expand Down
19 changes: 19 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -53,6 +56,9 @@ type client struct {
exportTimeout time.Duration
requestFunc retry.RequestFunc

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as the conn should only be closed if we created it. Otherwise,
Expand All @@ -70,6 +76,9 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
exportTimeout: cfg.Metrics.Timeout,
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
conn: cfg.GRPCConn,

temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}

if len(cfg.Metrics.Headers) > 0 {
Expand All @@ -94,6 +103,16 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
return c, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
)

// Option applies a configuration option to the Exporter.
Expand Down Expand Up @@ -236,3 +237,20 @@ func WithTimeout(duration time.Duration) Option {
func WithRetry(settings RetryConfig) Option {
return wrappedOption{oconf.WithRetry(retry.Config(settings))}
}

// WithTemporalitySelector sets the TemporalitySelector the client will use to
// determine the Temporality of an instrument based on its kind. If this option
// is not used, the client will use the DefaultTemporalitySelector from the
// go.opentelemetry.io/otel/sdk/metric package.
func WithTemporalitySelector(selector metric.TemporalitySelector) Option {
return wrappedOption{oconf.WithTemporalitySelector(selector)}
}

// WithAggregationSelector sets the AggregationSelector the client will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation
// explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector metric.AggregationSelector) Option {
return wrappedOption{oconf.WithAggregationSelector(selector)}
}
Loading

0 comments on commit 48a0547

Please sign in to comment.