-
Notifications
You must be signed in to change notification settings - Fork 205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract the setting of the time bounds on the commit context [KVL-1173] #11516
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.ledger.participant.state.kvutils.committer.transaction | ||
|
||
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} | ||
import com.daml.ledger.participant.state.kvutils.Conversions.{commandDedupKey, parseTimestamp} | ||
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration | ||
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult} | ||
import com.daml.lf.data.Time.Timestamp | ||
import com.daml.logging.LoggingContext | ||
|
||
object TimeBoundBindingStep { | ||
|
||
def setTimeBoundsInContextStep(defaultConfig: Configuration): Step = new Step { | ||
override def apply(commitContext: CommitContext, transactionEntry: DamlTransactionEntrySummary)( | ||
implicit loggingContext: LoggingContext | ||
): StepResult[DamlTransactionEntrySummary] = { | ||
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext) | ||
val timeModel = config.timeModel | ||
|
||
if (commitContext.preExecute) { | ||
val maybeDeduplicateUntil = | ||
getLedgerDeduplicateUntil(transactionEntry, commitContext) | ||
val minimumRecordTime = transactionMinRecordTime( | ||
transactionEntry.submissionTime, | ||
transactionEntry.ledgerEffectiveTime, | ||
maybeDeduplicateUntil, | ||
timeModel, | ||
) | ||
val maximumRecordTime = transactionMaxRecordTime( | ||
transactionEntry.submissionTime, | ||
transactionEntry.ledgerEffectiveTime, | ||
timeModel, | ||
) | ||
commitContext.deduplicateUntil = maybeDeduplicateUntil | ||
commitContext.minimumRecordTime = Some(minimumRecordTime) | ||
commitContext.maximumRecordTime = Some(maximumRecordTime) | ||
} | ||
StepContinue(transactionEntry) | ||
} | ||
} | ||
|
||
@SuppressWarnings(Array("org.wartremover.warts.Option2Iterable")) | ||
private def transactionMinRecordTime( | ||
submissionTime: Timestamp, | ||
ledgerTime: Timestamp, | ||
maybeDeduplicateUntil: Option[Timestamp], | ||
timeModel: LedgerTimeModel, | ||
): Timestamp = | ||
List( | ||
maybeDeduplicateUntil | ||
.map( | ||
_.add(Timestamp.Resolution) | ||
), // DeduplicateUntil defines a rejection window, endpoints inclusive | ||
Some(timeModel.minRecordTime(ledgerTime)), | ||
Some(timeModel.minRecordTime(submissionTime)), | ||
).flatten.max | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment that explains the rationale of this logic? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not have the initial context around this to guarantee the rationale of it. The code was just moved in this PR |
||
|
||
private def transactionMaxRecordTime( | ||
submissionTime: Timestamp, | ||
ledgerTime: Timestamp, | ||
timeModel: LedgerTimeModel, | ||
): Timestamp = | ||
List(timeModel.maxRecordTime(ledgerTime), timeModel.maxRecordTime(submissionTime)).min | ||
|
||
private def getLedgerDeduplicateUntil( | ||
transactionEntry: DamlTransactionEntrySummary, | ||
commitContext: CommitContext, | ||
): Option[Timestamp] = | ||
for { | ||
dedupEntry <- commitContext.get(commandDedupKey(transactionEntry.submitterInfo)) | ||
dedupTimestamp <- PartialFunction.condOpt(dedupEntry.getCommandDedup.hasDeduplicatedUntil) { | ||
case true => dedupEntry.getCommandDedup.getDeduplicatedUntil | ||
} | ||
} yield parseTimestamp(dedupTimestamp) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,9 @@ | |
|
||
package com.daml.ledger.participant.state.kvutils.committer.transaction.validation | ||
|
||
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} | ||
import com.daml.ledger.participant.state.kvutils.Conversions.{commandDedupKey, parseTimestamp} | ||
import cats.syntax.contravariantSemigroupal._ | ||
import com.daml.ledger.configuration.Configuration | ||
import com.daml.ledger.participant.state.kvutils.Err | ||
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration | ||
import com.daml.ledger.participant.state.kvutils.committer.transaction.{ | ||
DamlTransactionEntrySummary, | ||
|
@@ -14,7 +15,6 @@ import com.daml.ledger.participant.state.kvutils.committer.transaction.{ | |
} | ||
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult} | ||
import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry | ||
import com.daml.lf.data.Time.Timestamp | ||
import com.daml.logging.LoggingContext | ||
|
||
private[transaction] class LedgerTimeValidator(defaultConfig: Configuration) | ||
|
@@ -27,12 +27,12 @@ private[transaction] class LedgerTimeValidator(defaultConfig: Configuration) | |
commitContext: CommitContext, | ||
transactionEntry: DamlTransactionEntrySummary, | ||
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = { | ||
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext) | ||
val timeModel = config.timeModel | ||
|
||
commitContext.recordTime match { | ||
case Some(recordTime) => | ||
val givenLedgerTime = transactionEntry.ledgerEffectiveTime | ||
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext) | ||
val timeModel = config.timeModel | ||
|
||
timeModel | ||
.checkTime(ledgerTime = givenLedgerTime, recordTime = recordTime) | ||
|
@@ -46,68 +46,26 @@ private[transaction] class LedgerTimeValidator(defaultConfig: Configuration) | |
_ => StepContinue(transactionEntry), | ||
) | ||
case None => // Pre-execution: propagate the time bounds and defer the checks to post-execution. | ||
val maybeDeduplicateUntil = | ||
getLedgerDeduplicateUntil(transactionEntry, commitContext) | ||
val minimumRecordTime = transactionMinRecordTime( | ||
transactionEntry.submissionTime, | ||
transactionEntry.ledgerEffectiveTime, | ||
maybeDeduplicateUntil, | ||
timeModel, | ||
) | ||
val maximumRecordTime = transactionMaxRecordTime( | ||
transactionEntry.submissionTime, | ||
transactionEntry.ledgerEffectiveTime, | ||
timeModel, | ||
) | ||
commitContext.deduplicateUntil = maybeDeduplicateUntil | ||
commitContext.minimumRecordTime = Some(minimumRecordTime) | ||
commitContext.maximumRecordTime = Some(maximumRecordTime) | ||
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder | ||
.setTransactionRejectionEntry( | ||
rejections.preExecutionOutOfTimeBoundsRejectionEntry( | ||
transactionEntry, | ||
minimumRecordTime, | ||
maximumRecordTime, | ||
(commitContext.minimumRecordTime, commitContext.maximumRecordTime) | ||
.mapN((minimumRecordTime, maximumRecordTime) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we introducing cats here? We aren't using this library anywhere in kvutils or under the broader |
||
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder | ||
.setTransactionRejectionEntry( | ||
rejections.preExecutionOutOfTimeBoundsRejectionEntry( | ||
transactionEntry.submitterInfo, | ||
minimumRecordTime, | ||
maximumRecordTime, | ||
) | ||
) | ||
.build | ||
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry) | ||
StepContinue(transactionEntry) | ||
}) | ||
.getOrElse( | ||
throw Err.InternalError( | ||
"Minimum and maximum record times were not set in the committer context" | ||
) | ||
) | ||
.build | ||
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry) | ||
StepContinue(transactionEntry) | ||
} | ||
} | ||
} | ||
|
||
@SuppressWarnings(Array("org.wartremover.warts.Option2Iterable")) | ||
private def transactionMinRecordTime( | ||
submissionTime: Timestamp, | ||
ledgerTime: Timestamp, | ||
maybeDeduplicateUntil: Option[Timestamp], | ||
timeModel: LedgerTimeModel, | ||
): Timestamp = | ||
List( | ||
maybeDeduplicateUntil | ||
.map( | ||
_.add(Timestamp.Resolution) | ||
), // DeduplicateUntil defines a rejection window, endpoints inclusive | ||
Some(timeModel.minRecordTime(ledgerTime)), | ||
Some(timeModel.minRecordTime(submissionTime)), | ||
).flatten.max | ||
|
||
private def transactionMaxRecordTime( | ||
submissionTime: Timestamp, | ||
ledgerTime: Timestamp, | ||
timeModel: LedgerTimeModel, | ||
): Timestamp = | ||
List(timeModel.maxRecordTime(ledgerTime), timeModel.maxRecordTime(submissionTime)).min | ||
|
||
private def getLedgerDeduplicateUntil( | ||
transactionEntry: DamlTransactionEntrySummary, | ||
commitContext: CommitContext, | ||
): Option[Timestamp] = | ||
for { | ||
dedupEntry <- commitContext.get(commandDedupKey(transactionEntry.submitterInfo)) | ||
dedupTimestamp <- PartialFunction.condOpt(dedupEntry.getCommandDedup.hasDeduplicatedUntil) { | ||
case true => dedupEntry.getCommandDedup.getDeduplicatedUntil | ||
} | ||
} yield parseTimestamp(dedupTimestamp) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.ledger.participant.state.kvutils.committer.transaction | ||
|
||
import com.daml.ledger.configuration.Configuration | ||
import com.daml.ledger.participant.state.kvutils.Conversions.configurationStateKey | ||
import com.daml.ledger.participant.state.kvutils.TestHelpers.{ | ||
createCommitContext, | ||
createEmptyTransactionEntry, | ||
theDefaultConfig, | ||
} | ||
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop} | ||
import com.daml.ledger.participant.state.kvutils.store.events.{ | ||
DamlConfigurationEntry, | ||
DamlTransactionEntry, | ||
} | ||
import com.daml.ledger.participant.state.kvutils.store.{ | ||
DamlCommandDedupValue, | ||
DamlStateKey, | ||
DamlStateValue, | ||
} | ||
import com.daml.ledger.participant.state.kvutils.{Conversions, TestHelpers} | ||
import com.daml.lf.data.Time | ||
import com.daml.logging.LoggingContext | ||
import com.google.protobuf.timestamp | ||
import com.google.protobuf.timestamp.Timestamp.toJavaProto | ||
import org.scalatest.matchers.should.Matchers | ||
import org.scalatest.wordspec.AnyWordSpec | ||
|
||
import scala.concurrent.duration._ | ||
|
||
class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers { | ||
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting | ||
|
||
private val step = TimeBoundBindingStep.setTimeBoundsInContextStep(TestHelpers.theDefaultConfig) | ||
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter")) | ||
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry) | ||
private val aSubmissionTime = createProtobufTimestamp(seconds = 1) | ||
private val aLedgerEffectiveTime = createProtobufTimestamp(seconds = 2) | ||
private val aDamlTransactionEntryWithSubmissionAndLedgerEffectiveTimes: DamlTransactionEntry = | ||
aDamlTransactionEntry.toBuilder | ||
.setSubmissionTime(toJavaProto(aSubmissionTime)) | ||
.setLedgerEffectiveTime(toJavaProto(aLedgerEffectiveTime)) | ||
.build() | ||
private val aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes = | ||
DamlTransactionEntrySummary(aDamlTransactionEntryWithSubmissionAndLedgerEffectiveTimes) | ||
|
||
private val aDedupKey: DamlStateKey = Conversions | ||
.commandDedupKey(aDamlTransactionEntry.getSubmitterInfo) | ||
private val aDeduplicateUntil = createProtobufTimestamp(seconds = 3) | ||
|
||
private val aDedupValue = DamlStateValue.newBuilder | ||
.setCommandDedup( | ||
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(toJavaProto(aDeduplicateUntil)) | ||
) | ||
.build() | ||
|
||
private val emptyConfigurationStateValue: DamlStateValue = | ||
defaultConfigurationStateValueBuilder().build | ||
|
||
private val inputWithTimeModelAndCommandDeduplication: Map[DamlStateKey, Some[DamlStateValue]] = | ||
Map( | ||
Conversions.configurationStateKey -> Some(emptyConfigurationStateValue), | ||
aDedupKey -> Some(aDedupValue), | ||
) | ||
private val inputWithTimeModelAndEmptyCommandDeduplication = | ||
Map(Conversions.configurationStateKey -> Some(emptyConfigurationStateValue), aDedupKey -> None) | ||
|
||
"time bounds binding step" should { | ||
"continue" in { | ||
val result = step( | ||
contextWithTimeModelAndEmptyCommandDeduplication(), | ||
aTransactionEntrySummary, | ||
) | ||
|
||
result match { | ||
case StepContinue(_) => succeed | ||
case StepStop(_) => fail() | ||
} | ||
} | ||
|
||
"compute and correctly set the min/max ledger time without deduplicateUntil" in { | ||
val context = contextWithTimeModelAndEmptyCommandDeduplication() | ||
|
||
step.apply( | ||
context, | ||
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes, | ||
) | ||
|
||
context.minimumRecordTime shouldEqual Some( | ||
protoTimestampToLf(aLedgerEffectiveTime) | ||
.subtract(theDefaultConfig.timeModel.minSkew) | ||
) | ||
context.maximumRecordTime shouldEqual Some( | ||
protoTimestampToLf(aSubmissionTime) | ||
.add(theDefaultConfig.timeModel.maxSkew) | ||
) | ||
context.deduplicateUntil shouldBe empty | ||
} | ||
|
||
"set deduplicate until when available" in { | ||
val context = contextWithTimeModelAndCommandDeduplication() | ||
step.apply( | ||
context, | ||
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes, | ||
) | ||
context.deduplicateUntil shouldEqual Some( | ||
protoTimestampToLf(aDeduplicateUntil) | ||
) | ||
} | ||
|
||
"mark config and dedup key as accessed in context" in { | ||
val commitContext = | ||
createCommitContext(recordTime = None, inputWithTimeModelAndCommandDeduplication) | ||
|
||
step.apply(commitContext, aTransactionEntrySummary) | ||
|
||
commitContext.getAccessedInputKeys should contain allOf (configurationStateKey, aDedupKey) | ||
} | ||
} | ||
|
||
private def contextWithTimeModelAndEmptyCommandDeduplication() = | ||
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndEmptyCommandDeduplication) | ||
|
||
private def contextWithTimeModelAndCommandDeduplication() = | ||
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndCommandDeduplication) | ||
|
||
private def createProtobufTimestamp(seconds: Long): timestamp.Timestamp = { | ||
timestamp.Timestamp(seconds) | ||
} | ||
private def defaultConfigurationStateValueBuilder(): DamlStateValue.Builder = | ||
DamlStateValue.newBuilder | ||
.setConfigurationEntry( | ||
DamlConfigurationEntry.newBuilder | ||
.setConfiguration(Configuration.encode(theDefaultConfig)) | ||
) | ||
|
||
private def protoTimestampToLf(ts: timestamp.Timestamp) = | ||
Time.Timestamp.assertFromLong(ts.seconds.seconds.toMicros) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove dependency on cats.