Skip to content

Commit

Permalink
Add grpc server interceptor with golden metrics [PLEN-9] (digital-ass…
Browse files Browse the repository at this point in the history
  • Loading branch information
nicu-da authored Nov 10, 2022
1 parent 25bac20 commit 7ce5d05
Show file tree
Hide file tree
Showing 16 changed files with 721 additions and 108 deletions.
3 changes: 3 additions & 0 deletions ledger-api/testing-utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ da_scala_library(
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger-api/rs-grpc-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger/ledger-resources",
"//libs-scala/contextualized-logging",
"//libs-scala/grpc-utils",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_google_guava_guava",
"@maven//:com_typesafe_config",
"@maven//:io_grpc_grpc_core",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.testing.utils

import java.net.{InetAddress, InetSocketAddress}

import com.daml.ledger.resources.{ResourceContext, ResourceOwner, Resource => LedgerResource}
import io.grpc.netty.{NettyChannelBuilder, NettyServerBuilder}
import io.grpc.{BindableService, Channel, Server, ServerInterceptor}

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

object TestingServerInterceptors {

def channelOwner(
interceptor: ServerInterceptor,
service: BindableService,
): ResourceOwner[Channel] = {
for {
server <- serverOwner(interceptor, service)
channel <- ResourceOwner.forChannel(
NettyChannelBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, server.getPort))
.usePlaintext(),
shutdownTimeout = 5.seconds,
)
} yield channel
}

def serverOwner(
interceptor: ServerInterceptor,
service: BindableService,
): ResourceOwner[Server] =
new ResourceOwner[Server] {
def acquire()(implicit context: ResourceContext): LedgerResource[Server] =
LedgerResource(Future {
val server =
NettyServerBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
.directExecutor()
.intercept(interceptor)
.addService(service)
.build()
server.start()
server
})(server => Future(server.shutdown().awaitTermination()))
}
}
18 changes: 18 additions & 0 deletions ledger/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ da_scala_library(
srcs = glob(["src/main/scala/**/*.scala"]),
resources = glob(["src/main/resources/**/*"]),
scala_deps = [
"@maven//:com_thesamet_scalapb_scalapb_runtime",
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:org_scalaz_scalaz_core",
Expand All @@ -29,10 +30,12 @@ da_scala_library(
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_dropwizard_metrics_metrics_graphite",
"@maven//:io_dropwizard_metrics_metrics_jmx",
"@maven//:io_dropwizard_metrics_metrics_jvm",
"@maven//:io_grpc_grpc_api",
"@maven//:io_netty_netty_transport",
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_context",
Expand All @@ -59,6 +62,8 @@ da_scala_library(
runtime_deps = [],
deps = [
":metrics",
"//libs-scala/scala-utils",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:io_opentelemetry_opentelemetry_sdk_testing",
Expand All @@ -72,6 +77,9 @@ da_scala_test_suite(
size = "small",
srcs = glob(["src/test/suite/scala/**/*.scala"]),
scala_deps = [
"@maven//:com_thesamet_scalapb_lenses",
"@maven//:com_thesamet_scalapb_scalapb_runtime",
"@maven//:com_thesamet_scalapb_scalapb_runtime_grpc",
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:org_scalactic_scalactic",
Expand All @@ -84,10 +92,20 @@ da_scala_test_suite(
":metrics",
":metrics-test-lib",
"//ledger-api/rs-grpc-akka",
"//ledger-api/rs-grpc-akka:rs-grpc-akka-tests-lib",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/sample-service",
"//ledger-api/testing-utils",
"//ledger/ledger-resources",
"//ledger/ledger-resources:ledger-resources-test-lib",
"//ledger/test-common",
"//libs-scala/concurrent",
"//libs-scala/resources",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_protobuf",
"@maven//:io_grpc_grpc_stub",
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:io_opentelemetry_opentelemetry_sdk_testing",
Expand Down
30 changes: 29 additions & 1 deletion ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package com.daml.metrics
import com.codahale.metrics.MetricRegistry
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.metrics.{Meter => OtelMeter}
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.{MetricHandle, MetricName}
import com.daml.metrics.api.dropwizard.DropwizardFactory
import com.daml.metrics.api.opentelemetry.OpenTelemetryFactory
import com.daml.metrics.grpc.GrpcServerMetrics

object Metrics {
lazy val ForTesting = new Metrics(new MetricRegistry, GlobalOpenTelemetry.getMeter("test"))
Expand Down Expand Up @@ -47,5 +49,31 @@ final class Metrics(override val registry: MetricRegistry, val otelMeter: OtelMe

object HttpJsonApi extends HttpJsonApiMetrics(prefix :+ "http_json_api", registry, otelMeter)

object grpc extends OpenTelemetryFactory with GrpcServerMetrics {

private val grpcServerMetricsPrefix = prefix :+ "grpc" :+ "server"

override def otelMeter: OtelMeter = Metrics.this.otelMeter
override val callTimer: MetricHandle.Timer = timer(grpcServerMetricsPrefix)
override val messagesSent: MetricHandle.Meter = meter(
grpcServerMetricsPrefix :+ "messages" :+ "sent"
)
override val messagesReceived: MetricHandle.Meter = meter(
grpcServerMetricsPrefix :+ "messages" :+ "received"
)
override val messagesSentSize: MetricHandle.Histogram = histogram(
grpcServerMetricsPrefix :+ "messages" :+ "sent" :+ "bytes"
)
override val messagesReceivedSize: MetricHandle.Histogram = histogram(
grpcServerMetricsPrefix :+ "messages" :+ "received" :+ "bytes"
)
override val callsStarted: MetricHandle.Meter = meter(
grpcServerMetricsPrefix :+ "started"
)
override val callsFinished: MetricHandle.Meter = meter(
grpcServerMetricsPrefix :+ "handled"
)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ object MetricHandle {

object Timer {

trait TimerHandle extends AutoCloseable {
def stop(): Unit = close()
trait TimerHandle {

def stop()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit

}

}
Expand Down Expand Up @@ -113,13 +117,13 @@ object MetricHandle {
override def metricType: String = "Counter"
def inc()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit
): Unit = inc(1)
def inc(n: Long)(implicit
context: MetricsContext
): Unit
def dec()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit
): Unit = dec(1)
def dec(n: Long)(implicit
context: MetricsContext
): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,14 @@ object MetricsContext {

def withEmptyMetricsContext[T](run: MetricsContext => T): T = run(Empty)

def withMetricLabels[T](labels: (String, String)*)(run: MetricsContext => T): T = run(
MetricsContext(Map(labels: _*))
)

def withExtraMetricLabels[T](labels: (String, String)*)(run: MetricsContext => T)(implicit
metrics: MetricsContext
): T = run(
metrics.merge(MetricsContext(Map(labels: _*)))
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.daml.metrics.api.dropwizard
import java.time.Duration
import java.util.concurrent.TimeUnit

import com.codahale.metrics.Timer.Context
import com.codahale.{metrics => codahale}
import com.daml.metrics.api.MetricHandle.Timer.TimerHandle
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram, Meter, Timer}
Expand All @@ -27,13 +28,16 @@ case class DropwizardTimer(name: String, metric: codahale.Timer) extends Timer {
context: MetricsContext = MetricsContext.Empty
): TimerHandle = {
val ctx = metric.time()
() => {
ctx.stop()
()
}
DropwizardTimerHandle(ctx)
}
}

final case class DropwizardTimerHandle(ctx: Context) extends TimerHandle {

override def stop()(implicit context: MetricsContext): Unit = ctx.close()

}

sealed case class DropwizardMeter(name: String, metric: codahale.Meter) extends Meter {

def mark(value: Long)(implicit
Expand All @@ -47,12 +51,15 @@ sealed case class DropwizardCounter(name: String, metric: codahale.Counter) exte
override def inc()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = metric.inc

override def inc(n: Long)(implicit
context: MetricsContext
): Unit = metric.inc(n)

override def dec()(implicit
context: MetricsContext = MetricsContext.Empty
): Unit = metric.dec

override def dec(n: Long)(implicit
context: MetricsContext
): Unit = metric.dec(n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ sealed case class NoOpTimer(name: String) extends Timer {
): T = call
override def startAsync()(implicit
context: MetricsContext = MetricsContext.Empty
): TimerHandle = () => ()
): TimerHandle = NoOpTimerHandle
}

case object NoOpTimerHandle extends TimerHandle {
override def stop()(implicit context: MetricsContext): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ case class OpenTelemetryTimer(name: String, histogram: LongHistogram, timerConte
): Unit =
histogram.record(
TimeUnit.MILLISECONDS.convert(duration, unit),
AttributesHelper.multiContextAsAttributes(context, timerContext),
AttributesHelper.multiContextAsAttributes(timerContext, context),
)
override def time[T](call: => T)(implicit
context: MetricsContext
Expand All @@ -149,20 +149,21 @@ case class OpenTelemetryTimer(name: String, histogram: LongHistogram, timerConte
val result = call
histogram.record(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS),
AttributesHelper.multiContextAsAttributes(context, timerContext),
AttributesHelper.multiContextAsAttributes(timerContext, context),
)
result
}

override def startAsync()(implicit
context: MetricsContext
): TimerHandle = {
override def startAsync()(implicit startContext: MetricsContext): TimerHandle = {
val start = System.nanoTime()
() =>
histogram.record(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS),
AttributesHelper.multiContextAsAttributes(context, timerContext),
)
new TimerHandle {
override def stop()(implicit stopContext: MetricsContext): Unit = {
histogram.record(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS),
AttributesHelper.multiContextAsAttributes(timerContext, startContext, stopContext),
)
}
}
}

override def update(duration: Duration)(implicit
Expand Down
Loading

0 comments on commit 7ce5d05

Please sign in to comment.