diff --git a/daml-script/runner/BUILD.bazel b/daml-script/runner/BUILD.bazel index cd90e63658ac..199a5ecb4c8f 100644 --- a/daml-script/runner/BUILD.bazel +++ b/daml-script/runner/BUILD.bazel @@ -29,6 +29,7 @@ da_scala_library( "//ledger/ledger-api-common", "//ledger/participant-state", "//ledger/sandbox", + "//libs-scala/resources", "@maven//:com_github_scopt_scopt_2_12", "@maven//:com_typesafe_akka_akka_stream_2_12", "@maven//:io_spray_spray_json_2_12", diff --git a/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/TestMain.scala b/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/TestMain.scala index ce237581b8a0..6568fe5a00c4 100644 --- a/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/TestMain.scala +++ b/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/TestMain.scala @@ -4,21 +4,13 @@ package com.digitalasset.daml.lf.engine.script import java.io.FileInputStream - -import akka.actor.ActorSystem -import akka.stream._ -import com.typesafe.scalalogging.StrictLogging import java.time.Instant import java.util.concurrent.atomic.AtomicBoolean -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration.Duration -import scala.io.Source -import scalaz.syntax.traverse._ -import spray.json._ +import akka.actor.ActorSystem +import akka.stream.Materializer import com.digitalasset.api.util.TimeProvider -import com.digitalasset.daml.lf.archive.{Dar, DarReader} -import com.digitalasset.daml.lf.archive.Decode +import com.digitalasset.daml.lf.archive.{Dar, DarReader, Decode} import com.digitalasset.daml.lf.data.Ref.{Identifier, PackageId, QualifiedName} import com.digitalasset.daml.lf.language.Ast import com.digitalasset.daml.lf.language.Ast.Package @@ -35,8 +27,13 @@ import com.digitalasset.platform.sandbox.SandboxServer import com.digitalasset.platform.sandbox.config.SandboxConfig import com.digitalasset.platform.services.time.TimeProviderType import com.google.protobuf.ByteString +import com.typesafe.scalalogging.StrictLogging +import scalaz.syntax.traverse._ +import spray.json._ -import scala.util.control.NonFatal +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.io.Source import scala.util.{Failure, Success} object TestMain extends StrictLogging { @@ -45,7 +42,7 @@ object TestMain extends StrictLogging { TestConfig.parse(args) match { case None => sys.exit(1) - case Some(config) => { + case Some(config) => val encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)] = DarReader().readArchiveFromFile(config.darPath).get val dar: Dar[(PackageId, Package)] = encodedDar.map { @@ -79,7 +76,7 @@ object TestMain extends StrictLogging { val runner = new Runner(dar, applicationId, commandUpdater) val (participantParams, participantCleanup) = config.participantConfig match { - case Some(file) => { + case Some(file) => val source = Source.fromFile(file) val fileContent = try { source.mkString @@ -88,39 +85,29 @@ object TestMain extends StrictLogging { } val jsVal = fileContent.parseJson import ParticipantsJsonProtocol._ - (jsVal.convertTo[Participants[ApiParameters]], () => ()) - } + (jsVal.convertTo[Participants[ApiParameters]], () => Future.successful(())) case None => val (apiParameters, cleanup) = if (config.ledgerHost.isEmpty) { val sandboxConfig = SandboxConfig.default.copy( port = 0, // Automatically choose a free port. - timeProviderType = config.timeProviderType + timeProviderType = config.timeProviderType, ) - val sandbox = new SandboxServer(sandboxConfig) - val sandboxClosed = new AtomicBoolean(false) - - def closeSandbox(): Unit = { - if (sandboxClosed.compareAndSet(false, true)) sandbox.close() - } - - try Runtime.getRuntime.addShutdownHook(new Thread(() => closeSandbox())) - catch { - case NonFatal(t) => - logger.error( - "Shutting down Sandbox application because of initialization error", - t) - closeSandbox() - } - (ApiParameters("localhost", sandbox.port), () => closeSandbox()) + val sandboxResource = SandboxServer.owner(sandboxConfig).acquire() + val sandboxPort = Await.result(sandboxResource.asFuture.flatMap(_.port), Duration.Inf) + (ApiParameters("localhost", sandboxPort), () => sandboxResource.release()) } else { - (ApiParameters(config.ledgerHost.get, config.ledgerPort.get), () => ()) + ( + ApiParameters(config.ledgerHost.get, config.ledgerPort.get), + () => Future.successful(()), + ) } ( Participants( default_participant = Some(apiParameters), participants = Map.empty, party_participants = Map.empty), - cleanup) + cleanup, + ) } val flow: Future[Boolean] = for { @@ -162,14 +149,13 @@ object TestMain extends StrictLogging { } yield success.get() flow.onComplete { _ => - participantCleanup() + Await.result(participantCleanup(), Duration.Inf) system.terminate() } if (!Await.result(flow, Duration.Inf)) { sys.exit(1) } - } } } } diff --git a/ledger/api-server-damlonx/reference-v2/src/main/scala/com/daml/ledger/api/server/damlonx/reference/v2/ReferenceServer.scala b/ledger/api-server-damlonx/reference-v2/src/main/scala/com/daml/ledger/api/server/damlonx/reference/v2/ReferenceServer.scala index 31da4e49faef..38297e8dd985 100644 --- a/ledger/api-server-damlonx/reference-v2/src/main/scala/com/daml/ledger/api/server/damlonx/reference/v2/ReferenceServer.scala +++ b/ledger/api-server-damlonx/reference-v2/src/main/scala/com/daml/ledger/api/server/damlonx/reference/v2/ReferenceServer.scala @@ -18,11 +18,10 @@ import com.digitalasset.logging.LoggingContext.newLoggingContext import com.digitalasset.platform.apiserver.{ApiServerConfig, StandaloneApiServer} import com.digitalasset.platform.indexer.{IndexerConfig, StandaloneIndexerServer} import com.digitalasset.resources.akka.AkkaResourceOwner -import com.digitalasset.resources.{Resource, ResourceOwner} +import com.digitalasset.resources.{ProgramResource, ResourceOwner} import org.slf4j.LoggerFactory -import scala.concurrent.duration.DurationInt -import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.ExecutionContext import scala.util.Try object ReferenceServer extends App { @@ -41,26 +40,23 @@ object ReferenceServer extends App { implicit val materializer: Materializer = Materializer(system) implicit val executionContext: ExecutionContext = system.dispatcher - val resource = newLoggingContext { implicit logCtx => + val owner = newLoggingContext { implicit logCtx => for { // Take ownership of the actor system and materializer so they're cleaned up properly. // This is necessary because we can't declare them as implicits within a `for` comprehension. - _ <- AkkaResourceOwner.forActorSystem(() => system).acquire() - _ <- AkkaResourceOwner.forMaterializer(() => materializer).acquire() + _ <- AkkaResourceOwner.forActorSystem(() => system) + _ <- AkkaResourceOwner.forMaterializer(() => materializer) ledger <- ResourceOwner .forCloseable(() => new InMemoryKVParticipantState(config.participantId)) - .acquire() - _ <- Resource.sequenceIgnoringValues(config.archiveFiles.map { file => + _ <- ResourceOwner.sequenceIgnoringValues(config.archiveFiles.map { file => val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString) for { dar <- ResourceOwner .forTry(() => DarReader { case (_, x) => Try(Archive.parseFrom(x)) } .readArchiveFromFile(file)) - .acquire() _ <- ResourceOwner .forCompletionStage(() => ledger.uploadPackages(submissionId, dar.all, None)) - .acquire() } yield () }) _ <- startIndexerServer(config, readService = ledger) @@ -70,10 +66,8 @@ object ReferenceServer extends App { writeService = ledger, authService = AuthServiceWildcard, ) - _ <- Resource.sequenceIgnoringValues( - for { - (extraParticipantId, port, jdbcUrl) <- config.extraParticipants - } yield { + _ <- ResourceOwner.sequenceIgnoringValues( + for ((extraParticipantId, port, jdbcUrl) <- config.extraParticipants) yield { val participantConfig = config.copy( port = port, participantId = extraParticipantId, @@ -93,27 +87,23 @@ object ReferenceServer extends App { } yield () } - resource.asFuture.failed.foreach { exception => - logger.error("Shutting down because of an initialization error.", exception) - System.exit(1) - } - - Runtime.getRuntime.addShutdownHook(new Thread(() => Await.result(resource.release(), 10.seconds))) + new ProgramResource(owner).run() private def startIndexerServer(config: Config, readService: ReadService)( - implicit logCtx: LoggingContext): Resource[Unit] = + implicit logCtx: LoggingContext + ): ResourceOwner[Unit] = new StandaloneIndexerServer( readService, IndexerConfig(config.participantId, config.jdbcUrl, config.startupMode), SharedMetricRegistries.getOrCreate(s"indexer-${config.participantId}"), - ).acquire() + ) private def startApiServer( config: Config, readService: ReadService, writeService: WriteService, authService: AuthService, - )(implicit logCtx: LoggingContext): Resource[Unit] = + )(implicit logCtx: LoggingContext): ResourceOwner[Unit] = new StandaloneApiServer( ApiServerConfig( config.participantId, @@ -130,5 +120,5 @@ object ReferenceServer extends App { writeService, authService, SharedMetricRegistries.getOrCreate(s"ledger-api-server-${config.participantId}"), - ).acquire() + ) } diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/Main.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/Main.scala index 55d74834f9bb..a22e950436cb 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/Main.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/Main.scala @@ -4,12 +4,11 @@ package com.daml.ledger.on.memory import com.daml.ledger.participant.state.kvutils.app.Runner +import com.digitalasset.resources.ProgramResource import scala.concurrent.ExecutionContext.Implicits.global object Main extends App { - Runner( - "In-Memory Ledger", - (ledgerId, participantId) => InMemoryLedgerReaderWriter.owner(ledgerId, participantId), - ).run(args) + new ProgramResource( + Runner("In-Memory Ledger", InMemoryLedgerReaderWriter.owner(_, _)).owner(args)).run() } diff --git a/ledger/ledger-on-posix-filesystem/BUILD.bazel b/ledger/ledger-on-posix-filesystem/BUILD.bazel index 143fbefebf96..2ae27e2183df 100644 --- a/ledger/ledger-on-posix-filesystem/BUILD.bazel +++ b/ledger/ledger-on-posix-filesystem/BUILD.bazel @@ -56,7 +56,6 @@ da_scala_library( "//ledger/participant-state", "//ledger/participant-state/kvutils", "//ledger/participant-state/kvutils/app", - "//libs-scala/direct-execution-context", "//libs-scala/resources", ], ) diff --git a/ledger/ledger-on-posix-filesystem/src/main/scala/com/daml/ledger/on/filesystem/posix/Main.scala b/ledger/ledger-on-posix-filesystem/src/main/scala/com/daml/ledger/on/filesystem/posix/Main.scala index e9ba93a610e0..b9932c25aab2 100644 --- a/ledger/ledger-on-posix-filesystem/src/main/scala/com/daml/ledger/on/filesystem/posix/Main.scala +++ b/ledger/ledger-on-posix-filesystem/src/main/scala/com/daml/ledger/on/filesystem/posix/Main.scala @@ -9,13 +9,13 @@ import java.nio.file.Path import akka.stream.Materializer import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner} import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} -import com.digitalasset.resources.ResourceOwner +import com.digitalasset.resources.{ProgramResource, ResourceOwner} import scopt.OptionParser import scala.concurrent.ExecutionContext.Implicits.global object Main extends App { - Runner("File System Ledger", FileSystemLedgerFactory).run(args) + new ProgramResource(Runner("File System Ledger", FileSystemLedgerFactory).owner(args)).run() case class ExtraConfig(root: Option[Path]) diff --git a/ledger/ledger-on-posix-filesystem/src/test/lib/scala/com/daml/ledger/on/filesystem/posix/MainWithEphemeralDirectory.scala b/ledger/ledger-on-posix-filesystem/src/test/lib/scala/com/daml/ledger/on/filesystem/posix/MainWithEphemeralDirectory.scala index 131ee0c17ad2..ab1796593106 100644 --- a/ledger/ledger-on-posix-filesystem/src/test/lib/scala/com/daml/ledger/on/filesystem/posix/MainWithEphemeralDirectory.scala +++ b/ledger/ledger-on-posix-filesystem/src/test/lib/scala/com/daml/ledger/on/filesystem/posix/MainWithEphemeralDirectory.scala @@ -7,29 +7,33 @@ import java.nio.file.{Files, Path} import com.daml.ledger.on.filesystem.posix.DeleteFiles.deleteFiles import com.daml.ledger.participant.state.kvutils.app.Runner -import com.digitalasset.dec.DirectExecutionContext -import com.digitalasset.resources.Resource +import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} +import com.digitalasset.resources.{ProgramResource, Resource, ResourceOwner} +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{ExecutionContext, Future} object MainWithEphemeralDirectory extends App { - implicit val executionContext: ExecutionContext = DirectExecutionContext + new ProgramResource(Runner("Ephemeral File System Ledger", owner _).owner(args)).run() - val root = Files.createTempDirectory("ledger-on-posix-filesystem-ephemeral-") + def owner( + ledgerId: LedgerId, + participantId: ParticipantId, + ): ResourceOwner[FileSystemLedgerReaderWriter] = + for { + root <- temporaryDirectory("ledger-on-posix-filesystem-ephemeral-") + participant <- FileSystemLedgerReaderWriter.owner( + ledgerId = ledgerId, + participantId = participantId, + root = root, + ) + } yield participant - for { - root <- Resource[Path]( - Future.successful(root), - directory => Future.successful(deleteFiles(directory)), - ) - _ <- Runner( - "Ephemeral File System Ledger", - (ledgerId, participantId) => - FileSystemLedgerReaderWriter.owner( - ledgerId = ledgerId, - participantId = participantId, - root = root, - ), - ).run(args) - } yield () + def temporaryDirectory(prefix: String): ResourceOwner[Path] = new ResourceOwner[Path] { + def acquire()(implicit executionContext: ExecutionContext): Resource[Path] = + Resource[Path]( + Future(Files.createTempDirectory(prefix))(executionContext), + directory => Future(deleteFiles(directory))(executionContext), + )(executionContext) + } } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Main.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Main.scala index 1a85a6b1ad4f..f2c0c9cf0d9e 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Main.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Main.scala @@ -7,13 +7,13 @@ import akka.stream.Materializer import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner} import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} import com.digitalasset.logging.LoggingContext.newLoggingContext -import com.digitalasset.resources.ResourceOwner +import com.digitalasset.resources.{ProgramResource, ResourceOwner} import scopt.OptionParser import scala.concurrent.ExecutionContext.Implicits.global object Main extends App { - Runner("SQL Ledger", SqlLedgerFactory).run(args) + new ProgramResource(Runner("SQL Ledger", SqlLedgerFactory).owner(args)).run() case class ExtraConfig(jdbcUrl: Option[String]) diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralDirectory.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralDirectory.scala index c0b43daafbad..258d62632ae2 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralDirectory.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralDirectory.scala @@ -9,15 +9,17 @@ import akka.stream.Materializer import com.daml.ledger.on.sql.Main.{ExtraConfig, SqlLedgerFactory} import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner} import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} -import com.digitalasset.resources.ResourceOwner +import com.digitalasset.resources.{ProgramResource, ResourceOwner} import scopt.OptionParser +import scala.concurrent.ExecutionContext.Implicits.global + object MainWithEphemeralDirectory extends App { val DirectoryPattern = "%DIR" val directory = Files.createTempDirectory("ledger-on-sql-ephemeral-") - Runner("SQL Ledger", TestLedgerFactory).run(args) + new ProgramResource(Runner("SQL Ledger", TestLedgerFactory).owner(args)).run() object TestLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, ExtraConfig] { override val defaultExtraConfig: ExtraConfig = SqlLedgerFactory.defaultExtraConfig diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralPostgresql.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralPostgresql.scala index 3234d7b6ef62..8f52d3c4aa7f 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralPostgresql.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/MainWithEphemeralPostgresql.scala @@ -7,15 +7,17 @@ import akka.stream.Materializer import com.daml.ledger.on.sql.Main.{ExtraConfig, SqlLedgerFactory} import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner} import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} -import com.digitalasset.resources.ResourceOwner +import com.digitalasset.resources.{ProgramResource, ResourceOwner} import com.digitalasset.testing.postgresql.PostgresAround import scopt.OptionParser +import scala.concurrent.ExecutionContext.Implicits.global + object MainWithEphemeralPostgresql extends App with PostgresAround { startEphemeralPostgres() sys.addShutdownHook(stopAndCleanUpPostgres()) - Runner("SQL Ledger", PostgresqlLedgerFactory).run(args) + new ProgramResource(Runner("SQL Ledger", PostgresqlLedgerFactory).owner(args)).run() object PostgresqlLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, Unit] { override val defaultExtraConfig: Unit = () diff --git a/ledger/participant-state/kvutils/app/BUILD.bazel b/ledger/participant-state/kvutils/app/BUILD.bazel index 5bc06e9bab63..26444edb0045 100644 --- a/ledger/participant-state/kvutils/app/BUILD.bazel +++ b/ledger/participant-state/kvutils/app/BUILD.bazel @@ -42,6 +42,5 @@ da_scala_library( "@maven//:com_typesafe_akka_akka_actor_2_12", "@maven//:com_typesafe_akka_akka_stream_2_12", "@maven//:io_dropwizard_metrics_metrics_core", - "@maven//:org_slf4j_slf4j_api", ], ) diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala index 31f6967db4a3..45e99e663cf4 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala @@ -9,6 +9,7 @@ import akka.actor.ActorSystem import akka.stream.Materializer import com.codahale.metrics.SharedMetricRegistries import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState +import com.daml.ledger.participant.state.kvutils.app.Runner._ import com.daml.ledger.participant.state.v1.{ LedgerId, ParticipantId, @@ -29,51 +30,46 @@ import com.digitalasset.platform.indexer.{ IndexerStartupMode, StandaloneIndexerServer } +import com.digitalasset.resources.ProgramResource.SuppressedException +import com.digitalasset.resources.ResourceOwner import com.digitalasset.resources.akka.AkkaResourceOwner -import com.digitalasset.resources.{Resource, ResourceOwner} -import org.slf4j.LoggerFactory -import scala.concurrent.duration.DurationInt -import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.ExecutionContext import scala.util.Try class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T, Extra]) { - def run(args: Seq[String]): Resource[Unit] = { - val config = Config - .parse(name, factory.extraConfigParser, factory.defaultExtraConfig, args) - .getOrElse(sys.exit(1)) - - val logger = LoggerFactory.getLogger(getClass) - + def owner(args: Seq[String]): ResourceOwner[Unit] = { implicit val system: ActorSystem = ActorSystem( "[^A-Za-z0-9_\\-]".r.replaceAllIn(name.toLowerCase, "-")) implicit val materializer: Materializer = Materializer(system) implicit val executionContext: ExecutionContext = system.dispatcher - val ledgerId = - config.ledgerId.getOrElse(Ref.LedgerString.assertFromString(UUID.randomUUID.toString)) - - val resource = newLoggingContext { implicit logCtx => + newLoggingContext { implicit logCtx => for { // Take ownership of the actor system and materializer so they're cleaned up properly. // This is necessary because we can't declare them as implicits within a `for` comprehension. - _ <- AkkaResourceOwner.forActorSystem(() => system).acquire() - _ <- AkkaResourceOwner.forMaterializer(() => materializer).acquire() + _ <- AkkaResourceOwner.forActorSystem(() => system) + _ <- AkkaResourceOwner.forMaterializer(() => materializer) + + config <- Config + .parse(name, factory.extraConfigParser, factory.defaultExtraConfig, args) + .fold(ResourceOwner.failed[Config[Extra]](new ConfigParseException))( + ResourceOwner.successful) + + ledgerId = config.ledgerId.getOrElse( + Ref.LedgerString.assertFromString(UUID.randomUUID.toString)) readerWriter <- factory .owner(ledgerId, config.participantId, config.extra) - .acquire() ledger = new KeyValueParticipantState(readerWriter, readerWriter) - _ <- Resource.sequenceIgnoringValues(config.archiveFiles.map { file => + _ <- ResourceOwner.sequenceIgnoringValues(config.archiveFiles.map { file => val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString) for { dar <- ResourceOwner .forTry(() => DarReader { case (_, x) => Try(Archive.parseFrom(x)) } .readArchiveFromFile(file.toFile)) - .acquire() _ <- ResourceOwner .forCompletionStage(() => ledger.uploadPackages(submissionId, dar.all, None)) - .acquire() } yield () }) _ <- startIndexerServer(config, readService = ledger) @@ -85,22 +81,12 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T, ) } yield () } - - resource.asFuture.failed.foreach { exception => - logger.error("Shutting down because of an initialization error.", exception) - System.exit(1) - } - - Runtime.getRuntime - .addShutdownHook(new Thread(() => Await.result(resource.release(), 10.seconds))) - - resource } private def startIndexerServer( config: Config[Extra], readService: ReadService, - )(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Resource[Unit] = + )(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Unit] = new StandaloneIndexerServer( readService, IndexerConfig( @@ -109,14 +95,14 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T, startupMode = IndexerStartupMode.MigrateAndStart, ), SharedMetricRegistries.getOrCreate(s"indexer-${config.participantId}"), - ).acquire() + ) private def startApiServer( config: Config[Extra], readService: ReadService, writeService: WriteService, authService: AuthService, - )(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Resource[Unit] = + )(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Unit] = new StandaloneApiServer( ApiServerConfig( config.participantId, @@ -133,7 +119,7 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T, writeService, authService, SharedMetricRegistries.getOrCreate(s"ledger-api-server-${config.participantId}"), - ).acquire() + ) } object Runner { @@ -148,4 +134,6 @@ object Runner { factory: LedgerFactory[T, Extra], ): Runner[T, Extra] = new Runner(name, factory) + + class ConfigParseException extends SuppressedException } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxMain.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxMain.scala index 89f458594d1f..68a82782ed5b 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxMain.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxMain.scala @@ -3,44 +3,22 @@ package com.digitalasset.platform.sandbox -import java.util.concurrent.atomic.AtomicBoolean - import ch.qos.logback.classic.Level +import com.digitalasset.dec.DirectExecutionContext import com.digitalasset.platform.sandbox.cli.Cli +import com.digitalasset.resources.ProgramResource import org.slf4j.{Logger, LoggerFactory} -import scala.util.control.NonFatal +import scala.concurrent.ExecutionContext object SandboxMain extends App { + private implicit val executionContext: ExecutionContext = DirectExecutionContext - private val logger = LoggerFactory.getLogger(this.getClass) + private val logger: Logger = LoggerFactory.getLogger(this.getClass) Cli.parse(args).fold(sys.exit(1)) { config => setGlobalLogLevel(config.logLevel) - - val server = new SandboxServer(config) - - val closed = new AtomicBoolean(false) - - def closeServer(): Unit = { - if (closed.compareAndSet(false, true)) server.close() - } - - server.failure.foreach { exception => - logger.error( - s"Shutting down Sandbox application due to an initialization error:\n${exception.getMessage}") - closeServer() - sys.exit(1) - } - - try { - Runtime.getRuntime.addShutdownHook(new Thread(() => closeServer())) - } catch { - case NonFatal(exception) => - logger.error("Shutting down Sandbox application due to an initialization error.", exception) - closeServer() - sys.exit(1) - } + new ProgramResource(SandboxServer.owner(config)).run() } // Copied from language-support/scala/codegen/src/main/scala/com/digitalasset/codegen/Main.scala diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala index 08fffa0297d6..7463a03e4459 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala @@ -51,6 +51,7 @@ import com.digitalasset.resources.{Resource, ResourceOwner} import scala.collection.JavaConverters._ import scala.concurrent.duration.DurationInt import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.Try object SandboxServer { private val ActorSystemName = "sandbox" @@ -62,6 +63,16 @@ object SandboxServer { // repeated validation of the sames packages after each reset private val engine = Engine() + def owner(config: SandboxConfig): ResourceOwner[SandboxState] = new ResourceOwner[SandboxState] { + override def acquire()(implicit executionContext: ExecutionContext): Resource[SandboxState] = { + for { + server <- ResourceOwner.forTry(() => Try(new SandboxServer(config))).acquire() + state <- server.sandboxState + _ <- state.apiServer + } yield state + } + } + // if requested, initialize the ledger state with the given scenario private def createInitialState(config: SandboxConfig, packageStore: InMemoryPackageStore) : (InMemoryActiveLedgerState, ImmArray[LedgerEntryOrBump], Option[Instant]) = { @@ -91,21 +102,20 @@ object SandboxServer { } } - private final case class SandboxState( + final case class SandboxState( // nested resource so we can release it independently when restarting apiServer: Resource[ApiServer], packageStore: InMemoryPackageStore, materializer: Materializer, ) { - val executionContext: ExecutionContext = materializer.executionContext + private implicit val executionContext: ExecutionContext = materializer.executionContext - def port: Future[Int] = { - apiServer.asFuture.map(_.port)(executionContext) - } + def port: Future[Int] = + apiServer.map(_.port).asFuture } } -final class SandboxServer(config: => SandboxConfig) extends AutoCloseable { +final class SandboxServer(config: SandboxConfig) extends AutoCloseable { // Name of this participant // TODO: Pass this info in command-line (See issue #2025) @@ -127,21 +137,11 @@ final class SandboxServer(config: => SandboxConfig) extends AutoCloseable { } yield state } - def failure: Option[Throwable] = - Await - .result( - sandboxState.asFuture - .flatMap(_.apiServer.asFuture)(DirectExecutionContext) - .transformWith(Future.successful)(DirectExecutionContext), - AsyncTolerance) - .failed - .toOption - def port: Int = Await.result(portF(DirectExecutionContext), AsyncTolerance) def portF(implicit executionContext: ExecutionContext): Future[Int] = - sandboxState.asFuture.flatMap(_.apiServer.asFuture).map(_.port) + sandboxState.flatMap(_.apiServer).map(_.port).asFuture /** the reset service is special, since it triggers a server shutdown */ private def resetService( @@ -158,7 +158,7 @@ final class SandboxServer(config: => SandboxConfig) extends AutoCloseable { def resetAndRestartServer()(implicit executionContext: ExecutionContext): Future[Unit] = { val apiServicesClosed = - sandboxState.asFuture.flatMap(_.apiServer.asFuture).flatMap(_.servicesClosed()) + sandboxState.flatMap(_.apiServer).asFuture.flatMap(_.servicesClosed()) // Need to run this async otherwise the callback kills the server under the in-flight reset service request! // TODO: eliminate the state mutation somehow diff --git a/libs-scala/resources/BUILD.bazel b/libs-scala/resources/BUILD.bazel index 93f4fbdb23f4..8f5b9f7fe2e0 100644 --- a/libs-scala/resources/BUILD.bazel +++ b/libs-scala/resources/BUILD.bazel @@ -11,6 +11,7 @@ da_scala_library( "//visibility:public", ], deps = [ + "//libs-scala/contextualized-logging", "@maven//:org_scala_lang_modules_scala_java8_compat_2_12", ], ) diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ProgramResource.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ProgramResource.scala new file mode 100644 index 000000000000..5b93a7e28ec0 --- /dev/null +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ProgramResource.scala @@ -0,0 +1,48 @@ +// Copyright (c) 2020 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.resources + +import com.digitalasset.logging.ContextualizedLogger +import com.digitalasset.logging.LoggingContext.newLoggingContext +import com.digitalasset.resources.ProgramResource._ + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, ExecutionContext} +import scala.util.control.NonFatal + +class ProgramResource[T](owner: ResourceOwner[T]) { + private val logger = ContextualizedLogger.get(getClass) + + def run()(implicit executionContext: ExecutionContext): Resource[T] = { + newLoggingContext { implicit logCtx => + val resource = owner.acquire() + + resource.asFuture.failed.foreach { + case _: SuppressedException => + System.exit(1) + case exception => + logger.error("Shutting down because of an initialization error.", exception) + System.exit(1) + } + + try { + sys.runtime.addShutdownHook(new Thread(() => + Await.result(resource.release(), AsyncTimeout))) + } catch { + case NonFatal(exception) => + logger.error("Shutting down because of an initialization error.", exception) + Await.result(resource.release(), AsyncTimeout) + System.exit(1) + } + + resource + } + } +} + +object ProgramResource { + private val AsyncTimeout = 10.seconds + + abstract class SuppressedException extends RuntimeException +} diff --git a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceOwner.scala b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceOwner.scala index 29a525f42fc7..bc5af6f66835 100644 --- a/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceOwner.scala +++ b/libs-scala/resources/src/main/scala/com/digitalasset/resources/ResourceOwner.scala @@ -6,8 +6,10 @@ package com.digitalasset.resources import java.util.Timer import java.util.concurrent.{CompletionStage, ExecutorService} +import scala.collection.generic.CanBuildFrom import scala.compat.java8.FutureConverters._ import scala.concurrent.{ExecutionContext, Future} +import scala.language.higherKinds import scala.util.{Failure, Success, Try} @FunctionalInterface @@ -75,4 +77,26 @@ object ResourceOwner { def forTimer(acquire: () => Timer): ResourceOwner[Timer] = new TimerResourceOwner(acquire) + + def sequence[T, C[X] <: TraversableOnce[X]](seq: C[ResourceOwner[T]])( + implicit bf: CanBuildFrom[C[ResourceOwner[T]], T, C[T]], + executionContext: ExecutionContext, + ): ResourceOwner[C[T]] = + seq + .foldLeft(ResourceOwner.successful(bf()))((builderResource, elementResource) => + for { + builder <- builderResource + element <- elementResource + } yield builder += element) + .map(_.result()) + + def sequenceIgnoringValues[T, C[X] <: TraversableOnce[X]](seq: C[ResourceOwner[T]])( + implicit executionContext: ExecutionContext, + ): ResourceOwner[Unit] = + seq + .foldLeft(ResourceOwner.successful(()))((builderResource, elementResource) => + for { + _ <- builderResource + _ <- elementResource + } yield ()) }