Skip to content

Commit

Permalink
Extract the setting of the time bounds on the commit context [KVL-117…
Browse files Browse the repository at this point in the history
…3] (#11516)

CHANGELOG_BEGIN

CHANGELOG_END
  • Loading branch information
nicu-da authored Nov 3, 2021
1 parent 9bb1d64 commit 2efcb21
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 162 deletions.
1 change: 1 addition & 0 deletions ledger/participant-state/kvutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ da_scala_library(
"@maven//:org_scala_lang_modules_scala_java8_compat",
"@maven//:org_scalaz_scalaz_core",
"@maven//:org_scala_lang_modules_scala_collection_compat",
"@maven//:org_typelevel_cats_core",
],
tags = ["maven_coordinates=com.daml:participant-state-kvutils:__VERSION__"],
visibility = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import com.codahale.metrics.Counter
import com.daml.ledger.participant.state.kvutils.Conversions
import com.daml.ledger.participant.state.kvutils.committer.Committer.buildLogEntryWithOptionalRecordTime
import com.daml.ledger.participant.state.kvutils.committer.{StepResult, StepStop}
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlSubmitterInfo,
DamlTransactionRejectionEntry,
}
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
Expand Down Expand Up @@ -43,13 +46,13 @@ private[transaction] class Rejections(metrics: Metrics) {
}

def preExecutionOutOfTimeBoundsRejectionEntry(
transactionEntry: DamlTransactionEntrySummary,
submitterInfo: DamlSubmitterInfo,
minimumRecordTime: Timestamp,
maximumRecordTime: Timestamp,
): DamlTransactionRejectionEntry =
Conversions
.encodeTransactionRejectionEntry(
transactionEntry.submitterInfo,
submitterInfo,
Rejection.RecordTimeOutOfRange(minimumRecordTime, maximumRecordTime),
)
.build
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.committer.transaction

import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.kvutils.Conversions.{commandDedupKey, parseTimestamp}
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext

object TimeBoundBindingStep {

def setTimeBoundsInContextStep(defaultConfig: Configuration): Step = new Step {
override def apply(commitContext: CommitContext, transactionEntry: DamlTransactionEntrySummary)(
implicit loggingContext: LoggingContext
): StepResult[DamlTransactionEntrySummary] = {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
val timeModel = config.timeModel

if (commitContext.preExecute) {
val maybeDeduplicateUntil =
getLedgerDeduplicateUntil(transactionEntry, commitContext)
val minimumRecordTime = transactionMinRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
maybeDeduplicateUntil,
timeModel,
)
val maximumRecordTime = transactionMaxRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
timeModel,
)
commitContext.deduplicateUntil = maybeDeduplicateUntil
commitContext.minimumRecordTime = Some(minimumRecordTime)
commitContext.maximumRecordTime = Some(maximumRecordTime)
}
StepContinue(transactionEntry)
}
}

@SuppressWarnings(Array("org.wartremover.warts.Option2Iterable"))
private def transactionMinRecordTime(
submissionTime: Timestamp,
ledgerTime: Timestamp,
maybeDeduplicateUntil: Option[Timestamp],
timeModel: LedgerTimeModel,
): Timestamp =
List(
maybeDeduplicateUntil
.map(
_.add(Timestamp.Resolution)
), // DeduplicateUntil defines a rejection window, endpoints inclusive
Some(timeModel.minRecordTime(ledgerTime)),
Some(timeModel.minRecordTime(submissionTime)),
).flatten.max

private def transactionMaxRecordTime(
submissionTime: Timestamp,
ledgerTime: Timestamp,
timeModel: LedgerTimeModel,
): Timestamp =
List(timeModel.maxRecordTime(ledgerTime), timeModel.maxRecordTime(submissionTime)).min

private def getLedgerDeduplicateUntil(
transactionEntry: DamlTransactionEntrySummary,
commitContext: CommitContext,
): Option[Timestamp] =
for {
dedupEntry <- commitContext.get(commandDedupKey(transactionEntry.submitterInfo))
dedupTimestamp <- PartialFunction.condOpt(dedupEntry.getCommandDedup.hasDeduplicatedUntil) {
case true => dedupEntry.getCommandDedup.getDeduplicatedUntil
}
} yield parseTimestamp(dedupTimestamp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private[kvutils] class TransactionCommitter(
"check_informee_parties_allocation" -> checkInformeePartiesAllocation,
"overwrite_deduplication_period" -> overwriteDeduplicationPeriodWithMaxDuration,
"deduplicate" -> deduplicateCommand,
"set_time_bounds" -> TimeBoundBindingStep.setTimeBoundsInContextStep(defaultConfig),
"validate_ledger_time" -> ledgerTimeValidator.createValidationStep(rejections),
"validate_model_conformance" -> modelConformanceValidator.createValidationStep(rejections),
"validate_consistency" -> TransactionConsistencyValidator.createValidationStep(rejections),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

package com.daml.ledger.participant.state.kvutils.committer.transaction.validation

import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.kvutils.Conversions.{commandDedupKey, parseTimestamp}
import cats.syntax.contravariantSemigroupal._
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Err
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
import com.daml.ledger.participant.state.kvutils.committer.transaction.{
DamlTransactionEntrySummary,
Expand All @@ -14,7 +15,6 @@ import com.daml.ledger.participant.state.kvutils.committer.transaction.{
}
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext

private[transaction] class LedgerTimeValidator(defaultConfig: Configuration)
Expand All @@ -27,12 +27,12 @@ private[transaction] class LedgerTimeValidator(defaultConfig: Configuration)
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
val timeModel = config.timeModel

commitContext.recordTime match {
case Some(recordTime) =>
val givenLedgerTime = transactionEntry.ledgerEffectiveTime
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
val timeModel = config.timeModel

timeModel
.checkTime(ledgerTime = givenLedgerTime, recordTime = recordTime)
Expand All @@ -46,68 +46,26 @@ private[transaction] class LedgerTimeValidator(defaultConfig: Configuration)
_ => StepContinue(transactionEntry),
)
case None => // Pre-execution: propagate the time bounds and defer the checks to post-execution.
val maybeDeduplicateUntil =
getLedgerDeduplicateUntil(transactionEntry, commitContext)
val minimumRecordTime = transactionMinRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
maybeDeduplicateUntil,
timeModel,
)
val maximumRecordTime = transactionMaxRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
timeModel,
)
commitContext.deduplicateUntil = maybeDeduplicateUntil
commitContext.minimumRecordTime = Some(minimumRecordTime)
commitContext.maximumRecordTime = Some(maximumRecordTime)
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder
.setTransactionRejectionEntry(
rejections.preExecutionOutOfTimeBoundsRejectionEntry(
transactionEntry,
minimumRecordTime,
maximumRecordTime,
(commitContext.minimumRecordTime, commitContext.maximumRecordTime)
.mapN((minimumRecordTime, maximumRecordTime) => {
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder
.setTransactionRejectionEntry(
rejections.preExecutionOutOfTimeBoundsRejectionEntry(
transactionEntry.submitterInfo,
minimumRecordTime,
maximumRecordTime,
)
)
.build
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry)
StepContinue(transactionEntry)
})
.getOrElse(
throw Err.InternalError(
"Minimum and maximum record times were not set in the committer context"
)
)
.build
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry)
StepContinue(transactionEntry)
}
}
}

@SuppressWarnings(Array("org.wartremover.warts.Option2Iterable"))
private def transactionMinRecordTime(
submissionTime: Timestamp,
ledgerTime: Timestamp,
maybeDeduplicateUntil: Option[Timestamp],
timeModel: LedgerTimeModel,
): Timestamp =
List(
maybeDeduplicateUntil
.map(
_.add(Timestamp.Resolution)
), // DeduplicateUntil defines a rejection window, endpoints inclusive
Some(timeModel.minRecordTime(ledgerTime)),
Some(timeModel.minRecordTime(submissionTime)),
).flatten.max

private def transactionMaxRecordTime(
submissionTime: Timestamp,
ledgerTime: Timestamp,
timeModel: LedgerTimeModel,
): Timestamp =
List(timeModel.maxRecordTime(ledgerTime), timeModel.maxRecordTime(submissionTime)).min

private def getLedgerDeduplicateUntil(
transactionEntry: DamlTransactionEntrySummary,
commitContext: CommitContext,
): Option[Timestamp] =
for {
dedupEntry <- commitContext.get(commandDedupKey(transactionEntry.submitterInfo))
dedupTimestamp <- PartialFunction.condOpt(dedupEntry.getCommandDedup.hasDeduplicatedUntil) {
case true => dedupEntry.getCommandDedup.getDeduplicatedUntil
}
} yield parseTimestamp(dedupTimestamp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.committer.transaction

import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions.configurationStateKey
import com.daml.ledger.participant.state.kvutils.TestHelpers.{
createCommitContext,
createEmptyTransactionEntry,
theDefaultConfig,
}
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop}
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlConfigurationEntry,
DamlTransactionEntry,
}
import com.daml.ledger.participant.state.kvutils.store.{
DamlCommandDedupValue,
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.{Conversions, TestHelpers}
import com.daml.lf.data.Time
import com.daml.logging.LoggingContext
import com.google.protobuf.timestamp
import com.google.protobuf.timestamp.Timestamp.toJavaProto
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.duration._

class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers {
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

private val step = TimeBoundBindingStep.setTimeBoundsInContextStep(TestHelpers.theDefaultConfig)
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
private val aSubmissionTime = createProtobufTimestamp(seconds = 1)
private val aLedgerEffectiveTime = createProtobufTimestamp(seconds = 2)
private val aDamlTransactionEntryWithSubmissionAndLedgerEffectiveTimes: DamlTransactionEntry =
aDamlTransactionEntry.toBuilder
.setSubmissionTime(toJavaProto(aSubmissionTime))
.setLedgerEffectiveTime(toJavaProto(aLedgerEffectiveTime))
.build()
private val aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes =
DamlTransactionEntrySummary(aDamlTransactionEntryWithSubmissionAndLedgerEffectiveTimes)

private val aDedupKey: DamlStateKey = Conversions
.commandDedupKey(aDamlTransactionEntry.getSubmitterInfo)
private val aDeduplicateUntil = createProtobufTimestamp(seconds = 3)

private val aDedupValue = DamlStateValue.newBuilder
.setCommandDedup(
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(toJavaProto(aDeduplicateUntil))
)
.build()

private val emptyConfigurationStateValue: DamlStateValue =
defaultConfigurationStateValueBuilder().build

private val inputWithTimeModelAndCommandDeduplication: Map[DamlStateKey, Some[DamlStateValue]] =
Map(
Conversions.configurationStateKey -> Some(emptyConfigurationStateValue),
aDedupKey -> Some(aDedupValue),
)
private val inputWithTimeModelAndEmptyCommandDeduplication =
Map(Conversions.configurationStateKey -> Some(emptyConfigurationStateValue), aDedupKey -> None)

"time bounds binding step" should {
"continue" in {
val result = step(
contextWithTimeModelAndEmptyCommandDeduplication(),
aTransactionEntrySummary,
)

result match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}

"compute and correctly set the min/max ledger time without deduplicateUntil" in {
val context = contextWithTimeModelAndEmptyCommandDeduplication()

step.apply(
context,
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes,
)

context.minimumRecordTime shouldEqual Some(
protoTimestampToLf(aLedgerEffectiveTime)
.subtract(theDefaultConfig.timeModel.minSkew)
)
context.maximumRecordTime shouldEqual Some(
protoTimestampToLf(aSubmissionTime)
.add(theDefaultConfig.timeModel.maxSkew)
)
context.deduplicateUntil shouldBe empty
}

"set deduplicate until when available" in {
val context = contextWithTimeModelAndCommandDeduplication()
step.apply(
context,
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes,
)
context.deduplicateUntil shouldEqual Some(
protoTimestampToLf(aDeduplicateUntil)
)
}

"mark config and dedup key as accessed in context" in {
val commitContext =
createCommitContext(recordTime = None, inputWithTimeModelAndCommandDeduplication)

step.apply(commitContext, aTransactionEntrySummary)

commitContext.getAccessedInputKeys should contain allOf (configurationStateKey, aDedupKey)
}
}

private def contextWithTimeModelAndEmptyCommandDeduplication() =
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndEmptyCommandDeduplication)

private def contextWithTimeModelAndCommandDeduplication() =
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndCommandDeduplication)

private def createProtobufTimestamp(seconds: Long): timestamp.Timestamp = {
timestamp.Timestamp(seconds)
}
private def defaultConfigurationStateValueBuilder(): DamlStateValue.Builder =
DamlStateValue.newBuilder
.setConfigurationEntry(
DamlConfigurationEntry.newBuilder
.setConfiguration(Configuration.encode(theDefaultConfig))
)

private def protoTimestampToLf(ts: timestamp.Timestamp) =
Time.Timestamp.assertFromLong(ts.seconds.seconds.toMicros)
}
Loading

0 comments on commit 2efcb21

Please sign in to comment.