Skip to content

Commit

Permalink
ledger-on-memory: Split the Writer from the Reader. (digital-asset#8732)
Browse files Browse the repository at this point in the history
There's no real reason to connect the two.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar authored Feb 3, 2021
1 parent d7d9543 commit b413d7e
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@

package com.daml.ledger.on.memory

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.api._
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting, Raw}
import com.daml.ledger.participant.state.v1.{LedgerId, Offset, ParticipantId, SubmissionResult}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.batch.{
BatchedSubmissionValidator,
Expand All @@ -31,48 +27,16 @@ import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

/** Combined read/write access point for the in-memory ledger.
  *
  * The read side is delegated to an [[InMemoryLedgerReader]] built from the same
  * dispatcher and state; the write side funnels submissions through the supplied
  * `validateAndCommit` function.
  */
final class InMemoryLedgerReaderWriter private[memory] (
    override val ledgerId: LedgerId,
    override val participantId: ParticipantId,
    dispatcher: Dispatcher[Index],
    state: InMemoryState,
    validateAndCommit: ValidateAndCommit,
    metrics: Metrics,
) extends LedgerReader
    with LedgerWriter {

  // Read-side delegate; declared up front so the wiring is visible at a glance.
  private val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)

  /** Validates and commits a submission envelope.
    *
    * When the submission is acknowledged, the dispatcher is signalled with the
    * new head so that subscribed readers observe the freshly written entries.
    * The signalling callback runs on `DirectExecutionContext`, i.e. on the
    * thread that completes the future.
    */
  override def commit(
      correlationId: String,
      envelope: Raw.Value,
      metadata: CommitMetadata,
  ): Future[SubmissionResult] = {
    val submission = validateAndCommit(correlationId, envelope, participantId)
    submission.andThen { case Success(SubmissionResult.Acknowledged) =>
      dispatcher.signalNewHead(state.newHeadSinceLastWrite())
    }(DirectExecutionContext)
  }

  /** Streams ledger records after the given offset, delegating to the reader. */
  override def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed] =
    reader.events(startExclusive)

  /** An in-memory ledger has no external dependencies, so it is always healthy. */
  override def currentHealth(): HealthStatus = Healthy
}
import scala.concurrent.ExecutionContext

object InMemoryLedgerReaderWriter {
val DefaultTimeProvider: TimeProvider = TimeProvider.UTC

type StateValueCache = Cache[DamlStateKey, DamlStateValue]

final class BatchingOwner(
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
metrics: Metrics,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider,
stateValueCache: InMemoryLedgerWriter.StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
Expand All @@ -91,31 +55,30 @@ object InMemoryLedgerReaderWriter {
stateValueCache,
ledgerDataExporter,
)
readerWriter = new InMemoryLedgerReaderWriter(
ledgerId,
reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
writer = new InMemoryLedgerWriter(
participantId,
dispatcher,
state,
committer,
metrics,
)
// We need to generate batched submissions for the validator in order to improve throughput.
// Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the
// in-memory committer.
ledgerWriter <- newLoggingContext { implicit loggingContext =>
batchingWriter <- newLoggingContext { implicit loggingContext =>
ResourceOwner
.forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, readerWriter))
.forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, writer))
.acquire()
}
} yield createKeyValueLedger(readerWriter, ledgerWriter)
} yield createKeyValueLedger(reader, batchingWriter)
}

final class SingleParticipantBatchingOwner(
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider,
stateValueCache: InMemoryLedgerWriter.StateValueCache = Cache.none,
metrics: Metrics,
engine: Engine,
)(implicit materializer: Materializer)
Expand Down Expand Up @@ -145,17 +108,15 @@ object InMemoryLedgerReaderWriter {
participantId: ParticipantId,
keySerializationStrategy: StateKeySerializationStrategy,
metrics: Metrics,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider,
stateValueCache: InMemoryLedgerWriter.StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val keyValueCommitting =
createKeyValueCommitting(metrics, timeProvider, engine)

val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
val committer = createPreExecutingCommitter(
keyValueCommitting,
keySerializationStrategy,
Expand All @@ -164,17 +125,14 @@ object InMemoryLedgerReaderWriter {
timeProvider,
stateValueCache,
)

val readerWriter = new InMemoryLedgerReaderWriter(
ledgerId,
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
val writer = new InMemoryLedgerWriter(
participantId,
dispatcher,
state,
committer,
metrics,
)

Resource.successful(createKeyValueLedger(readerWriter, readerWriter))
Resource.successful(createKeyValueLedger(reader, writer))
}
}

Expand All @@ -184,7 +142,7 @@ object InMemoryLedgerReaderWriter {
state: InMemoryState,
metrics: Metrics,
timeProvider: TimeProvider,
stateValueCache: StateValueCache,
stateValueCache: InMemoryLedgerWriter.StateValueCache,
ledgerDataExporter: LedgerDataExporter,
)(implicit materializer: Materializer): ValidateAndCommit = {
val validator = BatchedSubmissionValidator[Index](
Expand Down Expand Up @@ -228,7 +186,7 @@ object InMemoryLedgerReaderWriter {
state: InMemoryState,
metrics: Metrics,
timeProvider: TimeProvider,
stateValueCache: StateValueCache,
stateValueCache: InMemoryLedgerWriter.StateValueCache,
)(implicit materializer: Materializer): ValidateAndCommit = {
val committer = new PreExecutingValidatingCommitter[
Option[DamlStateValue],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.on.memory

import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter}
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.ledger.validator.ValidateAndCommit
import com.daml.platform.akkastreams.dispatcher.Dispatcher

import scala.concurrent.Future
import scala.util.Success

/** Write side of the in-memory ledger.
  *
  * Submissions are validated and committed via the supplied `validateAndCommit`
  * function; on a successful acknowledgement the dispatcher is signalled with
  * the new head so that readers pick up the newly written entries.
  */
final class InMemoryLedgerWriter private[memory] (
    override val participantId: ParticipantId,
    dispatcher: Dispatcher[Index],
    state: InMemoryState,
    validateAndCommit: ValidateAndCommit,
) extends LedgerWriter {

  /** Validates and commits a submission envelope.
    *
    * The head-signalling callback runs on `DirectExecutionContext`, i.e. on
    * whichever thread completes the underlying future.
    */
  override def commit(
      correlationId: String,
      envelope: Raw.Value,
      metadata: CommitMetadata,
  ): Future[SubmissionResult] = {
    val result = validateAndCommit(correlationId, envelope, participantId)
    result.andThen { case Success(SubmissionResult.Acknowledged) =>
      dispatcher.signalNewHead(state.newHeadSinceLastWrite())
    }(DirectExecutionContext)
  }

  /** An in-memory writer has no external dependencies, so it is always healthy. */
  override def currentHealth(): HealthStatus = Healthy
}

/** Companion holding defaults and type aliases shared by the writer's owners. */
object InMemoryLedgerWriter {

  // Default time source used when the caller does not supply one: wall-clock UTC.
  private[memory] val DefaultTimeProvider: TimeProvider = TimeProvider.UTC

  // Cache from Daml state keys to their deserialized values, used to avoid
  // repeated decoding of state during validation.
  private[memory] type StateValueCache = Cache[DamlStateKey, DamlStateValue]

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@

package com.daml.ledger.on.memory

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.api.CommitMetadata
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.ledger.validator.ValidateAndCommit
import com.daml.lf.data.Ref
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.google.protobuf.ByteString
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
Expand All @@ -19,7 +17,7 @@ import org.scalatest.wordspec.AsyncWordSpec

import scala.concurrent.Future

class InMemoryLedgerReaderWriterSpec
class InMemoryLedgerWriterSpec
extends AsyncWordSpec
with AkkaBeforeAndAfterAll
with Matchers
Expand All @@ -33,13 +31,11 @@ class InMemoryLedgerReaderWriterSpec
.thenReturn(
Future.successful(SubmissionResult.InternalError("Validation failed with an exception"))
)
val instance = new InMemoryLedgerReaderWriter(
"ledger ID",
val instance = new InMemoryLedgerWriter(
Ref.ParticipantId.assertFromString("participant ID"),
mockDispatcher,
InMemoryState.empty,
mockCommitter,
new Metrics(new MetricRegistry),
)

instance
Expand Down

0 comments on commit b413d7e

Please sign in to comment.