diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala index 730e9b96a3db..45e121d7f555 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala @@ -6,30 +6,16 @@ package com.daml.ledger.on.memory import akka.stream.Materializer import com.daml.api.util.TimeProvider import com.daml.caching.Cache -import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} import com.daml.ledger.participant.state.kvutils.api._ -import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter -import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting, Raw} import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} -import com.daml.ledger.validator.batch.{ - BatchedSubmissionValidator, - BatchedSubmissionValidatorFactory, - BatchedValidatingCommitter, - ConflictDetection, -} -import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy} -import com.daml.ledger.validator.preexecution._ -import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader} -import com.daml.ledger.validator.{StateKeySerializationStrategy, ValidateAndCommit} +import com.daml.ledger.validator.StateKeySerializationStrategy import com.daml.lf.engine.Engine -import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics import com.daml.platform.akkastreams.dispatcher.Dispatcher -import scala.concurrent.ExecutionContext - object InMemoryLedgerReaderWriter { + final class BatchingOwner( ledgerId: LedgerId, batchingLedgerWriterConfig: BatchingLedgerWriterConfig, @@ -42,35 +28,21 @@ object InMemoryLedgerReaderWriter { engine: Engine, )(implicit materializer: Materializer) extends ResourceOwner[KeyValueLedger] { - override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = + override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = { + val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics) for { - ledgerDataExporter <- LedgerDataExporter.Owner.acquire() - keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) - committer = createBatchedCommitter( - keyValueCommitting, + writer <- new InMemoryLedgerWriter.BatchingOwner( batchingLedgerWriterConfig, - state, + participantId, metrics, timeProvider, stateValueCache, - ledgerDataExporter, - ) - reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics) - writer = new InMemoryLedgerWriter( - participantId, dispatcher, state, - committer, - ) - // We need to generate batched submissions for the validator in order to improve throughput. - // Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the - // in-memory committer. - batchingWriter <- newLoggingContext { implicit loggingContext => - ResourceOwner - .forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, writer)) - .acquire() - } - } yield createKeyValueLedger(reader, batchingWriter) + engine, + ).acquire() + } yield createKeyValueLedger(reader, writer) + } } final class SingleParticipantBatchingOwner( @@ -83,7 +55,6 @@ object InMemoryLedgerReaderWriter { engine: Engine, )(implicit materializer: Materializer) extends ResourceOwner[KeyValueLedger] { - override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = { val state = InMemoryState.empty for { @@ -116,139 +87,20 @@ object InMemoryLedgerReaderWriter { )(implicit materializer: Materializer) extends ResourceOwner[KeyValueLedger] { override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = { - val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) - val committer = createPreExecutingCommitter( - keyValueCommitting, - keySerializationStrategy, - state, - metrics, - timeProvider, - stateValueCache, - ) val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics) - val writer = new InMemoryLedgerWriter( - participantId, - dispatcher, - state, - committer, - ) - Resource.successful(createKeyValueLedger(reader, writer)) - } - } - - private def createBatchedCommitter( - keyValueCommitting: KeyValueCommitting, - batchingLedgerWriterConfig: BatchingLedgerWriterConfig, - state: InMemoryState, - metrics: Metrics, - timeProvider: TimeProvider, - stateValueCache: InMemoryLedgerWriter.StateValueCache, - ledgerDataExporter: LedgerDataExporter, - )(implicit materializer: Materializer): ValidateAndCommit = { - val validator = BatchedSubmissionValidator[Index]( - BatchedSubmissionValidatorFactory.defaultParametersFor( - batchingLedgerWriterConfig.enableBatching - ), - keyValueCommitting, - new ConflictDetection(metrics), - metrics, - ledgerDataExporter, - ) - val committer = BatchedValidatingCommitter[Index]( - () => timeProvider.getCurrentTime, - validator, - stateValueCache, - ) - locally { - implicit val executionContext: ExecutionContext = materializer.executionContext - - def validateAndCommit( - correlationId: String, - submissionEnvelope: Raw.Value, - submittingParticipantId: ParticipantId, - ) = - new InMemoryLedgerStateAccess(state, metrics).inTransaction { ledgerStateOperations => - committer.commit( - correlationId, - submissionEnvelope, - submittingParticipantId, - ledgerStateOperations, - ) - } - - validateAndCommit - } - } - - private def createPreExecutingCommitter( - keyValueCommitting: KeyValueCommitting, - keySerializationStrategy: StateKeySerializationStrategy, - state: InMemoryState, - metrics: Metrics, - timeProvider: TimeProvider, - stateValueCache: InMemoryLedgerWriter.StateValueCache, - )(implicit materializer: Materializer): ValidateAndCommit = { - val committer = new PreExecutingValidatingCommitter[ - Option[DamlStateValue], - RawPreExecutingCommitStrategy.ReadSet, - RawKeyValuePairsWithLogEntry, - ]( - transformStateReader = transformStateReader(keySerializationStrategy, stateValueCache), - validator = new PreExecutingSubmissionValidator( - keyValueCommitting, - new RawPreExecutingCommitStrategy(keySerializationStrategy), - metrics, - ), - postExecutionConflictDetector = new EqualityBasedPostExecutionConflictDetector(), - postExecutionFinalizer = new RawPostExecutionFinalizer( - now = () => timeProvider.getCurrentTime - ), - ) - locally { - implicit val executionContext: ExecutionContext = materializer.executionContext - - def validateAndCommit( - correlationId: String, - submissionEnvelope: Raw.Value, - submittingParticipantId: ParticipantId, - ) = - committer.commit( - correlationId, - submissionEnvelope, - submittingParticipantId, - new InMemoryLedgerStateAccess(state, metrics), - ) - - validateAndCommit + for { + writer <- new InMemoryLedgerWriter.PreExecutingOwner( + participantId, + keySerializationStrategy, + metrics, + timeProvider, + stateValueCache, + dispatcher, + state, + engine, + ).acquire() + } yield createKeyValueLedger(reader, writer) } } - private def transformStateReader( - keySerializationStrategy: StateKeySerializationStrategy, - cache: Cache[DamlStateKey, DamlStateValue], - )(stateReader: LedgerStateReader): DamlLedgerStateReader = { - CachingStateReader( - cache, - ImmutablesOnlyCacheUpdatePolicy, - stateReader - .contramapKeys(keySerializationStrategy.serializeStateKey) - .mapValues(value => - value.map( - Envelope - .openStateValue(_) - .getOrElse(sys.error("Opening enveloped DamlStateValue failed")) - ) - ), - ) - } - - private def createKeyValueCommitting( - metrics: Metrics, - timeProvider: TimeProvider, - engine: Engine, - ): KeyValueCommitting = - new KeyValueCommitting(engine, metrics, inStaticTimeMode = needStaticTimeModeFor(timeProvider)) - - private def needStaticTimeModeFor(timeProvider: TimeProvider): Boolean = - timeProvider != TimeProvider.UTC } diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala index 9a4b4a30da78..e3e7ada3eeae 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala @@ -3,18 +3,45 @@ package com.daml.ledger.on.memory +import akka.stream.Materializer import com.daml.api.util.TimeProvider import com.daml.caching.Cache import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} -import com.daml.ledger.participant.state.kvutils.Raw -import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter} +import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter +import com.daml.ledger.participant.state.kvutils.api.{ + BatchingLedgerWriter, + BatchingLedgerWriterConfig, + CommitMetadata, + LedgerWriter, +} +import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting, Raw} import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} -import com.daml.ledger.validator.ValidateAndCommit +import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} +import com.daml.ledger.validator.batch.{ + BatchedSubmissionValidator, + BatchedSubmissionValidatorFactory, + BatchedValidatingCommitter, + ConflictDetection, +} +import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy} +import com.daml.ledger.validator.preexecution.{ + EqualityBasedPostExecutionConflictDetector, + PreExecutingSubmissionValidator, + PreExecutingValidatingCommitter, + RawKeyValuePairsWithLogEntry, + RawPostExecutionFinalizer, + RawPreExecutingCommitStrategy, +} +import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader} +import com.daml.ledger.validator.{StateKeySerializationStrategy, ValidateAndCommit} +import com.daml.lf.engine.Engine +import com.daml.logging.LoggingContext.newLoggingContext +import com.daml.metrics.Metrics import com.daml.platform.akkastreams.dispatcher.Dispatcher -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.Success final class InMemoryLedgerWriter private[memory] ( @@ -42,4 +69,157 @@ object InMemoryLedgerWriter { private[memory] type StateValueCache = Cache[DamlStateKey, DamlStateValue] + final class BatchingOwner( + batchingLedgerWriterConfig: BatchingLedgerWriterConfig, + participantId: ParticipantId, + metrics: Metrics, + timeProvider: TimeProvider = DefaultTimeProvider, + stateValueCache: StateValueCache = Cache.none, + dispatcher: Dispatcher[Index], + state: InMemoryState, + engine: Engine, + )(implicit materializer: Materializer) + extends ResourceOwner[LedgerWriter] { + override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] = + for { + ledgerDataExporter <- LedgerDataExporter.Owner.acquire() + keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) + committer = createBatchedCommitter(keyValueCommitting, ledgerDataExporter) + writer = new InMemoryLedgerWriter(participantId, dispatcher, state, committer) + // We need to generate batched submissions for the validator in order to improve throughput. + // Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the + // in-memory committer. + batchingWriter <- newLoggingContext { implicit loggingContext => + ResourceOwner + .forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, writer)) + .acquire() + } + } yield batchingWriter + + private def createBatchedCommitter( + keyValueCommitting: KeyValueCommitting, + ledgerDataExporter: LedgerDataExporter, + )(implicit materializer: Materializer): ValidateAndCommit = { + val validator = BatchedSubmissionValidator[Index]( + BatchedSubmissionValidatorFactory.defaultParametersFor( + batchingLedgerWriterConfig.enableBatching + ), + keyValueCommitting, + new ConflictDetection(metrics), + metrics, + ledgerDataExporter, + ) + val committer = BatchedValidatingCommitter[Index]( + () => timeProvider.getCurrentTime, + validator, + stateValueCache, + ) + locally { + implicit val executionContext: ExecutionContext = materializer.executionContext + + def validateAndCommit( + correlationId: String, + submissionEnvelope: Raw.Value, + submittingParticipantId: ParticipantId, + ) = + new InMemoryLedgerStateAccess(state, metrics).inTransaction { ledgerStateOperations => + committer.commit( + correlationId, + submissionEnvelope, + submittingParticipantId, + ledgerStateOperations, + ) + } + + validateAndCommit + } + } + } + + final class PreExecutingOwner( + participantId: ParticipantId, + keySerializationStrategy: StateKeySerializationStrategy, + metrics: Metrics, + timeProvider: TimeProvider = DefaultTimeProvider, + stateValueCache: StateValueCache = Cache.none, + dispatcher: Dispatcher[Index], + state: InMemoryState, + engine: Engine, + )(implicit materializer: Materializer) + extends ResourceOwner[LedgerWriter] { + override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] = { + val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) + val committer = createPreExecutingCommitter(keyValueCommitting) + val writer = new InMemoryLedgerWriter(participantId, dispatcher, state, committer) + Resource.successful(writer) + } + + private def createPreExecutingCommitter( + keyValueCommitting: KeyValueCommitting + )(implicit materializer: Materializer): ValidateAndCommit = { + val committer = new PreExecutingValidatingCommitter[ + Option[DamlStateValue], + RawPreExecutingCommitStrategy.ReadSet, + RawKeyValuePairsWithLogEntry, + ]( + transformStateReader = transformStateReader(keySerializationStrategy, stateValueCache), + validator = new PreExecutingSubmissionValidator( + keyValueCommitting, + new RawPreExecutingCommitStrategy(keySerializationStrategy), + metrics, + ), + postExecutionConflictDetector = new EqualityBasedPostExecutionConflictDetector(), + postExecutionFinalizer = new RawPostExecutionFinalizer( + now = () => timeProvider.getCurrentTime + ), + ) + locally { + implicit val executionContext: ExecutionContext = materializer.executionContext + + def validateAndCommit( + correlationId: String, + submissionEnvelope: Raw.Value, + submittingParticipantId: ParticipantId, + ) = + committer.commit( + correlationId, + submissionEnvelope, + submittingParticipantId, + new InMemoryLedgerStateAccess(state, metrics), + ) + + validateAndCommit + } + } + } + + private def transformStateReader( + keySerializationStrategy: StateKeySerializationStrategy, + cache: Cache[DamlStateKey, DamlStateValue], + )(stateReader: LedgerStateReader): DamlLedgerStateReader = { + CachingStateReader( + cache, + ImmutablesOnlyCacheUpdatePolicy, + stateReader + .contramapKeys(keySerializationStrategy.serializeStateKey) + .mapValues(value => + value.map( + Envelope + .openStateValue(_) + .getOrElse(sys.error("Opening enveloped DamlStateValue failed")) + ) + ), + ) + } + + private def createKeyValueCommitting( + metrics: Metrics, + timeProvider: TimeProvider, + engine: Engine, + ): KeyValueCommitting = + new KeyValueCommitting(engine, metrics, inStaticTimeMode = needStaticTimeModeFor(timeProvider)) + + private def needStaticTimeModeFor(timeProvider: TimeProvider): Boolean = + timeProvider != TimeProvider.UTC + }