Skip to content

Commit

Permalink
participant-integration-api: Rename and restructure the configuration…
Browse files Browse the repository at this point in the history
… initialization classes. [KVL-1046] (#10496)

* Rename configuration initialization classes.

CHANGELOG_BEGIN
CHANGELOG_END

* Add call parentheses to `latestConfiguration()`, because it's impure.

* Split construction of configuration classes into multiple steps.

* participant-integration-api: Rename a class. Again.
  • Loading branch information
SamirTalwar authored Aug 6, 2021
1 parent 35641b7 commit a7fa7d3
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 386 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import com.daml.lf.data.Ref
import com.daml.lf.engine._
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.configuration.{CurrentLedgerConfiguration, LedgerConfigProvider}
import com.daml.platform.apiserver.configuration.{
LedgerConfigurationInitializer,
LedgerConfigurationSubscription,
}
import com.daml.platform.apiserver.execution.{
LedgerTimeAwareCommandExecutor,
StoreBackedCommandExecutor,
Expand Down Expand Up @@ -105,20 +108,22 @@ private[daml] object ApiServices {
private val configManagementService: IndexConfigManagementService = indexService
private val submissionService: IndexSubmissionService = indexService

private val configurationInitializer = new LedgerConfigurationInitializer(
indexService = indexService,
optWriteService = optWriteService,
timeProvider = timeProvider,
materializer = materializer,
servicesExecutionContext = servicesExecutionContext,
)

override def acquire()(implicit context: ResourceContext): Resource[ApiServices] = {
logger.info(engine.info.toString)
for {
ledgerId <- Resource.fromFuture(indexService.getLedgerId())
currentLedgerConfiguration <- LedgerConfigProvider
.owner(
initialLedgerConfiguration = initialLedgerConfiguration,
configurationLoadTimeout = ScalaDuration.fromNanos(configurationLoadTimeout.toNanos),
indexService = indexService,
optWriteService = optWriteService,
timeProvider = timeProvider,
servicesExecutionContext = servicesExecutionContext,
)
.acquire()
currentLedgerConfiguration <- configurationInitializer.initialize(
initialLedgerConfiguration = initialLedgerConfiguration,
configurationLoadTimeout = ScalaDuration.fromNanos(configurationLoadTimeout.toNanos),
)
services <- Resource(
Future(createServices(ledgerId, currentLedgerConfiguration)(servicesExecutionContext))
)(services =>
Expand All @@ -134,7 +139,7 @@ private[daml] object ApiServices {

private def createServices(
ledgerId: LedgerId,
currentLedgerConfiguration: CurrentLedgerConfiguration,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
)(implicit executionContext: ExecutionContext): List[BindableService] = {
val apiTransactionService =
ApiTransactionService.create(ledgerId, transactionsService, metrics)
Expand Down Expand Up @@ -164,7 +169,7 @@ private[daml] object ApiServices {
val writeServiceBackedApiServices =
intitializeWriteServiceBackedApiServices(
ledgerId,
currentLedgerConfiguration,
ledgerConfigurationSubscription,
apiCompletionService,
apiTransactionService,
)
Expand All @@ -190,7 +195,7 @@ private[daml] object ApiServices {

private def intitializeWriteServiceBackedApiServices(
ledgerId: LedgerId,
currentLedgerConfiguration: CurrentLedgerConfiguration,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
apiCompletionService: GrpcCommandCompletionService,
apiTransactionService: GrpcTransactionService,
)(implicit executionContext: ExecutionContext): List[BindableService] = {
Expand Down Expand Up @@ -218,7 +223,7 @@ private[daml] object ApiServices {
partyManagementService,
timeProvider,
timeProviderType,
currentLedgerConfiguration,
ledgerConfigurationSubscription,
seedService,
commandExecutor,
ApiSubmissionService.Configuration(
Expand Down Expand Up @@ -247,7 +252,7 @@ private[daml] object ApiServices {
apiTransactionService.getFlatTransactionById,
),
timeProvider,
currentLedgerConfiguration,
ledgerConfigurationSubscription,
metrics,
)

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,55 @@ import com.daml.api.util.TimeProvider
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceOwner
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.LoggingContext
import com.daml.platform.configuration.InitialLedgerConfiguration

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration

object LedgerConfigProvider {
def owner(
final class LedgerConfigurationInitializer(
indexService: IndexConfigManagementService,
optWriteService: Option[state.WriteConfigService],
timeProvider: TimeProvider,
materializer: Materializer,
servicesExecutionContext: ExecutionContext,
) {
private val scheduler = materializer.system.scheduler
private val subscriptionBuilder = new LedgerConfigurationSubscriptionFromIndex(
indexService,
scheduler,
materializer,
servicesExecutionContext,
)

def initialize(
initialLedgerConfiguration: InitialLedgerConfiguration,
configurationLoadTimeout: Duration,
indexService: IndexConfigManagementService,
optWriteService: Option[state.WriteConfigService],
timeProvider: TimeProvider,
servicesExecutionContext: ExecutionContext,
)(implicit
materializer: Materializer,
resourceContext: ResourceContext,
loggingContext: LoggingContext,
): ResourceOwner[CurrentLedgerConfiguration] = {
val scheduler = materializer.system.scheduler
for {
): Resource[LedgerConfigurationSubscription] = {
val owner = for {
// First, we acquire the mechanism for looking up the current ledger configuration.
currentLedgerConfiguration <- IndexStreamingCurrentLedgerConfiguration.owner(
configurationLoadTimeout,
indexService,
scheduler,
materializer,
servicesExecutionContext,
)
ledgerConfigurationSubscription <- subscriptionBuilder.subscription(configurationLoadTimeout)
// Next, we provision the configuration if one does not already exist on the ledger.
_ <- optWriteService match {
case None => ResourceOwner.unit
case Some(writeService) =>
val submissionIdGenerator = SubmissionIdGenerator.Random
LedgerConfigProvisioner.owner(
initialLedgerConfiguration,
currentLedgerConfiguration,
new LedgerConfigurationProvisioner(
ledgerConfigurationSubscription,
writeService,
timeProvider,
submissionIdGenerator,
scheduler,
servicesExecutionContext,
)
).submit(initialLedgerConfiguration)(servicesExecutionContext, loggingContext)
}
// Finally, we wait until either an existing configuration or the provisioned configuration
// appears on the index.
_ <- ResourceOwner.forFuture(() => currentLedgerConfiguration.ready)
} yield currentLedgerConfiguration
_ <- ResourceOwner.forFuture(() => ledgerConfigurationSubscription.ready)
} yield ledgerConfigurationSubscription
owner.acquire()
}
}
Loading

0 comments on commit a7fa7d3

Please sign in to comment.