Skip to content

Commit

Permalink
Add runner for non-repudiation (#9076)
Browse files Browse the repository at this point in the history
* Add runner for non-repudiation

Closes #8633

changelog_begin
changelog_end

* Fix existing tests

* Add test for non-repudiation-app option parser

* Remove unnecessary dependencies from non-repudiation testing

* Fix Scala 2.13 build errors

* Fix help message for --api-address and --api-port
  • Loading branch information
stefanobaghino-da authored Mar 11, 2021
1 parent 4784cfa commit 0e4af74
Show file tree
Hide file tree
Showing 16 changed files with 454 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ final case class PostgresDatabase private[postgresql] (

def password: String = server.password

def urlWithoutCredentials: String =
s"jdbc:postgresql://$hostName:$port/$databaseName"

def url: String =
s"jdbc:postgresql://$hostName:$port/$databaseName?user=$userName&password=$password"
s"$urlWithoutCredentials?user=$userName&password=$password"

override def toString: String = url
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,46 @@

package com.daml.nonrepudiation.api

import java.net.InetAddress
import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{concat, pathPrefix}
import akka.http.scaladsl.server.Route
import com.daml.nonrepudiation.api.NonRepudiationApi.Configuration
import com.daml.nonrepudiation.{CertificateRepository, CommandIdString, SignedPayloadRepository}
import com.daml.resources.{AbstractResourceOwner, HasExecutionContext, ReleasableResource, Resource}

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.duration.FiniteDuration

object NonRepudiationApi {

// We don't need access to the underlying resource, we use
// the owner only to manage the server's life cycle
def owner[Context: HasExecutionContext](
configuration: Configuration,
address: InetSocketAddress,
shutdownTimeout: FiniteDuration,
certificateRepository: CertificateRepository,
signedPayloadRepository: SignedPayloadRepository.Read[CommandIdString],
)(implicit system: ActorSystem): AbstractResourceOwner[Context, Unit] =
actorSystem: ActorSystem,
): AbstractResourceOwner[Context, Unit] =
new NonRepudiationApi[Context](
configuration,
address,
shutdownTimeout,
certificateRepository,
signedPayloadRepository,
actorSystem,
).map(_ => ())

final case class Configuration(
interface: String,
port: Int,
shutdownTimeout: FiniteDuration,
)

// Non-repudiation -> NR -> N = 78 and R = 82 in ASCII
val DefaultPort: Int = 7882

object Configuration {

val Default: Configuration =
Configuration(
interface = InetAddress.getLoopbackAddress.getHostAddress,
port = DefaultPort,
shutdownTimeout = 10.seconds,
)

}

}

final class NonRepudiationApi[Context: HasExecutionContext] private (
configuration: Configuration,
address: InetSocketAddress,
shutdownTimeout: FiniteDuration,
certificates: CertificateRepository,
signedPayloads: SignedPayloadRepository.Read[CommandIdString],
)(implicit system: ActorSystem)
extends AbstractResourceOwner[Context, Http.ServerBinding] {
actorSystem: ActorSystem,
) extends AbstractResourceOwner[Context, Http.ServerBinding] {

private val route: Route =
pathPrefix("v1") {
Expand All @@ -68,19 +52,16 @@ final class NonRepudiationApi[Context: HasExecutionContext] private (
)
}

private def bindNewServer(): Future[Http.ServerBinding] =
Http()
.newServerAt(
interface = configuration.interface,
port = configuration.port,
)
.bind(route)
private def bindNewServer(): Future[Http.ServerBinding] = {
implicit val system: ActorSystem = actorSystem
Http().newServerAt(address.getAddress.getHostAddress, address.getPort).bind(route)
}

override def acquire()(implicit context: Context): Resource[Context, Http.ServerBinding] =
ReleasableResource(bindNewServer()) { server =>
for {
_ <- server.unbind()
_ <- server.terminate(configuration.shutdownTimeout)
_ <- server.terminate(shutdownTimeout)
} yield ()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.nonrepudiation.api

import java.net.{InetAddress, InetSocketAddress}
import java.security.cert.X509Certificate
import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -238,16 +239,18 @@ final class NonRepudiationApiSpec

val port = FreePort.find().value

val configuration = NonRepudiationApi.Configuration.Default.copy(port = port)
val address = new InetSocketAddress(InetAddress.getLoopbackAddress, port)

val api =
NonRepudiationApi.owner(
configuration,
address = address,
shutdownTimeout = 10.seconds,
certificates,
signedPayloads,
actorSystem,
)

val baseUrl = s"http://${configuration.interface}:${configuration.port}"
val baseUrl = s"http://${address.getAddress.getHostAddress}:${address.getPort}"

api.use { _ => test(baseUrl, certificates, signedPayloads) }

Expand Down
50 changes: 50 additions & 0 deletions runtime-components/non-repudiation-app/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
"da_scala_test",
)

da_scala_binary(
name = "non-repudiation-app",
srcs = glob(["src/main/scala/**/*.scala"]),
main_class = "com.daml.nonrepudiation.app.NonRepudiationApp",
scala_deps = [
"@maven//:com_github_scopt_scopt",
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:org_tpolecat_doobie_core",
"@maven//:org_tpolecat_doobie_hikari",
"@maven//:org_typelevel_cats_effect",
],
runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
"@maven//:org_postgresql_postgresql",
],
deps = [
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//libs-scala/doobie-slf4j",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//runtime-components/non-repudiation",
"//runtime-components/non-repudiation-api",
"//runtime-components/non-repudiation-postgresql",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_core",
"@maven//:io_grpc_grpc_netty",
"@maven//:org_slf4j_slf4j_api",
],
)

da_scala_test(
name = "test",
srcs = glob(["src/test/scala/**/*.scala"]),
scala_deps = [
"@maven//:com_github_scopt_scopt",
],
deps = [
":non-repudiation-app",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.nonrepudiation.app

import java.net.{InetAddress, InetSocketAddress}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

object Configuration {

private val LocalHost = InetAddress.getLocalHost.getHostAddress
private val LoopbackAddress = InetAddress.getLoopbackAddress.getHostAddress

private val ApiDefaultPort: Int = 7882 // Non-repudiation -> NR -> N = 78 and R = 82 in ASCII
private val ParticipantDefaultPort: Int = 6865
private val ProxyDefaultPort: Int = ParticipantDefaultPort

val Default: Configuration =
Configuration(
participantAddress = new InetSocketAddress(LocalHost, ParticipantDefaultPort),
proxyAddress = new InetSocketAddress(LoopbackAddress, ProxyDefaultPort),
apiAddress = new InetSocketAddress(LoopbackAddress, ApiDefaultPort),
apiShutdownTimeout = 10.seconds,
databaseJdbcUrl = "jdbc:postgresql:/",
databaseJdbcUsername = "nonrepudiation",
databaseJdbcPassword = "nonrepudiation",
databaseMaxPoolSize = 10,
metricsReportingPeriod = 5.seconds,
)

}

final case class Configuration private (
participantAddress: InetSocketAddress,
proxyAddress: InetSocketAddress,
apiAddress: InetSocketAddress,
apiShutdownTimeout: FiniteDuration,
databaseJdbcUrl: String,
databaseJdbcUsername: String,
databaseJdbcPassword: String,
databaseMaxPoolSize: Int,
metricsReportingPeriod: FiniteDuration,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.nonrepudiation.app

import java.time.Clock

import akka.actor.ActorSystem
import com.daml.doobie.logging.Slf4jLogHandler
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionService
import com.daml.nonrepudiation.api.NonRepudiationApi
import com.daml.nonrepudiation.postgresql.{Tables, createTransactor}
import com.daml.nonrepudiation.{MetricsReporterOwner, NonRepudiationProxy}
import com.daml.resources.akka.AkkaResourceOwnerFactories
import com.daml.resources.{
AbstractResourceOwner,
HasExecutionContext,
ProgramResource,
ResourceOwnerFactories,
}
import io.grpc.Server
import io.grpc.netty.{NettyChannelBuilder, NettyServerBuilder}

import scala.concurrent.ExecutionContext

object NonRepudiationApp {

private[app] val Name = "non-repudiation-app"

private val resourceFactory = new ResourceOwnerFactories[ExecutionContext]
with AkkaResourceOwnerFactories[ExecutionContext] {
override protected implicit val hasExecutionContext: HasExecutionContext[ExecutionContext] =
HasExecutionContext.`ExecutionContext has itself`
}

def main(args: Array[String]): Unit = {

val configuration: Configuration =
OptionParser.parse(args, Configuration.Default).getOrElse(sys.exit(1))

val program = new ProgramResource(appOwner(configuration))

program.run(identity)

}

def appOwner(
configuration: Configuration
): AbstractResourceOwner[ExecutionContext, Server] = {

val participantChannel =
NettyChannelBuilder.forAddress(configuration.participantAddress).usePlaintext().build()

val proxyChannelBuilder =
NettyServerBuilder.forAddress(configuration.proxyAddress)

for {
actorSystem <- resourceFactory.forActorSystem(() => ActorSystem(Name))
transactor <- createTransactor(
configuration.databaseJdbcUrl,
configuration.databaseJdbcUsername,
configuration.databaseJdbcPassword,
configuration.databaseMaxPoolSize,
resourceFactory,
)
logHandler = Slf4jLogHandler(getClass)
db = Tables.initialize(transactor)(logHandler)
_ <- MetricsReporterOwner.slf4j(period = configuration.metricsReportingPeriod)
_ <- NonRepudiationApi.owner(
configuration.apiAddress,
configuration.apiShutdownTimeout,
db.certificates,
db.signedPayloads,
actorSystem,
)
proxy <- NonRepudiationProxy.owner(
participantChannel,
proxyChannelBuilder,
db.certificates,
db.signedPayloads,
Clock.systemUTC(),
CommandService.scalaDescriptor.fullName,
CommandSubmissionService.scalaDescriptor.fullName,
)
} yield proxy
}

}
Loading

0 comments on commit 0e4af74

Please sign in to comment.