Skip to content

Commit

Permalink
Add a library that defines Resource factory methods for gRPC servers …
Browse files Browse the repository at this point in the history
…and channels (#8604)

* Add a library that defines Resource factory methods for gRPC servers and channels

This is the first of four PRs in which 6ea70c4
has been broken up to facilitate review.

The endgame is to have the non-repudiation prototype merged. These
factory methods have been used to make resource management easier,
specifically as part of testing.

changelog_begin
changelog_end

* Let warning log output in tests

Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>

* Address #8604 (comment)

* Address #8604 (comment)

* Address #8604 (comment)

* Address #8604 (comment)

* Add resources-grpc to release/artifacts.yaml

Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
  • Loading branch information
stefanobaghino-da and cocreature authored Jan 25, 2021
1 parent 5693394 commit fc69db0
Show file tree
Hide file tree
Showing 29 changed files with 390 additions and 131 deletions.
1 change: 1 addition & 0 deletions daml-script/test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ da_scala_test_suite(
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_auth0_java_jwt",
"@maven//:com_typesafe_akka_akka_http_core_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-api-common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ da_scala_library(
"//libs-scala/concurrent",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_netty",
"@maven//:io_netty_netty_handler",
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ da_scala_library(
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
],
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-on-sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ da_scala_library(
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_zaxxer_HikariCP",
"@maven//:io_dropwizard_metrics_metrics_core",
Expand Down
4 changes: 4 additions & 0 deletions ledger/ledger-resources/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ da_scala_library(
deps = [
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:io_grpc_grpc_api",
"@maven//:io_netty_netty_common",
"@maven//:io_netty_netty_transport",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package com.daml.ledger.resources

import com.daml.resources.akka.AkkaResourceOwnerFactories
import com.daml.resources.grpc.GrpcResourceOwnerFactories
import com.daml.resources.{HasExecutionContext, ResourceOwnerFactories}

object ResourceOwner
extends ResourceOwnerFactories[ResourceContext]
with AkkaResourceOwnerFactories[ResourceContext] {
with AkkaResourceOwnerFactories[ResourceContext]
with GrpcResourceOwnerFactories[ResourceContext] {
override protected implicit val hasExecutionContext: HasExecutionContext[ResourceContext] =
ResourceContext.`Context has ExecutionContext`
}
6 changes: 4 additions & 2 deletions ledger/participant-integration-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ compile_deps = [
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//libs-scala/scala-utils",
"//libs-scala/timer-utils",
"@maven//:com_google_guava_guava",
Expand All @@ -51,9 +52,7 @@ compile_deps = [
"@maven//:io_dropwizard_metrics_metrics_graphite",
"@maven//:io_grpc_grpc_netty",
"@maven//:io_grpc_grpc_services",
"@maven//:io_netty_netty_common",
"@maven//:io_netty_netty_handler",
"@maven//:io_netty_netty_transport",
"@maven//:net_logstash_logback_logstash_logback_encoder",
"@maven//:org_flywaydb_flyway_core",
"@maven//:io_opentelemetry_opentelemetry_api",
Expand Down Expand Up @@ -172,6 +171,8 @@ da_scala_library(
"//libs-scala/ports",
"//libs-scala/postgresql-testing",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//libs-scala/timer-utils",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_netty",
Expand Down Expand Up @@ -220,6 +221,7 @@ test_deps = [
"//libs-scala/postgresql-testing",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//libs-scala/timer-utils",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ package com.daml.platform.apiserver

import java.io.IOException
import java.net.{BindException, InetAddress, InetSocketAddress}
import java.util.concurrent.{Executor, TimeUnit}
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit.SECONDS

import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.resources.{Resource, ResourceContext}
import com.daml.metrics.Metrics
import com.daml.ports.Port
import com.daml.resources.grpc.ServerResourceOwner
import com.google.protobuf.Message
import io.grpc._
import io.grpc.netty.NettyServerBuilder
import io.netty.handler.ssl.SslContext

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success}

private[apiserver] object GrpcServer {

Expand All @@ -28,6 +30,36 @@ private[apiserver] object GrpcServer {
// allow for extra information such as the exception stack trace.
private val MaximumStatusDescriptionLength = 4 * 1024 // 4 KB

private def makeBuilder(
address: Option[String],
desiredPort: Port,
maxInboundMessageSize: Int,
sslContext: Option[SslContext],
interceptors: List[ServerInterceptor],
metrics: Metrics,
servicesExecutor: Executor,
services: Iterable[BindableService],
): NettyServerBuilder = {
val host = address.map(InetAddress.getByName).getOrElse(InetAddress.getLoopbackAddress)
val builder = NettyServerBuilder.forAddress(new InetSocketAddress(host, desiredPort.value))
builder.sslContext(sslContext.orNull)
builder.permitKeepAliveTime(10, SECONDS)
builder.permitKeepAliveWithoutCalls(true)
builder.executor(servicesExecutor)
builder.maxInboundMessageSize(maxInboundMessageSize)
interceptors.foreach(builder.intercept)
builder.intercept(new MetricsInterceptor(metrics))
builder.intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength))
services.foreach { service =>
builder.addService(service)
toLegacyService(service).foreach(builder.addService)
}
builder
}

private def causedByBindException(e: IOException): Boolean =
e.getCause != null && e.getCause.isInstanceOf[BindException]

final class Owner(
address: Option[String],
desiredPort: Port,
Expand All @@ -37,43 +69,28 @@ private[apiserver] object GrpcServer {
metrics: Metrics,
servicesExecutor: Executor,
services: Iterable[BindableService],
) extends ResourceOwner[Server] {
) extends ServerResourceOwner[ResourceContext](
builder = makeBuilder(
address,
desiredPort,
maxInboundMessageSize,
sslContext,
interceptors,
metrics,
servicesExecutor,
services,
),
shutdownTimeout = 1.second,
) {
override def acquire()(implicit context: ResourceContext): Resource[Server] = {
val host = address.map(InetAddress.getByName).getOrElse(InetAddress.getLoopbackAddress)
Resource(Future {
val builder = NettyServerBuilder.forAddress(new InetSocketAddress(host, desiredPort.value))
builder.sslContext(sslContext.orNull)
builder.permitKeepAliveTime(10, SECONDS)
builder.permitKeepAliveWithoutCalls(true)
builder.executor(servicesExecutor)
builder.maxInboundMessageSize(maxInboundMessageSize)
interceptors.foreach(builder.intercept)
builder.intercept(new MetricsInterceptor(metrics))
builder.intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength))
services.foreach { service =>
builder.addService(service)
toLegacyService(service).foreach(builder.addService)
}
val server = builder.build()
try {
server.start()
} catch {
case e: IOException if e.getCause != null && e.getCause.isInstanceOf[BindException] =>
throw new UnableToBind(desiredPort, e.getCause)
}
server
})(server =>
Future {
// Phase 1, initialize shutdown, but wait for termination.
// If the shutdown has been initiated by the reset service, this gives the service time to gracefully complete the request.
server.shutdown()
server.awaitTermination(1, TimeUnit.SECONDS)

// Phase 2: Now cut off all remaining connections.
server.shutdownNow()
server.awaitTermination()
}
)
super.acquire().transformWith {
case Success(server) =>
Resource.successful(server)
case Failure(e: IOException) if causedByBindException(e) =>
Resource.failed(new UnableToBind(desiredPort, e.getCause))
case Failure(exception) =>
Resource.failed(exception)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,45 @@
package com.daml.platform.apiserver.services

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit
import java.util.UUID

import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.platform.apiserver.EventLoopGroupOwner
import com.daml.ledger.resources.ResourceOwner
import com.daml.ports.Port
import io.grpc.Channel
import io.grpc.netty.NegotiationType
import io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import io.netty.channel.EventLoopGroup
import io.netty.handler.ssl.SslContext
import io.netty.util.concurrent.DefaultThreadFactory

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

object GrpcClientResource {
def owner(port: Port, sslContext: Option[SslContext] = None): ResourceOwner[Channel] =
def owner(port: Port, sslContext: Option[SslContext] = None): ResourceOwner[Channel] = {
val threadFactoryName = s"api-client-grpc-event-loop-${UUID.randomUUID()}"
val threadFactory = new DefaultThreadFactory(threadFactoryName, true)
val threadCount = sys.runtime.availableProcessors()
for {
eventLoopGroup <- new EventLoopGroupOwner("api-client", sys.runtime.availableProcessors())
channel <- channelOwner(
port,
EventLoopGroupOwner.clientChannelType,
eventLoopGroup,
sslContext,
)
eventLoopGroup <- ResourceOwner.forEventLoopGroup(threadCount, threadFactory)
channelBuilder = makeChannelBuilder(port, eventLoopGroup, sslContext)
channel <- ResourceOwner.forChannel(channelBuilder, shutdownTimeout = 5.seconds)
} yield channel
}

private def channelOwner(
private def makeChannelBuilder(
port: Port,
channelType: Class[_ <: io.netty.channel.Channel],
eventLoopGroup: EventLoopGroup,
sslContext: Option[SslContext],
): ResourceOwner[Channel] =
new ResourceOwner[Channel] {
override def acquire()(implicit context: ResourceContext): Resource[Channel] = {
Resource(Future {
val builder = NettyChannelBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, port.value))
.channelType(channelType)
.eventLoopGroup(eventLoopGroup)
.directExecutor()
): NettyChannelBuilder = {
val builder =
NettyChannelBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, port.value))
.channelType(ResourceOwner.EventLoopGroupChannelType)
.eventLoopGroup(eventLoopGroup)
.directExecutor()

sslContext
.fold(builder.usePlaintext())(
builder.sslContext(_).negotiationType(NegotiationType.TLS)
)
.build()
})(channel =>
Future {
channel.shutdownNow()
if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
sys.error(
"Unable to shutdown channel to a remote API under tests. Unable to recover. Terminating."
)
}
}
)
}
}
sslContext
.fold(builder.usePlaintext())(
builder.sslContext(_).negotiationType(NegotiationType.TLS)
)
}
}
1 change: 1 addition & 0 deletions ledger/participant-state/kvutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ da_scala_library(
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//libs-scala/timer-utils",
"@maven//:com_google_guava_guava",
"@maven//:com_google_protobuf_protobuf_java",
Expand Down
1 change: 1 addition & 0 deletions ledger/participant-state/kvutils/app/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ da_scala_library(
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_netty_netty_handler",
Expand Down
1 change: 1 addition & 0 deletions ledger/participant-state/kvutils/tools/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ da_scala_library(
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
],
Expand Down
1 change: 1 addition & 0 deletions ledger/recovering-indexer-integration-tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ da_scala_test_suite(
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//libs-scala/timer-utils",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:io_dropwizard_metrics_metrics_core",
Expand Down
Loading

0 comments on commit fc69db0

Please sign in to comment.