Skip to content

Commit

Permalink
resources: Pull out the shutdown hook logic into ProgramResource. (d…
Browse files Browse the repository at this point in the history
…igital-asset#4356)

It's not trivial, so it's almost certainly less error-prone to just
define it once.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar authored Feb 4, 2020
1 parent 93a613f commit e203fde
Show file tree
Hide file tree
Showing 17 changed files with 196 additions and 175 deletions.
1 change: 1 addition & 0 deletions daml-script/runner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ da_scala_library(
"//ledger/ledger-api-common",
"//ledger/participant-state",
"//ledger/sandbox",
"//libs-scala/resources",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_spray_spray_json_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,13 @@
package com.digitalasset.daml.lf.engine.script

import java.io.FileInputStream

import akka.actor.ActorSystem
import akka.stream._
import com.typesafe.scalalogging.StrictLogging
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.io.Source
import scalaz.syntax.traverse._
import spray.json._
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.archive.{Dar, DarReader}
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.archive.{Dar, DarReader, Decode}
import com.digitalasset.daml.lf.data.Ref.{Identifier, PackageId, QualifiedName}
import com.digitalasset.daml.lf.language.Ast
import com.digitalasset.daml.lf.language.Ast.Package
Expand All @@ -35,8 +27,13 @@ import com.digitalasset.platform.sandbox.SandboxServer
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.services.time.TimeProviderType
import com.google.protobuf.ByteString
import com.typesafe.scalalogging.StrictLogging
import scalaz.syntax.traverse._
import spray.json._

import scala.util.control.NonFatal
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.io.Source
import scala.util.{Failure, Success}

object TestMain extends StrictLogging {
Expand All @@ -45,7 +42,7 @@ object TestMain extends StrictLogging {

TestConfig.parse(args) match {
case None => sys.exit(1)
case Some(config) => {
case Some(config) =>
val encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)] =
DarReader().readArchiveFromFile(config.darPath).get
val dar: Dar[(PackageId, Package)] = encodedDar.map {
Expand Down Expand Up @@ -79,7 +76,7 @@ object TestMain extends StrictLogging {

val runner = new Runner(dar, applicationId, commandUpdater)
val (participantParams, participantCleanup) = config.participantConfig match {
case Some(file) => {
case Some(file) =>
val source = Source.fromFile(file)
val fileContent = try {
source.mkString
Expand All @@ -88,39 +85,29 @@ object TestMain extends StrictLogging {
}
val jsVal = fileContent.parseJson
import ParticipantsJsonProtocol._
(jsVal.convertTo[Participants[ApiParameters]], () => ())
}
(jsVal.convertTo[Participants[ApiParameters]], () => Future.successful(()))
case None =>
val (apiParameters, cleanup) = if (config.ledgerHost.isEmpty) {
val sandboxConfig = SandboxConfig.default.copy(
port = 0, // Automatically choose a free port.
timeProviderType = config.timeProviderType
timeProviderType = config.timeProviderType,
)
val sandbox = new SandboxServer(sandboxConfig)
val sandboxClosed = new AtomicBoolean(false)

def closeSandbox(): Unit = {
if (sandboxClosed.compareAndSet(false, true)) sandbox.close()
}

try Runtime.getRuntime.addShutdownHook(new Thread(() => closeSandbox()))
catch {
case NonFatal(t) =>
logger.error(
"Shutting down Sandbox application because of initialization error",
t)
closeSandbox()
}
(ApiParameters("localhost", sandbox.port), () => closeSandbox())
val sandboxResource = SandboxServer.owner(sandboxConfig).acquire()
val sandboxPort = Await.result(sandboxResource.asFuture.flatMap(_.port), Duration.Inf)
(ApiParameters("localhost", sandboxPort), () => sandboxResource.release())
} else {
(ApiParameters(config.ledgerHost.get, config.ledgerPort.get), () => ())
(
ApiParameters(config.ledgerHost.get, config.ledgerPort.get),
() => Future.successful(()),
)
}
(
Participants(
default_participant = Some(apiParameters),
participants = Map.empty,
party_participants = Map.empty),
cleanup)
cleanup,
)
}

val flow: Future[Boolean] = for {
Expand Down Expand Up @@ -162,14 +149,13 @@ object TestMain extends StrictLogging {
} yield success.get()

flow.onComplete { _ =>
participantCleanup()
Await.result(participantCleanup(), Duration.Inf)
system.terminate()
}

if (!Await.result(flow, Duration.Inf)) {
sys.exit(1)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.platform.apiserver.{ApiServerConfig, StandaloneApiServer}
import com.digitalasset.platform.indexer.{IndexerConfig, StandaloneIndexerServer}
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import org.slf4j.LoggerFactory

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.ExecutionContext
import scala.util.Try

object ReferenceServer extends App {
Expand All @@ -41,26 +40,23 @@ object ReferenceServer extends App {
implicit val materializer: Materializer = Materializer(system)
implicit val executionContext: ExecutionContext = system.dispatcher

val resource = newLoggingContext { implicit logCtx =>
val owner = newLoggingContext { implicit logCtx =>
for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- AkkaResourceOwner.forActorSystem(() => system).acquire()
_ <- AkkaResourceOwner.forMaterializer(() => materializer).acquire()
_ <- AkkaResourceOwner.forActorSystem(() => system)
_ <- AkkaResourceOwner.forMaterializer(() => materializer)
ledger <- ResourceOwner
.forCloseable(() => new InMemoryKVParticipantState(config.participantId))
.acquire()
_ <- Resource.sequenceIgnoringValues(config.archiveFiles.map { file =>
_ <- ResourceOwner.sequenceIgnoringValues(config.archiveFiles.map { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- ResourceOwner
.forTry(() =>
DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file))
.acquire()
_ <- ResourceOwner
.forCompletionStage(() => ledger.uploadPackages(submissionId, dar.all, None))
.acquire()
} yield ()
})
_ <- startIndexerServer(config, readService = ledger)
Expand All @@ -70,10 +66,8 @@ object ReferenceServer extends App {
writeService = ledger,
authService = AuthServiceWildcard,
)
_ <- Resource.sequenceIgnoringValues(
for {
(extraParticipantId, port, jdbcUrl) <- config.extraParticipants
} yield {
_ <- ResourceOwner.sequenceIgnoringValues(
for ((extraParticipantId, port, jdbcUrl) <- config.extraParticipants) yield {
val participantConfig = config.copy(
port = port,
participantId = extraParticipantId,
Expand All @@ -93,27 +87,23 @@ object ReferenceServer extends App {
} yield ()
}

resource.asFuture.failed.foreach { exception =>
logger.error("Shutting down because of an initialization error.", exception)
System.exit(1)
}

Runtime.getRuntime.addShutdownHook(new Thread(() => Await.result(resource.release(), 10.seconds)))
new ProgramResource(owner).run()

private def startIndexerServer(config: Config, readService: ReadService)(
implicit logCtx: LoggingContext): Resource[Unit] =
implicit logCtx: LoggingContext
): ResourceOwner[Unit] =
new StandaloneIndexerServer(
readService,
IndexerConfig(config.participantId, config.jdbcUrl, config.startupMode),
SharedMetricRegistries.getOrCreate(s"indexer-${config.participantId}"),
).acquire()
)

private def startApiServer(
config: Config,
readService: ReadService,
writeService: WriteService,
authService: AuthService,
)(implicit logCtx: LoggingContext): Resource[Unit] =
)(implicit logCtx: LoggingContext): ResourceOwner[Unit] =
new StandaloneApiServer(
ApiServerConfig(
config.participantId,
Expand All @@ -130,5 +120,5 @@ object ReferenceServer extends App {
writeService,
authService,
SharedMetricRegistries.getOrCreate(s"ledger-api-server-${config.participantId}"),
).acquire()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
package com.daml.ledger.on.memory

import com.daml.ledger.participant.state.kvutils.app.Runner
import com.digitalasset.resources.ProgramResource

import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
Runner(
"In-Memory Ledger",
(ledgerId, participantId) => InMemoryLedgerReaderWriter.owner(ledgerId, participantId),
).run(args)
new ProgramResource(
Runner("In-Memory Ledger", InMemoryLedgerReaderWriter.owner(_, _)).owner(args)).run()
}
1 change: 0 additions & 1 deletion ledger/ledger-on-posix-filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ da_scala_library(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//libs-scala/direct-execution-context",
"//libs-scala/resources",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import java.nio.file.Path
import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser

import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
Runner("File System Ledger", FileSystemLedgerFactory).run(args)
new ProgramResource(Runner("File System Ledger", FileSystemLedgerFactory).owner(args)).run()

case class ExtraConfig(root: Option[Path])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,33 @@ import java.nio.file.{Files, Path}

import com.daml.ledger.on.filesystem.posix.DeleteFiles.deleteFiles
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.resources.Resource
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.{ProgramResource, Resource, ResourceOwner}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ExecutionContext, Future}

object MainWithEphemeralDirectory extends App {
implicit val executionContext: ExecutionContext = DirectExecutionContext
new ProgramResource(Runner("Ephemeral File System Ledger", owner _).owner(args)).run()

val root = Files.createTempDirectory("ledger-on-posix-filesystem-ephemeral-")
def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
): ResourceOwner[FileSystemLedgerReaderWriter] =
for {
root <- temporaryDirectory("ledger-on-posix-filesystem-ephemeral-")
participant <- FileSystemLedgerReaderWriter.owner(
ledgerId = ledgerId,
participantId = participantId,
root = root,
)
} yield participant

for {
root <- Resource[Path](
Future.successful(root),
directory => Future.successful(deleteFiles(directory)),
)
_ <- Runner(
"Ephemeral File System Ledger",
(ledgerId, participantId) =>
FileSystemLedgerReaderWriter.owner(
ledgerId = ledgerId,
participantId = participantId,
root = root,
),
).run(args)
} yield ()
def temporaryDirectory(prefix: String): ResourceOwner[Path] = new ResourceOwner[Path] {
def acquire()(implicit executionContext: ExecutionContext): Resource[Path] =
Resource[Path](
Future(Files.createTempDirectory(prefix))(executionContext),
directory => Future(deleteFiles(directory))(executionContext),
)(executionContext)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser

import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
Runner("SQL Ledger", SqlLedgerFactory).run(args)
new ProgramResource(Runner("SQL Ledger", SqlLedgerFactory).owner(args)).run()

case class ExtraConfig(jdbcUrl: Option[String])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import akka.stream.Materializer
import com.daml.ledger.on.sql.Main.{ExtraConfig, SqlLedgerFactory}
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser

import scala.concurrent.ExecutionContext.Implicits.global

object MainWithEphemeralDirectory extends App {
val DirectoryPattern = "%DIR"

val directory = Files.createTempDirectory("ledger-on-sql-ephemeral-")

Runner("SQL Ledger", TestLedgerFactory).run(args)
new ProgramResource(Runner("SQL Ledger", TestLedgerFactory).owner(args)).run()

object TestLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, ExtraConfig] {
override val defaultExtraConfig: ExtraConfig = SqlLedgerFactory.defaultExtraConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import akka.stream.Materializer
import com.daml.ledger.on.sql.Main.{ExtraConfig, SqlLedgerFactory}
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import com.digitalasset.testing.postgresql.PostgresAround
import scopt.OptionParser

import scala.concurrent.ExecutionContext.Implicits.global

object MainWithEphemeralPostgresql extends App with PostgresAround {
startEphemeralPostgres()
sys.addShutdownHook(stopAndCleanUpPostgres())

Runner("SQL Ledger", PostgresqlLedgerFactory).run(args)
new ProgramResource(Runner("SQL Ledger", PostgresqlLedgerFactory).owner(args)).run()

object PostgresqlLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, Unit] {
override val defaultExtraConfig: Unit = ()
Expand Down
1 change: 0 additions & 1 deletion ledger/participant-state/kvutils/app/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,5 @@ da_scala_library(
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_slf4j_slf4j_api",
],
)
Loading

0 comments on commit e203fde

Please sign in to comment.