Skip to content

Commit

Permalink
[Sandbox-on-X] Fix record time assignment (digital-asset#12706)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
tudor-da authored Feb 2, 2022
1 parent 496bc4e commit 784faf6
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,32 @@ import scala.collection.immutable.VectorMap
case class DeduplicationState private (
private[validate] val deduplicationQueue: DeduplicationQueue,
private val maxDeduplicationDuration: Duration,
private val currentTime: () => Time.Timestamp,
private val bridgeMetrics: BridgeMetrics,
) {

def deduplicate(
changeId: ChangeId,
commandDeduplicationDuration: Duration,
recordTime: Time.Timestamp,
): (DeduplicationState, Boolean) = {
bridgeMetrics.SequencerState.deduplicationQueueLength.update(deduplicationQueue.size)
if (commandDeduplicationDuration.compareTo(maxDeduplicationDuration) > 0)
throw new RuntimeException(
s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration)."
)
else {
val now = currentTime()
val expiredTimestamp = expiredThreshold(maxDeduplicationDuration, now)
val expiredTimestamp = expiredThreshold(maxDeduplicationDuration, recordTime)

val queueAfterEvictions = deduplicationQueue.dropWhile(_._2 <= expiredTimestamp)

val isDuplicateChangeId = queueAfterEvictions
.get(changeId)
.exists(_ > expiredThreshold(commandDeduplicationDuration, now))
.exists(_ >= expiredThreshold(commandDeduplicationDuration, recordTime))

if (isDuplicateChangeId)
copy(deduplicationQueue = queueAfterEvictions) -> true
else
copy(deduplicationQueue = queueAfterEvictions.updated(changeId, now)) -> false
copy(deduplicationQueue = queueAfterEvictions.updated(changeId, recordTime)) -> false
}
}

Expand All @@ -56,13 +55,11 @@ object DeduplicationState {

private[validate] def empty(
deduplicationDuration: Duration,
currentTime: () => Time.Timestamp,
bridgeMetrics: BridgeMetrics,
): DeduplicationState =
DeduplicationState(
deduplicationQueue = VectorMap.empty,
maxDeduplicationDuration = deduplicationDuration,
currentTime = currentTime,
bridgeMetrics = bridgeMetrics,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ private[validate] class SequenceImpl(
bridgeMetrics: BridgeMetrics,
errorFactories: ErrorFactories,
maxDeduplicationDuration: Duration,
wallClockTime: () => Time.Timestamp = () => Timestamp.now(),
) extends Sequence {
private[this] implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)

Expand All @@ -54,7 +53,7 @@ private[validate] class SequenceImpl(
@volatile private[validate] var allocatedParties = initialAllocatedParties
@volatile private[validate] var ledgerConfiguration = initialLedgerConfiguration
@volatile private[validate] var deduplicationState =
DeduplicationState.empty(maxDeduplicationDuration, wallClockTime, bridgeMetrics)
DeduplicationState.empty(maxDeduplicationDuration, bridgeMetrics)

override def apply(): Validation[(Offset, PreparedSubmission)] => Iterable[(Offset, Update)] =
in => {
Expand Down Expand Up @@ -149,11 +148,11 @@ private[validate] class SequenceImpl(
newOffset: LastUpdatedAt,
recordTime: Timestamp,
txSubmission: PreparedTransactionSubmission,
) = {
val submitterInfo = txSubmission.submission.submitterInfo
val completionInfo = submitterInfo.toCompletionInfo()
): Update =
withErrorLogger(txSubmission.submission.submitterInfo.submissionId) { implicit errorLogger =>
val submitterInfo = txSubmission.submission.submitterInfo
val completionInfo = submitterInfo.toCompletionInfo()

withErrorLogger(submitterInfo.submissionId) { implicit errorLogger =>
for {
_ <- checkTimeModel(
transaction = txSubmission.submission,
Expand All @@ -172,37 +171,30 @@ private[validate] class SequenceImpl(
inputContracts = txSubmission.inputContracts,
completionInfo = completionInfo,
)
_ <- deduplicateAndUpdateState(
recordTime = timeProvider.getCurrentTimestamp
updatedDeduplicationState <- deduplicate(
changeId = ChangeId(
submitterInfo.applicationId,
submitterInfo.commandId,
submitterInfo.actAs.toSet,
),
deduplicationPeriod = txSubmission.submission.submitterInfo.deduplicationPeriod,
deduplicationPeriod = submitterInfo.deduplicationPeriod,
completionInfo = completionInfo,
recordTime = recordTime,
)
} yield ()
}(txSubmission.submission.loggingContext, logger)
.fold(
_.toCommandRejectedUpdate(recordTime),
{ _ =>
// Update the sequencer state
sequencerState = sequencerState
.dequeue(noConflictUpTo)
.enqueue(
newOffset,
txSubmission.updatedKeys,
txSubmission.consumedContracts,
)

transactionAccepted(
txSubmission.submission,
offsetIdx,
timeProvider.getCurrentTimestamp,
)
},
_ = updateStatesOnSuccessfulValidation(
noConflictUpTo,
newOffset,
txSubmission,
updatedDeduplicationState,
)
} yield transactionAccepted(
txSubmission.submission,
offsetIdx,
recordTime,
)
}
}(txSubmission.submission.loggingContext, logger)
.fold(_.toCommandRejectedUpdate(recordTime), identity)

private def conflictCheckWithInFlight(
keysState: Map[Key, (Option[ContractId], LastUpdatedAt)],
Expand Down Expand Up @@ -240,22 +232,22 @@ private[validate] class SequenceImpl(
}
}

private def deduplicateAndUpdateState(
private def deduplicate(
changeId: ChangeId,
deduplicationPeriod: DeduplicationPeriod,
completionInfo: CompletionInfo,
recordTime: Time.Timestamp,
)(implicit
errorLogger: ContextualizedErrorLogger
): Validation[Unit] =
): Validation[DeduplicationState] =
deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(commandDeduplicationDuration) =>
val (newDeduplicationState, isDuplicate) =
deduplicationState.deduplicate(changeId, commandDeduplicationDuration)
deduplicationState.deduplicate(changeId, commandDeduplicationDuration, recordTime)

deduplicationState = newDeduplicationState
Either.cond(
!isDuplicate,
(),
newDeduplicationState,
DuplicateCommand(changeId, completionInfo),
)
case _: DeduplicationPeriod.DeduplicationOffset =>
Expand Down Expand Up @@ -296,4 +288,20 @@ private[validate] class SequenceImpl(
.map(Rejection.InvalidLedgerTime(completionInfo, _)(errorFactories))
)
}

private def updateStatesOnSuccessfulValidation(
noConflictUpTo: LastUpdatedAt,
newOffset: LastUpdatedAt,
txSubmission: PreparedTransactionSubmission,
updatedDeduplicationState: DeduplicationState,
): Unit = {
sequencerState = sequencerState
.dequeue(noConflictUpTo)
.enqueue(
newOffset,
txSubmission.updatedKeys,
txSubmission.consumedContracts,
)
deduplicationState = updatedDeduplicationState
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,48 @@ import scala.util.chaining._
class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
behavior of classOf[DeduplicationState].getSimpleName

private val initialTime = Time.Timestamp.now()
private val t0 = Time.Timestamp.now()
private val t1 = t0.add(Duration.ofMinutes(1L))
private val t2 = t0.add(Duration.ofMinutes(2L))
private val t3 = t0.add(Duration.ofMinutes(3L))

private val bridgeMetrics = new BridgeMetrics(new Metrics(new MetricRegistry))

it should "deduplicate commands within the requested deduplication window" in {
val deduplicationState = DeduplicationState.empty(
deduplicationDuration = Duration.ofMinutes(3L),
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)

deduplicationState
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(2L),
recordTime = t0,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> initialTime)
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> t0)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(2L),
recordTime = t1,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> initialTime)
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> t0)
isDuplicate shouldBe true
}
._1
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(2L),
recordTime = t3,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime.add(Duration.ofMinutes(2))
changeId(1) -> t3
)
isDuplicate shouldBe false
}
Expand All @@ -54,33 +69,44 @@ class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
it should "evicts old entries (older than max deduplication time)" in {
val deduplicationState = DeduplicationState.empty(
deduplicationDuration = Duration.ofMinutes(2L),
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)

deduplicationState
.deduplicate(changeId(1), Duration.ofMinutes(1L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(1L),
recordTime = t0,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime
changeId(1) -> t0
)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(2), Duration.ofMinutes(1L))
.deduplicate(
changeId = changeId(2),
commandDeduplicationDuration = Duration.ofMinutes(1L),
recordTime = t1,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime,
changeId(2) -> initialTime.add(Duration.ofMinutes(1)),
changeId(1) -> t0,
changeId(2) -> t1,
)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(3), Duration.ofMinutes(1L))
.deduplicate(
changeId = changeId(3),
commandDeduplicationDuration = Duration.ofMinutes(1L),
recordTime = t2,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(2) -> initialTime.add(Duration.ofMinutes(1)),
changeId(3) -> initialTime.add(Duration.ofMinutes(2)),
changeId(2) -> t1,
changeId(3) -> t2,
)
isDuplicate shouldBe false
}
Expand All @@ -93,24 +119,16 @@ class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
DeduplicationState
.empty(
deduplicationDuration = maxDeduplicationDuration,
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)
.deduplicate(changeId(1337), commandDeduplicationDuration)
.deduplicate(changeId(1337), commandDeduplicationDuration, t0)
) match {
case Failure(ex) =>
ex.getMessage shouldBe s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration)."
case Success(_) => fail("It should throw an exception on invalid deduplication durations")
}
}

// Current time provider mock builder.
// On each call, the mock advances the time by 1 minute
private def currentTimeMock: () => Time.Timestamp = {
var currentTime = initialTime
() => currentTime.tap(_ => currentTime = currentTime.add(Duration.ofMinutes(1L)))
}

private def changeId(idx: Int): ChangeId = ChangeId(
applicationId = Ref.ApplicationId.assertFromString("some-app"),
commandId = Ref.CommandId.assertFromString(s"some-command-$idx"),
Expand Down
Loading

0 comments on commit 784faf6

Please sign in to comment.