Skip to content

Commit

Permalink
kvutils|ledger: Don't use Scala's global execution context. (digital-…
Browse files Browse the repository at this point in the history
…asset#4374)

* kvutils|ledger: Don't use Scala's global execution context.

I have just discovered that Scala's global execution context uses daemon
threads, which means they don't stick around past the end of the `main`
function. This is bad when application startup fails and we end up with
no other threads; the application terminates before it has a chance to
print the error message, which is very unhelpful.

We resolve this by creating a cached thread pool in the
`ProgramResource` class, specifically used for acquisition and release
of resources.

To do this, we need to pass the execution context through to the
resource constructors, which makes usage a little more verbose, because
now we can't use lambda functions (which can't take implicit arguments
such as the `executionContext`).

CHANGELOG_BEGIN
CHANGELOG_END

* kvutils-app|reference-v2: Sort `Runner` and `ReferenceServer` methods.
  • Loading branch information
SamirTalwar authored Feb 5, 2020
1 parent fcab7d1 commit 1df292c
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@

package com.daml.ledger.api.server.damlonx.reference.v2

import java.io.File
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.codahale.metrics.SharedMetricRegistries
import com.daml.ledger.api.server.damlonx.reference.v2.cli.Cli
import com.daml.ledger.participant.state.v1.{ReadService, SubmissionId, WriteService}
import com.digitalasset.daml.lf.archive.{Dar, DarReader}
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.auth.{AuthService, AuthServiceWildcard}
import com.digitalasset.logging.LoggingContext
Expand All @@ -19,7 +20,6 @@ import com.digitalasset.platform.apiserver.{ApiServerConfig, StandaloneApiServer
import com.digitalasset.platform.indexer.{IndexerConfig, StandaloneIndexerServer}
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import java.io.File

import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -30,42 +30,6 @@ object ReferenceServer {
private implicit val materializer: Materializer = Materializer(system)
private implicit val executionContext: ExecutionContext = system.dispatcher

private def readDar(from: File): Future[Dar[Archive]] =
Future(DarReader { case (_, x) => Try(Archive.parseFrom(x)) }.readArchiveFromFile(from).get)

private def uploadDar(from: File, to: InMemoryKVParticipantState): Future[Unit] = {
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- readDar(from)
_ <- to.uploadPackages(submissionId, dar.all, None).toScala
} yield ()
}

private def startParticipant(
config: Config,
ledger: InMemoryKVParticipantState): ResourceOwner[Unit] =
newLoggingContext { implicit logCtx =>
for {
_ <- startIndexerServer(config, readService = ledger)
_ <- startApiServer(
config,
readService = ledger,
writeService = ledger,
authService = AuthServiceWildcard,
)
} yield ()
}

private def extraParticipantConfig(base: Config): Vector[Config] =
for {
(extraParticipantId, port, jdbcUrl) <- base.extraParticipants
} yield
base.copy(
port = port,
participantId = extraParticipantId,
jdbcUrl = jdbcUrl,
)

def main(args: Array[String]): Unit = {
val config =
Cli
Expand Down Expand Up @@ -94,6 +58,40 @@ object ReferenceServer {
new ProgramResource(owner).run()
}

/** Reads a DAR archive from disk and uploads all of its packages to the given participant.
  *
  * The upload is tagged with a freshly generated random submission ID. Parsing happens
  * asynchronously on the object's implicit execution context.
  */
private def uploadDar(from: File, to: InMemoryKVParticipantState): Future[Unit] = {
  val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
  val darFuture = Future(
    DarReader { case (_, x) => Try(Archive.parseFrom(x)) }.readArchiveFromFile(from).get)
  darFuture
    .flatMap(dar => to.uploadPackages(submissionId, dar.all, None).toScala)
    .map(_ => ())
}

/** Derives one additional participant configuration per entry in `extraParticipants`,
  * copying the base config with the participant-specific ID, port, and JDBC URL.
  */
private def extraParticipantConfig(base: Config): Vector[Config] =
  base.extraParticipants.map {
    case (extraParticipantId, port, jdbcUrl) =>
      base.copy(
        port = port,
        participantId = extraParticipantId,
        jdbcUrl = jdbcUrl,
      )
  }

/** Starts the indexer server and then the API server for a single participant,
  * both backed by the same in-memory ledger, inside a fresh logging context.
  *
  * The resulting owner releases the servers in reverse order of acquisition.
  */
private def startParticipant(
    config: Config,
    ledger: InMemoryKVParticipantState
): ResourceOwner[Unit] =
  newLoggingContext { implicit logCtx =>
    startIndexerServer(config, readService = ledger).flatMap { _ =>
      startApiServer(
        config,
        readService = ledger,
        writeService = ledger,
        authService = AuthServiceWildcard,
      ).map(_ => ())
    }
  }

private def startIndexerServer(config: Config, readService: ReadService)(
implicit logCtx: LoggingContext
): ResourceOwner[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,28 @@

package com.daml.ledger.on.memory

import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.app.LedgerFactory.SimpleLedgerFactory
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.digitalasset.resources.ProgramResource
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.{ProgramResource, ResourceOwner}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext

object Main {
def main(args: Array[String]): Unit = {
new ProgramResource(
Runner("In-Memory Ledger", InMemoryLedgerReaderWriter.owner(_, _)).owner(args)).run()
new ProgramResource(new Runner("In-Memory Ledger", InMemoryLedgerFactory).owner(args)).run()
}

// Factory that wires the in-memory ledger reader/writer into the generic kvutils Runner.
// Extends SimpleLedgerFactory, so the extra-config type is fixed to Unit (no extra CLI options).
object InMemoryLedgerFactory extends SimpleLedgerFactory[InMemoryLedgerReaderWriter] {
// Delegates straight to InMemoryLedgerReaderWriter.owner; the Unit config carries no data,
// and the implicit execution context / materializer are threaded through for the owner's use.
override def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
config: Unit
)(
implicit executionContext: ExecutionContext,
materializer: Materializer,
): ResourceOwner[InMemoryLedgerReaderWriter] =
InMemoryLedgerReaderWriter.owner(ledgerId, participantId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext

object Main {
def main(args: Array[String]): Unit = {
new ProgramResource(Runner("SQL Ledger", SqlLedgerFactory).owner(args)).run()
new ProgramResource(new Runner("SQL Ledger", SqlLedgerFactory).owner(args)).run()
}

case class ExtraConfig(jdbcUrl: Option[String])
Expand All @@ -38,7 +38,10 @@ object Main {
ledgerId: LedgerId,
participantId: ParticipantId,
config: ExtraConfig,
)(implicit materializer: Materializer): ResourceOwner[SqlLedgerReaderWriter] = {
)(
implicit executionContext: ExecutionContext,
materializer: Materializer,
): ResourceOwner[SqlLedgerReaderWriter] = {
val jdbcUrl = config.jdbcUrl.getOrElse {
throw new IllegalStateException("No JDBC URL provided.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext

object MainWithEphemeralDirectory {
private val DirectoryPattern = "%DIR"

def main(args: Array[String]): Unit = {
new ProgramResource(Runner("SQL Ledger", TestLedgerFactory).owner(args)).run()
new ProgramResource(new Runner("SQL Ledger", TestLedgerFactory).owner(args)).run()
}

object TestLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, ExtraConfig] {
Expand All @@ -31,7 +31,10 @@ object MainWithEphemeralDirectory {
ledgerId: LedgerId,
participantId: ParticipantId,
config: ExtraConfig,
)(implicit materializer: Materializer): ResourceOwner[SqlLedgerReaderWriter] = {
)(
implicit executionContext: ExecutionContext,
materializer: Materializer,
): ResourceOwner[SqlLedgerReaderWriter] = {
val directory = Files.createTempDirectory("ledger-on-sql-ephemeral-")
val jdbcUrl = config.jdbcUrl.map(_.replace(DirectoryPattern, directory.toString))
SqlLedgerFactory.owner(ledgerId, participantId, config.copy(jdbcUrl = jdbcUrl))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,30 @@ package com.daml.ledger.on.sql

import akka.stream.Materializer
import com.daml.ledger.on.sql.Main.{ExtraConfig, SqlLedgerFactory}
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.kvutils.app.LedgerFactory.SimpleLedgerFactory
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import com.digitalasset.testing.postgresql.PostgresAround
import scopt.OptionParser

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext

object MainWithEphemeralPostgresql extends PostgresAround {
def main(args: Array[String]): Unit = {
startEphemeralPostgres()
sys.addShutdownHook(stopAndCleanUpPostgres())
new ProgramResource(Runner("SQL Ledger", PostgresqlLedgerFactory).owner(args)).run()
new ProgramResource(new Runner("SQL Ledger", PostgresqlLedgerFactory).owner(args)).run()
}

object PostgresqlLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, Unit] {
override val defaultExtraConfig: Unit = ()

override def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit = ()

object PostgresqlLedgerFactory extends SimpleLedgerFactory[SqlLedgerReaderWriter] {
override def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
config: Unit,
)(implicit materializer: Materializer): ResourceOwner[SqlLedgerReaderWriter] = {
)(
implicit executionContext: ExecutionContext,
materializer: Materializer,
): ResourceOwner[SqlLedgerReaderWriter] = {
SqlLedgerFactory.owner(
ledgerId,
participantId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import scopt.OptionParser

import scala.concurrent.ExecutionContext

trait LedgerFactory[T <: KeyValueLedger, ExtraConfig] {
val defaultExtraConfig: ExtraConfig

Expand All @@ -17,28 +19,15 @@ trait LedgerFactory[T <: KeyValueLedger, ExtraConfig] {
ledgerId: LedgerId,
participantId: ParticipantId,
config: ExtraConfig,
)(implicit materializer: Materializer): ResourceOwner[T]
)(implicit executionContext: ExecutionContext, materializer: Materializer): ResourceOwner[T]
}

object LedgerFactory {
def apply[T <: KeyValueLedger](
newOwner: (LedgerId, ParticipantId) => ResourceOwner[T],
): LedgerFactory[T, Unit] =
new SimpleLedgerFactory(newOwner)

class SimpleLedgerFactory[T <: KeyValueLedger](
newOwner: (LedgerId, ParticipantId) => ResourceOwner[T]
) extends LedgerFactory[T, Unit] {
override val defaultExtraConfig: Unit = ()
abstract class SimpleLedgerFactory[T <: KeyValueLedger] extends LedgerFactory[T, Unit] {
override final val defaultExtraConfig: Unit = ()

override def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit =
override final def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit =
()

override def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
config: Unit,
)(implicit materializer: Materializer): ResourceOwner[T] =
newOwner(ledgerId, participantId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,9 @@ import akka.stream.Materializer
import com.codahale.metrics.SharedMetricRegistries
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.kvutils.app.Runner._
import com.daml.ledger.participant.state.v1.{
LedgerId,
ParticipantId,
ReadService,
SubmissionId,
WriteService
}
import com.daml.ledger.participant.state.v1.{ReadService, SubmissionId, WriteService}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.archive.{Dar, DarReader}
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.auth.{AuthService, AuthServiceWildcard}
Expand All @@ -36,45 +30,11 @@ import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.akka.AkkaResourceOwner

import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T, Extra]) {

private def readDar(from: Path)(
implicit executionContext: ExecutionContext): Future[Dar[Archive]] =
Future(
DarReader { case (_, x) => Try(Archive.parseFrom(x)) }.readArchiveFromFile(from.toFile).get)

private def uploadDar(from: Path, to: KeyValueParticipantState)(
implicit executionContext: ExecutionContext): Future[Unit] = {
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- readDar(from)
_ <- to.uploadPackages(submissionId, dar.all, None).toScala
} yield ()
}

private def startParticipant(config: Config[Extra], ledger: KeyValueParticipantState)(
implicit executionContext: ExecutionContext): ResourceOwner[Unit] =
newLoggingContext { implicit logCtx =>
for {
_ <- startIndexerServer(config, readService = ledger)
_ <- startApiServer(
config,
readService = ledger,
writeService = ledger,
authService = AuthServiceWildcard,
)
} yield ()
}

def owner(args: Seq[String]): ResourceOwner[Unit] = {
val config = Config
.parse(name, factory.extraConfigParser, factory.defaultExtraConfig, args)
.getOrElse(sys.exit(1))

implicit val system: ActorSystem = ActorSystem(
"[^A-Za-z0-9_\\-]".r.replaceAllIn(name.toLowerCase, "-"))
implicit val materializer: Materializer = Materializer(system)
Expand Down Expand Up @@ -102,6 +62,32 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
} yield ()
}

/** Reads a DAR archive from the given path and uploads all of its packages to the
  * participant state, using a freshly generated random submission ID.
  *
  * Parsing runs asynchronously on the caller-supplied execution context.
  */
private def uploadDar(from: Path, to: KeyValueParticipantState)(
    implicit executionContext: ExecutionContext
): Future[Unit] = {
  val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
  val darFuture = Future(
    DarReader { case (_, x) => Try(Archive.parseFrom(x)) }.readArchiveFromFile(from.toFile).get)
  darFuture
    .flatMap(dar => to.uploadPackages(submissionId, dar.all, None).toScala)
    .map(_ => ())
}

/** Starts the indexer server and then the API server for a participant, both backed by
  * the same key-value participant state, inside a fresh logging context.
  *
  * The resulting owner releases the servers in reverse order of acquisition.
  */
private def startParticipant(config: Config[Extra], ledger: KeyValueParticipantState)(
    implicit executionContext: ExecutionContext
): ResourceOwner[Unit] =
  newLoggingContext { implicit logCtx =>
    startIndexerServer(config, readService = ledger).flatMap { _ =>
      startApiServer(
        config,
        readService = ledger,
        writeService = ledger,
        authService = AuthServiceWildcard,
      ).map(_ => ())
    }
  }

private def startIndexerServer(
config: Config[Extra],
readService: ReadService,
Expand Down Expand Up @@ -142,17 +128,5 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
}

object Runner {
def apply[T <: KeyValueLedger](
name: String,
newOwner: (LedgerId, ParticipantId) => ResourceOwner[T],
): Runner[T, Unit] =
apply(name, LedgerFactory(newOwner))

def apply[T <: KeyValueLedger, Extra](
name: String,
factory: LedgerFactory[T, Extra],
): Runner[T, Extra] =
new Runner(name, factory)

class ConfigParseException extends SuppressedException
}
Loading

0 comments on commit 1df292c

Please sign in to comment.