-
Notifications
You must be signed in to change notification settings - Fork 205
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Extract the setting of the time bounds on the commit context [KVL-117…
…3] (#11516) CHANGELOG_BEGIN CHANGELOG_END
- Loading branch information
Showing
7 changed files
with
279 additions
and
162 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
77 changes: 77 additions & 0 deletions
77
...om/daml/ledger/participant/state/kvutils/committer/transaction/TimeBoundBindingStep.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.ledger.participant.state.kvutils.committer.transaction | ||
|
||
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} | ||
import com.daml.ledger.participant.state.kvutils.Conversions.{commandDedupKey, parseTimestamp} | ||
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration | ||
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult} | ||
import com.daml.lf.data.Time.Timestamp | ||
import com.daml.logging.LoggingContext | ||
|
||
object TimeBoundBindingStep { | ||
|
||
def setTimeBoundsInContextStep(defaultConfig: Configuration): Step = new Step { | ||
override def apply(commitContext: CommitContext, transactionEntry: DamlTransactionEntrySummary)( | ||
implicit loggingContext: LoggingContext | ||
): StepResult[DamlTransactionEntrySummary] = { | ||
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext) | ||
val timeModel = config.timeModel | ||
|
||
if (commitContext.preExecute) { | ||
val maybeDeduplicateUntil = | ||
getLedgerDeduplicateUntil(transactionEntry, commitContext) | ||
val minimumRecordTime = transactionMinRecordTime( | ||
transactionEntry.submissionTime, | ||
transactionEntry.ledgerEffectiveTime, | ||
maybeDeduplicateUntil, | ||
timeModel, | ||
) | ||
val maximumRecordTime = transactionMaxRecordTime( | ||
transactionEntry.submissionTime, | ||
transactionEntry.ledgerEffectiveTime, | ||
timeModel, | ||
) | ||
commitContext.deduplicateUntil = maybeDeduplicateUntil | ||
commitContext.minimumRecordTime = Some(minimumRecordTime) | ||
commitContext.maximumRecordTime = Some(maximumRecordTime) | ||
} | ||
StepContinue(transactionEntry) | ||
} | ||
} | ||
|
||
@SuppressWarnings(Array("org.wartremover.warts.Option2Iterable")) | ||
private def transactionMinRecordTime( | ||
submissionTime: Timestamp, | ||
ledgerTime: Timestamp, | ||
maybeDeduplicateUntil: Option[Timestamp], | ||
timeModel: LedgerTimeModel, | ||
): Timestamp = | ||
List( | ||
maybeDeduplicateUntil | ||
.map( | ||
_.add(Timestamp.Resolution) | ||
), // DeduplicateUntil defines a rejection window, endpoints inclusive | ||
Some(timeModel.minRecordTime(ledgerTime)), | ||
Some(timeModel.minRecordTime(submissionTime)), | ||
).flatten.max | ||
|
||
private def transactionMaxRecordTime( | ||
submissionTime: Timestamp, | ||
ledgerTime: Timestamp, | ||
timeModel: LedgerTimeModel, | ||
): Timestamp = | ||
List(timeModel.maxRecordTime(ledgerTime), timeModel.maxRecordTime(submissionTime)).min | ||
|
||
private def getLedgerDeduplicateUntil( | ||
transactionEntry: DamlTransactionEntrySummary, | ||
commitContext: CommitContext, | ||
): Option[Timestamp] = | ||
for { | ||
dedupEntry <- commitContext.get(commandDedupKey(transactionEntry.submitterInfo)) | ||
dedupTimestamp <- PartialFunction.condOpt(dedupEntry.getCommandDedup.hasDeduplicatedUntil) { | ||
case true => dedupEntry.getCommandDedup.getDeduplicatedUntil | ||
} | ||
} yield parseTimestamp(dedupTimestamp) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
141 changes: 141 additions & 0 deletions
141
...ml/ledger/participant/state/kvutils/committer/transaction/TimeBoundsBindingStepSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.ledger.participant.state.kvutils.committer.transaction | ||
|
||
import com.daml.ledger.configuration.Configuration | ||
import com.daml.ledger.participant.state.kvutils.Conversions.configurationStateKey | ||
import com.daml.ledger.participant.state.kvutils.TestHelpers.{ | ||
createCommitContext, | ||
createEmptyTransactionEntry, | ||
theDefaultConfig, | ||
} | ||
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop} | ||
import com.daml.ledger.participant.state.kvutils.store.events.{ | ||
DamlConfigurationEntry, | ||
DamlTransactionEntry, | ||
} | ||
import com.daml.ledger.participant.state.kvutils.store.{ | ||
DamlCommandDedupValue, | ||
DamlStateKey, | ||
DamlStateValue, | ||
} | ||
import com.daml.ledger.participant.state.kvutils.{Conversions, TestHelpers} | ||
import com.daml.lf.data.Time | ||
import com.daml.logging.LoggingContext | ||
import com.google.protobuf.timestamp | ||
import com.google.protobuf.timestamp.Timestamp.toJavaProto | ||
import org.scalatest.matchers.should.Matchers | ||
import org.scalatest.wordspec.AnyWordSpec | ||
|
||
import scala.concurrent.duration._ | ||
|
||
class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers { | ||
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting | ||
|
||
private val step = TimeBoundBindingStep.setTimeBoundsInContextStep(TestHelpers.theDefaultConfig) | ||
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter")) | ||
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry) | ||
private val aSubmissionTime = createProtobufTimestamp(seconds = 1) | ||
private val aLedgerEffectiveTime = createProtobufTimestamp(seconds = 2) | ||
private val aDamlTransactionEntryWithSubmissionAndLedgerEffectiveTimes: DamlTransactionEntry = | ||
aDamlTransactionEntry.toBuilder | ||
.setSubmissionTime(toJavaProto(aSubmissionTime)) | ||
.setLedgerEffectiveTime(toJavaProto(aLedgerEffectiveTime)) | ||
.build() | ||
private val aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes = | ||
DamlTransactionEntrySummary(aDamlTransactionEntryWithSubmissionAndLedgerEffectiveTimes) | ||
|
||
private val aDedupKey: DamlStateKey = Conversions | ||
.commandDedupKey(aDamlTransactionEntry.getSubmitterInfo) | ||
private val aDeduplicateUntil = createProtobufTimestamp(seconds = 3) | ||
|
||
private val aDedupValue = DamlStateValue.newBuilder | ||
.setCommandDedup( | ||
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(toJavaProto(aDeduplicateUntil)) | ||
) | ||
.build() | ||
|
||
private val emptyConfigurationStateValue: DamlStateValue = | ||
defaultConfigurationStateValueBuilder().build | ||
|
||
private val inputWithTimeModelAndCommandDeduplication: Map[DamlStateKey, Some[DamlStateValue]] = | ||
Map( | ||
Conversions.configurationStateKey -> Some(emptyConfigurationStateValue), | ||
aDedupKey -> Some(aDedupValue), | ||
) | ||
private val inputWithTimeModelAndEmptyCommandDeduplication = | ||
Map(Conversions.configurationStateKey -> Some(emptyConfigurationStateValue), aDedupKey -> None) | ||
|
||
"time bounds binding step" should { | ||
"continue" in { | ||
val result = step( | ||
contextWithTimeModelAndEmptyCommandDeduplication(), | ||
aTransactionEntrySummary, | ||
) | ||
|
||
result match { | ||
case StepContinue(_) => succeed | ||
case StepStop(_) => fail() | ||
} | ||
} | ||
|
||
"compute and correctly set the min/max ledger time without deduplicateUntil" in { | ||
val context = contextWithTimeModelAndEmptyCommandDeduplication() | ||
|
||
step.apply( | ||
context, | ||
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes, | ||
) | ||
|
||
context.minimumRecordTime shouldEqual Some( | ||
protoTimestampToLf(aLedgerEffectiveTime) | ||
.subtract(theDefaultConfig.timeModel.minSkew) | ||
) | ||
context.maximumRecordTime shouldEqual Some( | ||
protoTimestampToLf(aSubmissionTime) | ||
.add(theDefaultConfig.timeModel.maxSkew) | ||
) | ||
context.deduplicateUntil shouldBe empty | ||
} | ||
|
||
"set deduplicate until when available" in { | ||
val context = contextWithTimeModelAndCommandDeduplication() | ||
step.apply( | ||
context, | ||
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes, | ||
) | ||
context.deduplicateUntil shouldEqual Some( | ||
protoTimestampToLf(aDeduplicateUntil) | ||
) | ||
} | ||
|
||
"mark config and dedup key as accessed in context" in { | ||
val commitContext = | ||
createCommitContext(recordTime = None, inputWithTimeModelAndCommandDeduplication) | ||
|
||
step.apply(commitContext, aTransactionEntrySummary) | ||
|
||
commitContext.getAccessedInputKeys should contain allOf (configurationStateKey, aDedupKey) | ||
} | ||
} | ||
|
||
private def contextWithTimeModelAndEmptyCommandDeduplication() = | ||
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndEmptyCommandDeduplication) | ||
|
||
private def contextWithTimeModelAndCommandDeduplication() = | ||
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndCommandDeduplication) | ||
|
||
private def createProtobufTimestamp(seconds: Long): timestamp.Timestamp = { | ||
timestamp.Timestamp(seconds) | ||
} | ||
private def defaultConfigurationStateValueBuilder(): DamlStateValue.Builder = | ||
DamlStateValue.newBuilder | ||
.setConfigurationEntry( | ||
DamlConfigurationEntry.newBuilder | ||
.setConfiguration(Configuration.encode(theDefaultConfig)) | ||
) | ||
|
||
private def protoTimestampToLf(ts: timestamp.Timestamp) = | ||
Time.Timestamp.assertFromLong(ts.seconds.seconds.toMicros) | ||
} |
Oops, something went wrong.