Skip to content

Commit

Permalink
kvutils: Simplify CommandDeduplication somewhat, and clarify error me…
Browse files Browse the repository at this point in the history
…ssages. (digital-asset#12708)

* kvutils: Clarify handling of all deduplication period cases.

* kvutils: Simplify CommandDeduplication somewhat.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar authored Feb 2, 2022
1 parent 8bd908e commit 496bc4e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,27 @@ private[transaction] object CommandDeduplication {
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
val commandDeduplicationDuration =
if (transactionEntry.submitterInfo.hasDeduplicationDuration)
parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration)
else
throw Err.InternalError(
"Deduplication period not supported, only durations are supported"
)
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
val dedupEntry = commitContext.get(dedupKey)
val maybeDedupValue = dedupEntry
.filter(_.hasCommandDedup)
.map(_.getCommandDedup)
transactionEntry.submitterInfo.getDeduplicationPeriodCase match {
case DamlSubmitterInfo.DeduplicationPeriodCase.DEDUPLICATION_DURATION =>
parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration)
case DamlSubmitterInfo.DeduplicationPeriodCase.DEDUPLICATE_UNTIL =>
throw Err.InternalError(
"Deduplication timestamp not supported, only durations are supported"
)
case DamlSubmitterInfo.DeduplicationPeriodCase.DEDUPLICATION_OFFSET =>
throw Err.InternalError(
"Deduplication offset not supported, only durations are supported"
)
case DamlSubmitterInfo.DeduplicationPeriodCase.DEDUPLICATIONPERIOD_NOT_SET =>
throw Err.InternalError(
"Deduplication period must be set"
)
}
val maybeDedupValue =
commitContext
.get(commandDedupKey(transactionEntry.submitterInfo))
.filter(_.hasCommandDedup)
.map(_.getCommandDedup)
val isNotADuplicate =
isTheCommandNotADuplicate(commitContext, commandDeduplicationDuration, maybeDedupValue)
if (isNotADuplicate) {
Expand All @@ -79,8 +89,8 @@ private[transaction] object CommandDeduplication {
commitContext: CommitContext,
commandDeduplicationDuration: Duration,
maybeDedupValue: Option[DamlCommandDedupValue],
) = {
val recordTimeOrMinimumRecordTime = commitContext.recordTime match {
): Boolean = {
val minimumRecordTime = commitContext.recordTime match {
case Some(recordTime) =>
// During the normal execution, in the deduplication state value we stored the record time
// This allows us to compare the record times directly
Expand All @@ -96,35 +106,28 @@ private[transaction] object CommandDeduplication {
)
.toInstant
}
maybeDedupValue
.flatMap(commandDeduplication =>
commandDeduplication.getTimeCase match {
// Backward-compatibility, will not be set for new entries
case TimeCase.DEDUPLICATED_UNTIL =>
Some(parseTimestamp(commandDeduplication.getDeduplicatedUntil))
// Set during normal execution, no time skews are added
case TimeCase.RECORD_TIME =>
val storedDuplicateRecordTime =
parseTimestamp(commandDeduplication.getRecordTime)
Some(
storedDuplicateRecordTime
.add(commandDeduplicationDuration)
)
// Set during pre-execution, time skews are already accounted for
case TimeCase.RECORD_TIME_BOUNDS =>
val maxRecordTime =
parseTimestamp(commandDeduplication.getRecordTimeBounds.getMaxRecordTime)
Some(
maxRecordTime
.add(commandDeduplicationDuration)
)
case TimeCase.TIME_NOT_SET =>
None
}
)
.forall(deduplicatedUntil =>
recordTimeOrMinimumRecordTime.isAfter(deduplicatedUntil.toInstant)
)
val maybeDeduplicatedUntil = maybeDedupValue.flatMap(commandDeduplication =>
commandDeduplication.getTimeCase match {
// Backward-compatibility, will not be set for new entries
case TimeCase.DEDUPLICATED_UNTIL =>
Some(parseTimestamp(commandDeduplication.getDeduplicatedUntil))
// Set during normal execution, no time skews are added
case TimeCase.RECORD_TIME =>
val storedDuplicateRecordTime =
parseTimestamp(commandDeduplication.getRecordTime)
Some(storedDuplicateRecordTime.add(commandDeduplicationDuration))
// Set during pre-execution, time skews are already accounted for
case TimeCase.RECORD_TIME_BOUNDS =>
val maxRecordTime =
parseTimestamp(commandDeduplication.getRecordTimeBounds.getMaxRecordTime)
Some(maxRecordTime.add(commandDeduplicationDuration))
case TimeCase.TIME_NOT_SET =>
None
}
)
maybeDeduplicatedUntil.forall(deduplicatedUntil =>
minimumRecordTime.isAfter(deduplicatedUntil.toInstant)
)
}

private def preExecutionDuplicateRejection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class CommandDeduplicationSpec
with Matchers
with MockitoSugar
with OptionValues {

import CommandDeduplicationSpec._

private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
Expand All @@ -55,16 +56,16 @@ class CommandDeduplicationSpec
CommandDeduplication.setDeduplicationEntryStep(theDefaultConfig)
private val timestamp: Timestamp = Timestamp.now()

"deduplicateCommand" should {
Map(
"pre-execution" -> ((dedupValueBuilder: Timestamp => Option[DamlStateValue]) => {
"deduplicateCommand" when {
Map[String, (Timestamp => Option[DamlStateValue]) => (Timestamp, CommitContext)](
"pre-execution" -> (dedupValueBuilder => {
val dedupValue = dedupValueBuilder(timestamp)
val commitContext = createCommitContext(None, Map(aDedupKey -> dedupValue))
commitContext.minimumRecordTime = Some(timestamp)
commitContext.maximumRecordTime = Some(Timestamp.Epoch)
timestamp -> commitContext
}),
"normal-execution" -> ((dedupValueBuilder: Timestamp => Option[DamlStateValue]) => {
"normal-execution" -> (dedupValueBuilder => {
val dedupValue = dedupValueBuilder(timestamp)
val commitContext = createCommitContext(Some(timestamp), Map(aDedupKey -> dedupValue))
timestamp -> commitContext
Expand All @@ -84,7 +85,6 @@ class CommandDeduplicationSpec
}

"using deduplicate until" should {

"continue if record time is after deduplication time in case a deduplication entry is found" in {
val (_, context) = contextBuilder(timestamp =>
Some(
Expand Down Expand Up @@ -118,14 +118,16 @@ class CommandDeduplicationSpec

"using record time" should {
forAll(
Table(
Table[
String,
Timestamp => DamlCommandDedupValue.Builder => DamlCommandDedupValue.Builder,
](
"identifier" -> "time setter",
"record time" -> ((timestamp: Timestamp) =>
(builder: DamlCommandDedupValue.Builder) =>
builder.setRecordTime(buildTimestamp(timestamp))
"record time" -> (timestamp =>
builder => builder.setRecordTime(buildTimestamp(timestamp))
),
"record time bounds" -> ((timestamp: Timestamp) =>
(builder: DamlCommandDedupValue.Builder) =>
"record time bounds" -> (timestamp =>
builder =>
builder.setRecordTimeBounds(
buildPreExecutionDeduplicationBounds(
timestamp.subtract(deduplicationDuration),
Expand Down

0 comments on commit 496bc4e

Please sign in to comment.