Skip to content

Commit

Permalink
participant-integration-api: Always wait for the first config lookup. (
Browse files Browse the repository at this point in the history
…#10500)

* participant-integration-api: Always wait for the first config lookup.

This means we don't have to guess how long a `lookupConfiguration()`
round trip will take. Instead we can just submit the initial
configuration if one is not found.

Some drivers may still need to wait a while for other reasons.

CHANGELOG_BEGIN
CHANGELOG_END

* kvutils: Explain `delayBeforeSubmitting`.

Co-authored-by: Robert Autenrieth <31539813+rautenrieth-da@users.noreply.github.com>

Co-authored-by: Robert Autenrieth <31539813+rautenrieth-da@users.noreply.github.com>
  • Loading branch information
SamirTalwar and rautenrieth-da authored Aug 6, 2021
1 parent 313110c commit 07da936
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import com.daml.ledger.validator.DefaultStateKeySerializationStrategy
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.configuration.InitialLedgerConfiguration
import scopt.OptionParser

private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state: InMemoryState)
Expand Down Expand Up @@ -50,11 +49,6 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state
)
}

override def initialLedgerConfig(config: Config[Unit]): InitialLedgerConfiguration =
super
.initialLedgerConfig(config)
.copy(delayBeforeSubmitting = Config.LocalInitialConfigurationSubmissionDelay)

override def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit = ()

override val defaultExtraConfig: Unit = ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@ import com.daml.ledger.participant.state.kvutils.caching._
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.platform.configuration.InitialLedgerConfiguration
import scopt.OptionParser

object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
override val defaultExtraConfig: ExtraConfig = ExtraConfig(
jdbcUrl = None
)

override def initialLedgerConfig(config: Config[ExtraConfig]): InitialLedgerConfiguration =
super
.initialLedgerConfig(config)
.copy(delayBeforeSubmitting = Config.LocalInitialConfigurationSubmissionDelay)

override def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = {
parser
.opt[String]("jdbc-url")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
* configuration available to consumers.
*
* This class helps avoiding code duplication and limiting the number of database lookups, as
* multiple services and validators require the latest ledger config.
* multiple services and validators require the latest ledger configuration.
*
* The [[subscription]] method returns a [[ResourceOwner]] which succeeds after the first lookup,
* regardless of whether the index contains a configuration or not. It then starts a background
* stream of configuration updates to keep its internal configuration state in sync with the index.
*
* The [[Subscription.ready]] method returns a [[Future]] that will not resolve until a
* configuration is found or the load timeout expires.
*/
private[apiserver] final class LedgerConfigurationSubscriptionFromIndex(
indexService: IndexConfigManagementService,
Expand All @@ -44,142 +51,97 @@ private[apiserver] final class LedgerConfigurationSubscriptionFromIndex(
override def acquire()(implicit
context: ResourceContext
): Resource[LedgerConfigurationSubscription with IsReady] =
Resource(Future {
new Subscription(configurationLoadTimeout)
}(context.executionContext))(subscription =>
Resource(
indexService
.lookupConfiguration()
.map { startingConfiguration =>
new Subscription(startingConfiguration, configurationLoadTimeout)
}(context.executionContext)
)(subscription =>
Future {
subscription.cancel()
()
subscription.stop()
}(context.executionContext)
)
}

private final class Subscription(
configurationLoadTimeout: Duration
startingConfiguration: Option[(LedgerOffset.Absolute, Configuration)],
configurationLoadTimeout: Duration,
)(implicit loggingContext: LoggingContext)
extends LedgerConfigurationSubscription
with IsReady
with Cancellable {
// The latest offset that was read (if any), and the latest ledger configuration found (if any)
private val latestConfigurationState = new AtomicReference[StateType](None -> None)
private val state = new AtomicReference[SubscriptionState](SubscriptionState.LookingUp)
with IsReady {
private val readyPromise = Promise[Unit]()

private val scheduledTimeout = {
scheduler.scheduleOnce(
configurationLoadTimeout.toNanos.nanos,
new Runnable {
override def run(): Unit = {
if (readyPromise.trySuccess(())) {
logger.warn(
s"No ledger configuration found after $configurationLoadTimeout. The ledger API server will now start but all services that depend on the ledger configuration will return UNAVAILABLE until at least one ledger configuration is found."
)
private val (latestConfigurationState, scheduledTimeout) = startingConfiguration match {
case Some((offset, configuration)) =>
logger.info(
s"Initial ledger configuration lookup found configuration $configuration at $offset. Looking for new ledger configurations from this offset."
)
readyPromise.trySuccess(())
val configurationState: StateType = Some(offset) -> Some(configuration)
(new AtomicReference[StateType](configurationState), Cancellable.alreadyCancelled)
case None =>
logger.info(
s"Initial ledger configuration lookup did not find any configuration. Looking for new ledger configurations from the ledger beginning."
)
val configurationState: StateType = None -> None
val scheduledTimeout = scheduler.scheduleOnce(
configurationLoadTimeout.toNanos.nanos,
new Runnable {
override def run(): Unit = {
if (readyPromise.trySuccess(())) {
logger.warn(
s"No ledger configuration found after $configurationLoadTimeout. The ledger API server will now start but all services that depend on the ledger configuration will return UNAVAILABLE until at least one ledger configuration is found."
)
}
()
}
()
}
},
)(servicesExecutionContext)
},
)(servicesExecutionContext)
(new AtomicReference[StateType](configurationState), scheduledTimeout)
}

// Looks up the latest ledger configuration, then subscribes to a stream of configuration changes.
indexService
.lookupConfiguration()
.map {
case Some(result) =>
logger.info(
s"Initial ledger configuration lookup found configuration ${result._2} at ${result._1}. Looking for new ledger configurations from this offset."
)
configFound(result._1, result._2)
case None =>
logger.info(
s"Initial ledger configuration lookup did not find any configuration. Looking for new ledger configurations from the ledger beginning."
)
latestConfigurationState.set(None -> None)
}(servicesExecutionContext)
.map(_ => startStreamingUpdates())(servicesExecutionContext)
.failed
.foreach { exception =>
readyPromise.tryFailure(exception)
logger.error("Could not load the ledger configuration.", exception)
}(servicesExecutionContext)

private def startStreamingUpdates(): Unit = {
val killSwitch = RestartSource
.withBackoff(
RestartSettings(
minBackoff = 1.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.1,
)
) { () =>
indexService
.configurationEntries(latestConfigurationState.get._1)
.map {
case (offset, domain.ConfigurationEntry.Accepted(_, config)) =>
logger.info(s"New ledger configuration $config found at $offset")
configFound(offset, config)
case (offset, domain.ConfigurationEntry.Rejected(_, _, _)) =>
logger.info(s"New ledger configuration rejection found at $offset")
latestConfigurationState.updateAndGet(previous => Some(offset) -> previous._2)
()
}
}
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.left[UniqueKillSwitch, Future[Done]])
.run()(materializer)
if (
!state.compareAndSet(SubscriptionState.LookingUp, SubscriptionState.Streaming(killSwitch))
) {
killSwitch.shutdown()
private val killSwitch = RestartSource
.withBackoff(
RestartSettings(
minBackoff = 1.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.1,
)
) { () =>
indexService
.configurationEntries(latestConfigurationState.get._1)
.map {
case (offset, domain.ConfigurationEntry.Accepted(_, config)) =>
logger.info(s"New ledger configuration $config found at $offset")
scheduledTimeout.cancel()
latestConfigurationState.set(Some(offset) -> Some(config))
readyPromise.trySuccess(())
()
case (offset, domain.ConfigurationEntry.Rejected(_, _, _)) =>
logger.info(s"New ledger configuration rejection found at $offset")
latestConfigurationState.updateAndGet(previous => Some(offset) -> previous._2)
()
}
}
()
}

private def configFound(
offset: LedgerOffset.Absolute,
config: Configuration,
): Unit = {
scheduledTimeout.cancel()
latestConfigurationState.set(Some(offset) -> Some(config))
readyPromise.trySuccess(())
()
}
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.left[UniqueKillSwitch, Future[Done]])
.run()(materializer)

override def ready: Future[Unit] = readyPromise.future

override def latestConfiguration(): Option[Configuration] = latestConfigurationState.get._2

override def cancel(): Boolean = {
def stop(): Unit = {
scheduledTimeout.cancel()
state.getAndSet(SubscriptionState.Stopped) match {
case SubscriptionState.LookingUp =>
true
case SubscriptionState.Streaming(killSwitch) =>
killSwitch.shutdown()
true
case SubscriptionState.Stopped =>
false
}
killSwitch.shutdown()
}

override def isCancelled: Boolean =
state.get == SubscriptionState.Stopped
}
}

private[apiserver] object LedgerConfigurationSubscriptionFromIndex {
private type StateType = (Option[LedgerOffset.Absolute], Option[Configuration])

private sealed trait SubscriptionState

private object SubscriptionState {
case object LookingUp extends SubscriptionState

final case class Streaming(killSwitch: UniqueKillSwitch) extends SubscriptionState

case object Stopped extends SubscriptionState
}

trait IsReady {
def ready: Future[Unit]
}
Expand Down
Loading

0 comments on commit 07da936

Please sign in to comment.