Skip to content

Commit

Permalink
sandbox: Capture timing metrics for API server calls. (digital-asset#…
Browse files Browse the repository at this point in the history
…5145)

* sandbox: Capture timing metrics for API server calls.

`timer` is a superset of `meter`, so this doesn't lose any existing
behavior; just adds new behavior.

CHANGELOG_BEGIN
- [Ledger API Server] Added timing metrics for all GRPC endpoints.
CHANGELOG_END

* sandbox: Rename SandboxClientResource to GrpcClientResource.

* sample-service: Clean up warnings.

* sandbox: Add tests for MetricsInterceptor.

* sandbox: Split the API metrics interceptor from the naming.

* sandbox: Use `MetricRegistry.name` instead of string interpolation.

* rs-grpc-akka: Restrict the test library to the DAML workspace.

Co-Authored-By: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>
  • Loading branch information
SamirTalwar and stefanobaghino-da authored Mar 25, 2020
1 parent 2c0627c commit bfe27d2
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.digitalasset.ledger.api.v1.{CommandServiceGrpc, TransactionServiceGrp
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.SandboxServer
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.SandboxClientResource
import com.digitalasset.platform.sandbox.services.GrpcClientResource
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.ports.Port
import com.google.protobuf.Empty
Expand Down Expand Up @@ -54,7 +54,7 @@ object TestUtil {

val channelOwner = for {
server <- SandboxServer.owner(config)
channel <- SandboxClientResource.owner(server.port)
channel <- GrpcClientResource.owner(server.port)
} yield channel
channelOwner.use(channel => Future(testCode(channel)))
}
Expand Down
3 changes: 3 additions & 0 deletions ledger-api/rs-grpc-akka/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ da_scala_library(
"src/test/**/*Test.scala",
],
),
visibility = [
"//:__subpackages__",
],
runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
"@maven//:io_grpc_grpc_netty",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import com.digitalasset.grpc.adapter.server.akka.ServerAdapter
import com.digitalasset.grpc.sampleservice.Responding
import com.digitalasset.platform.hello.HelloServiceGrpc.HelloService
import com.digitalasset.platform.hello.{HelloRequest, HelloResponse, HelloServiceGrpc}
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import io.grpc.{BindableService, ServerServiceDefinition}

import scala.concurrent.ExecutionContext.Implicits.global

class AkkaImplementation(
implicit executionSequencerFactory: ExecutionSequencerFactory,
actorMaterializer: Materializer)
extends HelloService
materializer: Materializer,
) extends HelloService
with Responding
with BindableService {

Expand All @@ -34,25 +33,12 @@ class AkkaImplementation(

override def serverStreaming(
request: HelloRequest,
responseObserver: StreamObserver[HelloResponse]): Unit = {
responseObserver: StreamObserver[HelloResponse],
): Unit =
Source
.single(request)
.via(Flow[HelloRequest].mapConcat(responses))
.runWith(ServerAdapter.toSink(responseObserver))
.onComplete(_ => serverStreamingCalls.incrementAndGet())
}

private val clientStreamingFlow = {
Flow[HelloRequest]
.fold((0, ByteString.EMPTY))(
(acc, request) =>
(
acc._1 + request.reqInt,
ByteString.copyFrom(
acc._2.toByteArray
.zip(request.payload.toByteArray)
.map(t => (t._1 ^ t._2).byteValue())))
)
.map(t => HelloResponse(t._1, t._2))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ trait Responding extends HelloService {
override def single(request: HelloRequest): Future[HelloResponse] =
Future.successful(response(request))

protected def response(request: HelloRequest) = HelloResponse(request.reqInt * 2, request.payload)
protected def response(request: HelloRequest): HelloResponse =
HelloResponse(request.reqInt * 2, request.payload)

protected def responses(request: HelloRequest) =
protected def responses(request: HelloRequest): List[HelloResponse] =
(1 to request.reqInt).map(i => HelloResponse(i, request.payload)).toList

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.grpc.stub.StreamObserver
import io.grpc.{BindableService, ServerServiceDefinition, Status}

import scala.concurrent.ExecutionContext

class ReferenceImplementation
extends HelloService
with Responding
Expand All @@ -23,13 +24,14 @@ class ReferenceImplementation

override def serverStreaming(
request: HelloRequest,
responseObserver: StreamObserver[HelloResponse]): Unit = {
responseObserver: StreamObserver[HelloResponse],
): Unit = {
validateRequest(request)
for (i <- 1.to(request.reqInt)) responseObserver.onNext(HelloResponse(i))
responseObserver.onCompleted()
}

private def validateRequest(request: HelloRequest) =
private def validateRequest(request: HelloRequest): Unit =
if (request.reqInt < 0)
throw Status.INVALID_ARGUMENT
.withDescription("request cannot be negative")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.digitalasset.ledger.api.testing.utils.{OwnedResource, Resource}
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.SandboxServer
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.SandboxClientResource
import com.digitalasset.platform.sandbox.services.GrpcClientResource
import com.digitalasset.platform.services.time.TimeProviderType.Static
import com.digitalasset.ports.Port
import com.digitalasset.resources.ResourceOwner
Expand Down Expand Up @@ -50,7 +50,7 @@ object LedgerFactories {
PostgresResource.owner().map(fixture => Some(fixture.jdbcUrl))
}
server <- SandboxServer.owner(sandboxConfig(jdbcUrl, darFiles))
channel <- SandboxClientResource.owner(server.port)
channel <- GrpcClientResource.owner(server.port)
} yield new LedgerContext(channel, darFiles.map(getPackageIdOrThrow))
)
}
3 changes: 3 additions & 0 deletions ledger/sandbox/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ da_scala_test_suite(
"//daml-lf/transaction",
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-akka",
"//ledger-api/rs-grpc-akka:rs-grpc-akka-tests-lib",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/sample-service",
"//ledger-api/testing-utils",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
Expand All @@ -298,6 +300,7 @@ da_scala_test_suite(
"@maven//:com_typesafe_config",
"@maven//:commons_io_commons_io",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_netty",
"@maven//:io_grpc_grpc_services",
"@maven//:io_netty_netty_handler",
"@maven//:org_awaitility_awaitility",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,13 @@

package com.digitalasset.platform.apiserver

import com.codahale.metrics.MetricRegistry
import java.util.concurrent.atomic.AtomicBoolean

import com.codahale.metrics.{MetricRegistry, Timer}
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

import scala.collection.concurrent.TrieMap

object MetricsInterceptor {

private[this] val capitalization = "[A-Z]+".r
private[this] val startWordCapitalization = "^[A-Z]+".r
private[this] val endWordAcronym = "[A-Z]{2,}$".r

private[this] val snakifyWholeWord = (s: String) => if (s.forall(_.isUpper)) s.toLowerCase else s

private[this] val snakify = (s: String) =>
capitalization.findAllMatchIn(s).foldRight(s) { (m, r) =>
val s = m.toString
if (s.length == 1) r.patch(m.start, s"_${s.toLowerCase}", 1)
else r.patch(m.start, s"_${s.init.toLowerCase}_${s.last.toLower}", s.length)
}

private[this] val snakifyStart = (s: String) =>
startWordCapitalization.findFirstIn(s).fold(s) { m =>
s.patch(
0,
if (m.length == 1) m.toLowerCase else m.init.toLowerCase,
math.max(m.length - 1, 1))
}

private[this] val snakifyEnd = (s: String) =>
endWordAcronym.findFirstIn(s).fold(s) { m =>
s.patch(s.length - m.length, s"_${m.toLowerCase}", m.length)
}

// Turns a camelCased string into a snake_cased one
private[apiserver] val camelCaseToSnakeCase: String => String =
snakifyWholeWord andThen snakifyStart andThen snakifyEnd andThen snakify

// assert(fullServiceName("org.example.SomeService/someMethod") == "daml.lapi.some_service.some_method")
private[apiserver] def nameFor(fullMethodName: String): String = {
val serviceAndMethodName = fullMethodName.split('/')
assert(
serviceAndMethodName.length == 2,
s"Expected service and method names separated by '/', got '$fullMethodName'")
val serviceName = camelCaseToSnakeCase(serviceAndMethodName(0).split('.').last)
val methodName = camelCaseToSnakeCase(serviceAndMethodName(1))
s"daml.lapi.$serviceName.$methodName"
}

}

/**
* An interceptor that counts all incoming calls by method.
*
Expand All @@ -79,12 +36,46 @@ final class MetricsInterceptor(metrics: MetricRegistry) extends ServerIntercepto
override def interceptCall[ReqT, RespT](
call: ServerCall[ReqT, RespT],
headers: Metadata,
next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
next: ServerCallHandler[ReqT, RespT],
): ServerCall.Listener[ReqT] = {
val fullMethodName = call.getMethodDescriptor.getFullMethodName
val metricName = fullServiceToMetricNameCache.getOrElseUpdate(
fullMethodName,
MetricsInterceptor.nameFor(fullMethodName))
metrics.meter(metricName).mark()
next.startCall(call, headers)
MetricsNaming.nameFor(fullMethodName))
val timer = metrics.timer(metricName).time()
val listener = next.startCall(call, headers)
new TimedListener(listener, timer)
}

class TimedListener[ReqT](listener: ServerCall.Listener[ReqT], timer: Timer.Context)
extends ServerCall.Listener[ReqT] {
private val timerStopped = new AtomicBoolean(false)

override def onReady(): Unit =
listener.onReady()

override def onMessage(message: ReqT): Unit =
listener.onMessage(message)

override def onHalfClose(): Unit =
listener.onHalfClose()

override def onCancel(): Unit = {
listener.onCancel()
stopTimer()
}

override def onComplete(): Unit = {
listener.onComplete()
stopTimer()
}

private def stopTimer(): Unit = {
if (timerStopped.compareAndSet(false, true)) {
timer.stop()
()
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.apiserver

import com.codahale.metrics.MetricRegistry

object MetricsNaming {

private[this] val capitalization = "[A-Z]+".r
private[this] val startWordCapitalization = "^[A-Z]+".r
private[this] val endWordAcronym = "[A-Z]{2,}$".r

private[this] val snakifyWholeWord = (s: String) => if (s.forall(_.isUpper)) s.toLowerCase else s

private[this] val snakify = (s: String) =>
capitalization.findAllMatchIn(s).foldRight(s) { (m, r) =>
val s = m.toString
if (s.length == 1) r.patch(m.start, s"_${s.toLowerCase}", 1)
else r.patch(m.start, s"_${s.init.toLowerCase}_${s.last.toLower}", s.length)
}

private[this] val snakifyStart = (s: String) =>
startWordCapitalization.findFirstIn(s).fold(s) { m =>
s.patch(
0,
if (m.length == 1) m.toLowerCase else m.init.toLowerCase,
math.max(m.length - 1, 1))
}

private[this] val snakifyEnd = (s: String) =>
endWordAcronym.findFirstIn(s).fold(s) { m =>
s.patch(s.length - m.length, s"_${m.toLowerCase}", m.length)
}

// Turns a camelCased string into a snake_cased one
private[apiserver] val camelCaseToSnakeCase: String => String =
snakifyWholeWord andThen snakifyStart andThen snakifyEnd andThen snakify

// assert(fullServiceName("org.example.SomeService/someMethod") == "daml.lapi.some_service.some_method")
private[apiserver] def nameFor(fullMethodName: String): String = {
val serviceAndMethodName = fullMethodName.split('/')
assert(
serviceAndMethodName.length == 2,
s"Expected service and method names separated by '/', got '$fullMethodName'")
val serviceName = camelCaseToSnakeCase(serviceAndMethodName(0).split('.').last)
val methodName = camelCaseToSnakeCase(serviceAndMethodName(1))
MetricRegistry.name("daml", "lapi", serviceName, methodName)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import io.netty.channel.EventLoopGroup

import scala.concurrent.{ExecutionContext, Future}

object SandboxClientResource {
object GrpcClientResource {
def owner(port: Port): ResourceOwner[Channel] =
for {
eventLoopGroup <- new EventLoopGroupOwner("api-client", sys.runtime.availableProcessors())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait SandboxFixture extends AbstractSandboxFixture with SuiteResource[(SandboxS
.fold[ResourceOwner[Option[String]]](ResourceOwner.successful(None))(_.map(info =>
Some(info.jdbcUrl)))
server <- SandboxServer.owner(config.copy(jdbcUrl = jdbcUrl))
channel <- SandboxClientResource.owner(server.port)
channel <- GrpcClientResource.owner(server.port)
} yield (server, channel)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.daml.ledger.participant.state.v1.SeedService
import com.digitalasset.ledger.api.testing.utils.{OwnedResource, Resource, SuiteResource}
import com.digitalasset.platform.sandbox.AbstractSandboxFixture
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.SandboxClientResource
import com.digitalasset.platform.sandbox.services.GrpcClientResource
import com.digitalasset.ports.Port
import com.digitalasset.resources.ResourceOwner
import io.grpc.Channel
Expand Down Expand Up @@ -35,7 +35,7 @@ trait SandboxNextFixture extends AbstractSandboxFixture with SuiteResource[(Port
.fold[ResourceOwner[Option[String]]](ResourceOwner.successful(None))(_.map(info =>
Some(info.jdbcUrl)))
port <- new Runner(config.copy(jdbcUrl = jdbcUrl))
channel <- SandboxClientResource.owner(port)
channel <- GrpcClientResource.owner(port)
} yield (port, channel)
)
}
Expand Down
Loading

0 comments on commit bfe27d2

Please sign in to comment.