Skip to content

Commit

Permalink
ledger-api-client + participant-integration-api: Increase the default…
Browse files Browse the repository at this point in the history
… maximum inbound error size, and truncate errors well before that. (digital-asset#6807)

* participant-integration-api: `GrpcServerOwner` -> `GrpcServer.Owner`.

Mostly so I can create a test class named `GrpcServerSpec`.

* ports: Move the free port search from postgresql-testing.

* participant-integration-api: Test the basics of GrpcServer.

This uses the HelloService to make sure the server behaves normally.

* ledger-api-client: Extract out channel configuration from LedgerClient.

So we can test it independently of the LedgerClient itself.

* ledger-api-client: Increase the default maximum inbound header size.

Increased from 8 KB to 1 MB.

* participant-integration-api: Reduce the maximum error message size.

Truncate GRPC error descriptions to 256 KB.

* participant-integration-api: Use `Port.Dynamic` instead of `FreePort`.

In tests.

* participant-integration-api: Explicit null checks when they're shorter.

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* ledger-api-client: Reduce the max inbound message size back to 8 KB.

And reduce the maximum size of an error description pushed out by the
server accordingly.

CHANGELOG_BEGIN
- [Integration Kit] Truncate GPRC error messages at 4 KB. This ensures
  that we won't trigger a protocol error when sending errors to the
  client.
CHANGELOG_END

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>
  • Loading branch information
SamirTalwar and stefanobaghino-da authored Jul 21, 2020
1 parent ee74551 commit d6fc2bb
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 88 deletions.
5 changes: 3 additions & 2 deletions ledger-api/sample-service/src/main/protobuf/hello.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message HelloResponse {
}

service HelloService {
rpc Single (HelloRequest) returns (HelloResponse);
rpc ServerStreaming (HelloRequest) returns (stream HelloResponse);
rpc Single (HelloRequest) returns (HelloResponse);
rpc ServerStreaming (HelloRequest) returns (stream HelloResponse);
rpc Fails (HelloRequest) returns (HelloResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ trait Responding extends HelloService {
override def single(request: HelloRequest): Future[HelloResponse] =
Future.successful(response(request))

override def fails(request: HelloRequest): Future[HelloResponse] =
Future.failed(new IllegalStateException(request.payload.toStringUtf8))

protected def response(request: HelloRequest): HelloResponse =
HelloResponse(request.reqInt * 2, request.payload)

Expand Down
2 changes: 2 additions & 0 deletions ledger/ledger-api-client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ da_scala_library(
"//ledger/ledger-api-domain",
"//libs-scala/direct-execution-context",
"//libs-scala/grpc-utils",
"//libs-scala/ports",
"//libs-scala/resources",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_grpc_grpc_netty",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.client

import java.net.{InetAddress, InetSocketAddress}

import com.daml.ledger.client.configuration.LedgerClientConfiguration
import com.daml.ports.Port
import com.daml.resources.{Resource, ResourceOwner}
import io.grpc.ManagedChannel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}

import scala.concurrent.{ExecutionContext, Future}

object GrpcChannel {

final class Owner(builder: NettyChannelBuilder, configuration: LedgerClientConfiguration)
extends ResourceOwner[ManagedChannel] {
def this(port: Port, configuration: LedgerClientConfiguration) =
this(
NettyChannelBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, port.value)),
configuration,
)

override def acquire()(implicit executionContext: ExecutionContext): Resource[ManagedChannel] =
Resource(
Future {
configuration.sslContext
.fold(builder.usePlaintext())(
builder.sslContext(_).negotiationType(NegotiationType.TLS))
builder.maxInboundMetadataSize(configuration.maxInboundMessageSize)
builder.build()
}
)(channel =>
Future {
channel.shutdownNow()
()
})
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ import com.daml.ledger.api.v1.package_service.PackageServiceGrpc
import com.daml.ledger.api.v1.transaction_service.TransactionServiceGrpc
import com.daml.ledger.client.configuration.LedgerClientConfiguration
import com.daml.ledger.client.services.acs.ActiveContractSetClient
import com.daml.ledger.client.services.admin.PackageManagementClient
import com.daml.ledger.client.services.admin.PartyManagementClient
import com.daml.ledger.client.services.admin.{PackageManagementClient, PartyManagementClient}
import com.daml.ledger.client.services.commands.{CommandClient, SynchronousCommandClient}
import com.daml.ledger.client.services.identity.LedgerIdentityClient
import com.daml.ledger.client.services.pkg.PackageClient
import com.daml.ledger.client.services.transactions.TransactionClient
import io.grpc.{Channel, ManagedChannel}
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import io.grpc.netty.NettyChannelBuilder
import io.grpc.stub.AbstractStub
import io.grpc.{Channel, ManagedChannel}

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -126,15 +125,14 @@ object LedgerClient {
def fromBuilder(builder: NettyChannelBuilder, configuration: LedgerClientConfiguration)(
implicit ec: ExecutionContext,
esf: ExecutionSequencerFactory): Future[LedgerClient] = {
configuration.sslContext.fold(builder.usePlaintext())(
builder.sslContext(_).negotiationType(NegotiationType.TLS))
val channel = builder.build()
sys.addShutdownHook {
if (!channel.isShutdown) {
val _ = channel.shutdownNow()
val resource = new GrpcChannel.Owner(builder, configuration).acquire()
resource.asFuture.flatMap { channel =>
sys.addShutdownHook {
resource.release()
()
}
apply(channel, configuration)
}
apply(channel, configuration)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@

package com.daml.ledger.client.configuration

import io.grpc.internal.GrpcUtil
import io.netty.handler.ssl.SslContext

/**
* @param applicationId The string that will be used as an application identifier when issuing commands and retrieving transactions
* @param ledgerIdRequirement A [[LedgerIdRequirement]] specifying how the ledger identifier must be checked against the one returned by the LedgerIdentityService
* @param commandClient The [[CommandClientConfiguration]] that defines how the command client should be setup with regards to timeouts, commands in flight and command TTL
* @param sslContext If defined, the context will be passed on to the underlying gRPC code to ensure the communication channel is secured by TLS
* @param token If defined, the access token that will be passed by default, unless overridden in individual calls (mostly useful for short-lived applications)
* @param applicationId The string that will be used as an application identifier when issuing commands and retrieving transactions
* @param ledgerIdRequirement A [[LedgerIdRequirement]] specifying how the ledger identifier must be checked against the one returned by the LedgerIdentityService
* @param commandClient The [[CommandClientConfiguration]] that defines how the command client should be setup with regards to timeouts, commands in flight and command TTL
* @param sslContext If defined, the context will be passed on to the underlying gRPC code to ensure the communication channel is secured by TLS
* @param token If defined, the access token that will be passed by default, unless overridden in individual calls (mostly useful for short-lived applications)
* @param maxInboundMessageSize The maximum size of the response headers.
*/
final case class LedgerClientConfiguration(
applicationId: String,
ledgerIdRequirement: LedgerIdRequirement,
commandClient: CommandClientConfiguration,
sslContext: Option[SslContext],
token: Option[String] = None)
token: Option[String] = None,
maxInboundMessageSize: Int = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,72 @@ import java.net.{BindException, InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit.SECONDS

import com.daml.metrics.Metrics
import com.daml.platform.apiserver.GrpcServerOwner._
import com.daml.ports.Port
import com.daml.resources.{Resource, ResourceOwner}
import com.google.protobuf.Message
import io.grpc.netty.NettyServerBuilder
import io.grpc._
import io.grpc.netty.NettyServerBuilder
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.ssl.SslContext

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace

final class GrpcServerOwner(
address: Option[String],
desiredPort: Port,
maxInboundMessageSize: Int,
sslContext: Option[SslContext] = None,
interceptors: List[ServerInterceptor] = List.empty,
metrics: Metrics,
eventLoopGroups: ServerEventLoopGroups,
services: Iterable[BindableService],
) extends ResourceOwner[Server] {
override def acquire()(implicit executionContext: ExecutionContext): Resource[Server] = {
val host = address.map(InetAddress.getByName).getOrElse(InetAddress.getLoopbackAddress)
Resource(Future {
val builder = NettyServerBuilder.forAddress(new InetSocketAddress(host, desiredPort.value))
builder.sslContext(sslContext.orNull)
builder.channelType(classOf[NioServerSocketChannel])
builder.permitKeepAliveTime(10, SECONDS)
builder.permitKeepAliveWithoutCalls(true)
builder.directExecutor()
builder.maxInboundMessageSize(maxInboundMessageSize)
interceptors.foreach(builder.intercept)
builder.intercept(new MetricsInterceptor(metrics))
eventLoopGroups.populate(builder)
services.foreach { service =>
builder.addService(service)
toLegacyService(service).foreach(builder.addService)
}
val server = builder.build()
try {
server.start()
} catch {
case e: IOException if e.getCause != null && e.getCause.isInstanceOf[BindException] =>
throw new UnableToBind(desiredPort, e.getCause)
}
server
})(server => Future(server.shutdown().awaitTermination()))
object GrpcServer {

// Unfortunately, we can't get the maximum inbound message size from the client, so we don't know
// how big this should be. This seems long enough to contain useful data, but short enough that it
// won't break most well-configured clients.
// As the default response header limit for a Netty client is 8 KB, we set our limit to 4 KB to
// allow for extra information such as the exception stack trace.
private val MaximumStatusDescriptionLength = 4 * 1024 // 4 KB

final class Owner(
address: Option[String],
desiredPort: Port,
maxInboundMessageSize: Int,
sslContext: Option[SslContext] = None,
interceptors: List[ServerInterceptor] = List.empty,
metrics: Metrics,
eventLoopGroups: ServerEventLoopGroups,
services: Iterable[BindableService],
) extends ResourceOwner[Server] {
override def acquire()(implicit executionContext: ExecutionContext): Resource[Server] = {
val host = address.map(InetAddress.getByName).getOrElse(InetAddress.getLoopbackAddress)
Resource(Future {
val builder = NettyServerBuilder.forAddress(new InetSocketAddress(host, desiredPort.value))
builder.sslContext(sslContext.orNull)
builder.channelType(classOf[NioServerSocketChannel])
builder.permitKeepAliveTime(10, SECONDS)
builder.permitKeepAliveWithoutCalls(true)
builder.directExecutor()
builder.maxInboundMessageSize(maxInboundMessageSize)
interceptors.foreach(builder.intercept)
builder.intercept(new MetricsInterceptor(metrics))
builder.intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength))
eventLoopGroups.populate(builder)
services.foreach { service =>
builder.addService(service)
toLegacyService(service).foreach(builder.addService)
}
val server = builder.build()
try {
server.start()
} catch {
case e: IOException if e.getCause != null && e.getCause.isInstanceOf[BindException] =>
throw new UnableToBind(desiredPort, e.getCause)
}
server
})(server => Future(server.shutdown().awaitTermination()))
}
}

final class UnableToBind(port: Port, cause: Throwable)
extends RuntimeException(
s"The API server was unable to bind to port $port. Terminate the process occupying the port, or choose a different one.",
cause)
with NoStackTrace

// This exposes the existing services under com.daml also under com.digitalasset.
// This is necessary to allow applications built with an earlier version of the SDK
// to still work.
Expand Down Expand Up @@ -88,14 +104,5 @@ final class GrpcServerOwner(
Option(digitalassetDef.build())
} else None
}
}

object GrpcServerOwner {

final class UnableToBind(port: Port, cause: Throwable)
extends RuntimeException(
s"The API server was unable to bind to port $port. Terminate the process occupying the port, or choose a different one.",
cause)
with NoStackTrace

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ final class LedgerApiServer(
).acquire()
apiServicesResource = apiServicesOwner.acquire()
apiServices <- apiServicesResource
server <- new GrpcServerOwner(
server <- new GrpcServer.Owner(
address,
desiredPort,
maxInboundMessageSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.apiserver

import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor, Status}

class TruncatedStatusInterceptor(maximumDescriptionLength: Int) extends ServerInterceptor {
override def interceptCall[ReqT, RespT](
call: ServerCall[ReqT, RespT],
headers: Metadata,
next: ServerCallHandler[ReqT, RespT],
): ServerCall.Listener[ReqT] =
next.startCall(
new SimpleForwardingServerCall[ReqT, RespT](call) {
override def close(status: Status, trailers: Metadata): Unit = {
val truncatedStatus = status.withDescription(truncate(status.getDescription))
super.close(truncatedStatus, trailers)
}
},
headers,
)

private def truncate(description: String): String =
if (description != null && description.length > maximumDescriptionLength)
description.substring(0, maximumDescriptionLength - 3) + "..."
else
description
}
Loading

0 comments on commit d6fc2bb

Please sign in to comment.