Skip to content

Commit

Permalink
Sandbox: Manage resources and close them properly on failure. (digita…
Browse files Browse the repository at this point in the history
…l-asset#3871)

* sandbox: Create a monadic `ResourceOwner` to manage resources.

* sandbox: Rewrite `ResourceOwner` to be async.

* sandbox: Make sure failed resources are closed immediately.

* sandbox: Better naming in `Open`.

* sandbox: Rename `Open` to `Resource`, and open/close to acquire/release.

* sandbox: Convert `() => AutoCloseable` into `ResourceOwner`.

* sandbox: Refactor the LedgerApiServer in terms of resources.

* sandbox: Explicitly convert `() => AutoCloseable` to `ResourceOwner`.

Explicit > Implicit, right?

* sandbox: Create helpers for converting things to ResourceOwners.

Because I tried to start using them and there was so much code being
written at once.

* sandbox: Simplify construction of JdbcLedgerDao.

* sandbox: Releasing resources should be idempotent.

In that we should only do it once.

* sandbox: Fix the ResetService by closing the API services _first_.

They need to be shut down before the gRPC server.

* sandbox: Don't try and shut down PostgreSQL twice in tests.

* sandbox: Actually run the assertions in ResourceOwnerSpec.

Facepalm.

* sandbox: Test `Resource.sequence` more rigorously.

* sandbox: Move the helpers around `Resource` into `Resource.apply`.

* sandbox: Convert LedgerApiServer resource owners to classes.

* sandbox: Make `ResourceOwner` a monad too, delegating to `Resource`.

* sandbox: Turn `LedgerApiServer` into a ResourceOwner.

* sandbox: Simplify the public signature of `Resource.apply`.

* sandbox: Use ResourceOwners to simplify DB resource management.

This is one hell of a change. Sorry.

* sandbox: Try not to nest `Await.result` calls.

Causes issues when running in a `DirectExecutionContext`.

* sandbox: Turn index subscriptions into resources.

* sandbox: Fix warnings in RecoveringIndexerSpec.

* sandbox: Always release before recovering the indexer.

* sandbox: Add `flatten` and `transformWith` to `Resource`.

* sandbox: If releasing twice in parallel, the second should wait.

* sandbox: If the indexer recovers, clean up the old subscription.

* sandbox: Convert StandaloneIndexerServer into a resource owner.

* sandbox: Convert StandaloneApiServer into a resource owner.

* reference-v2: Rewrite ReferenceServer in terms of resources.

CHANGELOG_BEGIN

- [Reference v2] On an exception, shut down everything and crash.
  Previously, the server would stay in a half-running state.

CHANGELOG_END

* sandbox: Rewrite SandboxServer in terms of resources.

* sandbox: Write the port file in a Future.

* sandbox: JdbcIndexer no longer needs to manage the actorSystem.

* sandbox: Shut down the LedgerApiServer when closing the Sandbox.

* sandbox: Rename `Resource.pure` to `Resource.successful`.

* sandbox: Rename `Resource.sequence_` to `sequenceIgnoringValues`.

* sandbox: Delete `CloseableResource`.

It's only used in once place. Just inline it.

* sandbox: `LedgerDao` no longer needs to be closeable.

* sandbox: Delete implicit materializers where they're not used.

* http-json: Wait for the Sandbox to start in tests.

* sandbox: Convert `scheduleHeartbeats` into a ResourceOwner.

* reference-v2: Explain why we steal ownership of the actor system.

* sandbox: Document why we only release resources once.

* sandbox: Add clues to ResourceOwnerSpec.

* http-json: Fix HttpServiceTestFixture to pass auth service through.

* codegen-sample-app: In ScalaCodeGenIT, wait for the server to  start.
  • Loading branch information
SamirTalwar authored and mergify[bot] committed Dec 22, 2019
1 parent b72dbb9 commit 1794d8a
Show file tree
Hide file tree
Showing 47 changed files with 2,058 additions and 1,109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.api.util.TimestampConversion.fromInstant
import com.digitalasset.codegen.util.TestUtil.{TestContext, findOpenPort, requiredResource}
import com.digitalasset.codegen.util.TestUtil.{TestContext, requiredResource}
import com.digitalasset.grpc.adapter.AkkaExecutionSequencerPool
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.refinements.ApiTypes.{CommandId, WorkflowId}
Expand Down Expand Up @@ -75,10 +75,8 @@ class ScalaCodeGenIT
private val amat = Materializer(asys)
private val aesf = new AkkaExecutionSequencerPool("clientPool")(asys)

private val port: Int = findOpenPort().fold(e => throw new IllegalStateException(e), identity)

private val serverConfig = SandboxConfig.default.copy(
port = port,
port = 0,
damlPackages = archives,
timeProviderType = TimeProviderType.WallClock,
ledgerIdMode = LedgerIdMode.Static(LedgerId(ledgerId)),
Expand Down Expand Up @@ -115,7 +113,7 @@ class ScalaCodeGenIT
)

private val ledgerF: Future[LedgerClient] =
LedgerClient.singleHost("127.0.0.1", port, clientConfig)(ec, aesf)
LedgerClient.singleHost("127.0.0.1", sandbox.port, clientConfig)(ec, aesf)

private val ledger: LedgerClient = Await.result(ledgerF, shortTimeout)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import akka.Done
import akka.actor.ActorSystem

import scala.collection.breakOut
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, Future}

class AkkaExecutionSequencerPool(
poolName: String,
actorCount: Int = AkkaExecutionSequencerPool.defaultActorCount,
terminationTimeout: FiniteDuration = 30.seconds)(implicit system: ActorSystem)
terminationTimeout: FiniteDuration = 30.seconds,
)(implicit system: ActorSystem)
extends ExecutionSequencerFactory {
require(actorCount > 0)

Expand All @@ -28,18 +29,18 @@ class AkkaExecutionSequencerPool(
override def getExecutionSequencer: ExecutionSequencer =
pool(counter.getAndIncrement() % actorCount)

@SuppressWarnings(Array("org.wartremover.warts.Any"))
override def close(): Unit = {
implicit val ec: ExecutionContextExecutor = system.dispatcher
val eventualDones: Iterable[Future[Done]] = pool.map(_.closeAsync)(breakOut)
Await.result(
Future.firstCompletedOf[Any](
List(
system.whenTerminated, // Cut it short if the actorsystem is down already
Future.sequence(eventualDones))),
terminationTimeout
override def close(): Unit =
Await.result(closeAsync(), terminationTimeout)

def closeAsync(): Future[Unit] = {
implicit val ec: ExecutionContext = system.dispatcher
val eventuallyClosed: Future[Seq[Done]] = Future.sequence(pool.map(_.closeAsync)(breakOut))
Future.firstCompletedOf(
Seq(
system.whenTerminated.map(_ => ()), // Cut it short if the ActorSystem stops.
eventuallyClosed.map(_ => ()),
)
)
()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ object HttpServiceTestFixture {

val contractDaoF: Future[Option[ContractDao]] = jdbcConfig.map(c => initializeDb(c)).sequence

val ledgerF: Future[(SandboxServer, Int)] = for {
port <- toFuture(findOpenPort())
ledger <- Future(new SandboxServer(ledgerConfig(port, dars, ledgerId)))
val ledgerF = for {
ledger <- Future(new SandboxServer(ledgerConfig(0, dars, ledgerId)))
port <- ledger.portF
} yield (ledger, port)

val httpServiceF: Future[(ServerBinding, Int)] = for {
Expand Down Expand Up @@ -116,9 +116,9 @@ object HttpServiceTestFixture {
val ledgerId = LedgerId(testName)
val applicationId = ApplicationId(testName)

val ledgerF: Future[(SandboxServer, Int)] = for {
port <- toFuture(findOpenPort())
ledger <- Future(new SandboxServer(ledgerConfig(port, dars, ledgerId, authService)))
val ledgerF = for {
ledger <- Future(new SandboxServer(ledgerConfig(0, dars, ledgerId, authService)))
port <- ledger.portF
} yield (ledger, port)

val clientF: Future[LedgerClient] = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.daml.ledger.api.server.damlonx.reference.v2

import java.io.File
import java.nio.file.Path

import com.daml.ledger.participant.state.v1.ParticipantId
import com.digitalasset.api.util.TimeProvider
Expand All @@ -13,7 +14,7 @@ import com.digitalasset.platform.indexer.IndexerStartupMode

final case class Config(
port: Int,
portFile: Option[File],
portFile: Option[Path],
archiveFiles: List[File],
maxInboundMessageSize: Int,
timeProvider: TimeProvider, // enables use of non-wall-clock time in tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@
package com.daml.ledger.api.server.damlonx.reference.v2

import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.codahale.metrics.SharedMetricRegistries
import com.daml.ledger.participant.state.v1.SubmissionId
import com.daml.ledger.api.server.damlonx.reference.v2.cli.Cli
import com.daml.ledger.participant.state.kvutils.InMemoryKVParticipantState
import com.daml.ledger.participant.state.v1.{ReadService, SubmissionId, WriteService}
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.auth.AuthServiceWildcard
import com.digitalasset.ledger.api.auth.{AuthService, AuthServiceWildcard}
import com.digitalasset.platform.apiserver.{ApiServerConfig, StandaloneApiServer}
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.indexer.{IndexerConfig, StandaloneIndexerServer}
import com.digitalasset.platform.resources.{Resource, ResourceOwner}
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try
import scala.util.control.NonFatal

object ReferenceServer extends App {
val logger = LoggerFactory.getLogger("indexed-kvutils")
Expand All @@ -38,51 +38,73 @@ object ReferenceServer extends App {

implicit val system: ActorSystem = ActorSystem("indexed-kvutils")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher

val ledger = new InMemoryKVParticipantState(config.participantId)

val readService = ledger
val writeService = ledger
val authService = AuthServiceWildcard
implicit val executionContext: ExecutionContext = system.dispatcher

val resource = for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- ResourceOwner.forActorSystem(() => system).acquire()
_ <- ResourceOwner.forMaterializer(() => materializer).acquire()
ledger <- ResourceOwner
.forCloseable(() => new InMemoryKVParticipantState(config.participantId))
.acquire()
_ = config.archiveFiles.foreach { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file)
} yield ledger.uploadPackages(submissionId, dar.all, None)
}
_ <- startIndexerServer(config, readService = ledger)
_ <- startApiServer(
config,
readService = ledger,
writeService = ledger,
authService = AuthServiceWildcard,
)
_ <- Resource.sequenceIgnoringValues(
for {
(extraParticipantId, port, jdbcUrl) <- config.extraParticipants
} yield {
val participantConfig = config.copy(
port = port,
participantId = extraParticipantId,
jdbcUrl = jdbcUrl,
)
for {
_ <- startIndexerServer(participantConfig, readService = ledger)
_ <- startApiServer(
participantConfig,
readService = ledger,
writeService = ledger,
authService = AuthServiceWildcard,
)
} yield ()
}
)
} yield ()

config.archiveFiles.foreach { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file)
} yield ledger.uploadPackages(submissionId, dar.all, None)
resource.asFuture.failed.foreach { exception =>
logger.error("Shutting down because of an initialization error.", exception)
System.exit(1)
}

val participantF: Future[(AutoCloseable, AutoCloseable)] = for {
indexer <- newIndexer(config)
apiServer <- newApiServer(config).start()
} yield (indexer, apiServer)
Runtime.getRuntime.addShutdownHook(new Thread(() => Await.result(resource.release(), 10.seconds)))

val extraParticipants =
for {
(extraParticipantId, port, jdbcUrl) <- config.extraParticipants
} yield {
val participantConfig = config.copy(
port = port,
participantId = extraParticipantId,
jdbcUrl = jdbcUrl,
)
for {
extraIndexer <- newIndexer(participantConfig)
extraLedgerApiServer <- newApiServer(participantConfig).start()
} yield (extraIndexer, extraLedgerApiServer)
}

def newIndexer(config: Config) =
StandaloneIndexerServer(
private def startIndexerServer(config: Config, readService: ReadService): Resource[Unit] =
new StandaloneIndexerServer(
readService,
IndexerConfig(config.participantId, config.jdbcUrl, config.startupMode),
NamedLoggerFactory.forParticipant(config.participantId),
SharedMetricRegistries.getOrCreate(s"indexer-${config.participantId}"),
)

def newApiServer(config: Config) =
).acquire()

private def startApiServer(
config: Config,
readService: ReadService,
writeService: WriteService,
authService: AuthService,
): Resource[Unit] =
new StandaloneApiServer(
ApiServerConfig(
config.participantId,
Expand All @@ -99,42 +121,5 @@ object ReferenceServer extends App {
authService,
NamedLoggerFactory.forParticipant(config.participantId),
SharedMetricRegistries.getOrCreate(s"ledger-api-server-${config.participantId}"),
)

val closed = new AtomicBoolean(false)

def closeServer(): Unit = {
if (closed.compareAndSet(false, true)) {
participantF.foreach {
case (indexer, apiServer) =>
indexer.close()
apiServer.close()
}

for (extraParticipantF <- extraParticipants) {
extraParticipantF.foreach {
case (indexer, apiServer) =>
indexer.close()
apiServer.close()
}
}
ledger.close()
materializer.shutdown()
val _ = system.terminate()
}
}

private def startupFailed(e: Throwable): Unit = {
logger.error("Shutting down because of an initialization error.", e)
closeServer()
}

participantF.failed.foreach(startupFailed)

try {
Runtime.getRuntime.addShutdownHook(new Thread(() => closeServer()))
} catch {
case NonFatal(e) =>
startupFailed(e)
}
).acquire()
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object Cli {
.text("Server port. If not set, a random port is allocated.")
opt[File]("port-file")
.optional()
.action((f, c) => c.copy(portFile = Some(f)))
.action((f, c) => c.copy(portFile = Some(f.toPath)))
.text("File to write the allocated port number to. Used to inform clients in CI about the allocated port.")
opt[String]("pem")
.optional()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.digitalasset.platform.apiserver

import java.io.File
import java.nio.file.Path

import com.daml.ledger.participant.state.v1.ParticipantId
import com.digitalasset.api.util.TimeProvider
Expand All @@ -17,5 +18,5 @@ case class ApiServerConfig(
tlsConfig: Option[TlsConfiguration],
timeProvider: TimeProvider, // enables use of non-wall-clock time in tests
maxInboundMessageSize: Int,
portFile: Option[File],
portFile: Option[Path],
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,26 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService}
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.ledger.api.domain.{ParticipantId => _, _}
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.dec.{DirectExecutionContext => DEC}
import com.digitalasset.platform.resources.Resource
import com.digitalasset.platform.sandbox.stores.LedgerBackedIndexService
import com.digitalasset.platform.sandbox.stores.ledger.{
Ledger,
MeteredReadOnlyLedger,
SandboxContractStore
}

import scala.concurrent.Future

object JdbcIndex {
def apply(
readService: ReadService,
ledgerId: LedgerId,
participantId: ParticipantId,
jdbcUrl: String,
loggerFactory: NamedLoggerFactory,
metrics: MetricRegistry)(
implicit mat: Materializer): Future[IndexService with AutoCloseable] =
metrics: MetricRegistry,
)(implicit mat: Materializer): Resource[IndexService] =
Ledger
.jdbcBackedReadOnly(jdbcUrl, ledgerId, loggerFactory, metrics)
.map { ledger =>
Expand All @@ -46,5 +45,6 @@ object JdbcIndex {
v2.LedgerConfiguration(cond.config.timeModel.minTtl, cond.config.timeModel.maxTtl)
}
}
}(DEC)
}(DirectExecutionContext)
.vary
}
Loading

0 comments on commit 1794d8a

Please sign in to comment.