Skip to content

Commit

Permalink
kvutils - Backwards looking command deduplication [KVL-1174] (#11634)
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
kvutils - The deduplication duration passed in the command is now used for command deduplication, and is no longer always overwritten with the max deduplication duration. The command deduplication duration can still be extended by the committer to account for time skews.
CHANGELOG_END
  • Loading branch information
nicu-da authored Nov 18, 2021
1 parent 43438c6 commit dd88ba2
Show file tree
Hide file tree
Showing 9 changed files with 549 additions and 277 deletions.
6 changes: 2 additions & 4 deletions compatibility/bazel_tools/testing.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -535,12 +535,10 @@ excluded_test_tool_tests = [
"start": "1.18.0-snapshot.20211026.8179.0.e474b2d1",
"platform_ranges": [
{
"end": "1.18.0-snapshot.20211026.8179.0.e474b2d1",
"end": "1.18.0-snapshot.20211102.8257.1",
"exclusions": [
# Exclude dedup tests due to large number of changes (removed participant deduplication, switch to append-only schema, changes in deduplication duration)
"KVCommandDeduplicationIT:KVCommandDeduplicationSimpleDeduplicationMixedClients",
"KVCommandDeduplicationIT:KVCommandDeduplicationDeduplicateSubmitterBasic",
"KVCommandDeduplicationIT:KVCommandDeduplicationSimpleDeduplicationBasic",
"KVCommandDeduplicationIT",
"CommandDeduplicationIT", # Latest version of the test is dependent on having the submission id populated
],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package com.daml.ledger.api.testtool.infrastructure.deduplication

import com.daml.error.ErrorCode
import com.daml.error.definitions.LedgerApiErrors

import java.util.UUID

import com.daml.ledger.api.testtool.infrastructure.Allocation._
import com.daml.ledger.api.testtool.infrastructure.Assertions._
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
Expand All @@ -21,6 +21,7 @@ import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.DA.Types.Tuple2
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation, TextKey, TextKeyOperations}
import com.daml.timer.Delayed
import io.grpc.Status.Code

import scala.annotation.nowarn
Expand Down Expand Up @@ -70,7 +71,7 @@ private[testtool] abstract class CommandDeduplicationBase(
completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party)
_ <- submitRequestAndAssertDeduplication(ledger)(request, party)
// Wait until the end of first deduplication window
_ <- delay()
_ <- delay.delayForEntireDeduplicationPeriod()

// Submit command (second deduplication window)
// Note: the deduplication window is guaranteed to have passed on both
Expand Down Expand Up @@ -184,7 +185,7 @@ private[testtool] abstract class CommandDeduplicationBase(
_ <- submitAndWaitRequestAndAssertDeduplication(ledger)(request)

// Wait until the end of first deduplication window
_ <- delay()
_ <- delay.delayForEntireDeduplicationPeriod()

// Submit command (second deduplication window)
_ <- ledger.submitAndWait(request)
Expand Down Expand Up @@ -253,7 +254,7 @@ private[testtool] abstract class CommandDeduplicationBase(
_ <- submitAndAssertDeduplicated(secondCall)

// Wait until the end of first deduplication window
_ <- delay()
_ <- delay.delayForEntireDeduplicationPeriod()

// Submit command (second deduplication window)
_ <- submitAndAssertAccepted(thirdCall)
Expand Down Expand Up @@ -467,7 +468,38 @@ private[testtool] abstract class CommandDeduplicationBase(

object CommandDeduplicationBase {
trait DelayMechanism {
def apply(): Future[Unit]
val deduplicationDuration: Duration
val extraWait: Duration

/** Delay by the guaranteed full deduplication period. After calling this method any duplicate calls should succeed
*/
def delayForEntireDeduplicationPeriod(): Future[Unit] =
delayBy(deduplicationDuration + extraWait)

/** Delay with [[duration]]
*/
protected def delayBy(duration: Duration): Future[Unit]
}

class TimeDelayMechanism(val deduplicationDuration: Duration, val extraWait: Duration)(implicit
ec: ExecutionContext
) extends DelayMechanism {

override protected def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(())
}

class StaticTimeDelayMechanism(
ledger: ParticipantTestContext,
val deduplicationDuration: Duration,
val extraWait: Duration,
)(implicit ec: ExecutionContext)
extends DelayMechanism {
override protected def delayBy(duration: Duration): Future[Unit] =
ledger
.time()
.flatMap { currentTime =>
ledger.setTime(currentTime, currentTime.plusMillis(duration.toMillis))
}
}

/** @param participantDeduplication If participant deduplication is enabled then we will receive synchronous rejections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplic
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.{
DeduplicationFeatures,
DelayMechanism,
TimeDelayMechanism,
}
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.timer.Delayed

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -28,7 +28,7 @@ final class CommandDeduplicationIT(
)(
testWithDelayMechanism: DelayMechanism => Future[Unit]
)(implicit ec: ExecutionContext): Future[Unit] =
testWithDelayMechanism(() => Delayed.by(defaultDeduplicationWindowWait)(()))
testWithDelayMechanism(new TimeDelayMechanism(deduplicationDuration, ledgerWaitInterval))

override def testNamingPrefix: String = "ParticipantCommandDeduplication"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplic
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.{
DeduplicationFeatures,
DelayMechanism,
StaticTimeDelayMechanism,
TimeDelayMechanism,
}
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.admin.config_management_service.TimeModel
import com.daml.timer.Delayed
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
Expand Down Expand Up @@ -43,69 +44,37 @@ class KVCommandDeduplicationIT(
)(
testWithDelayMechanism: DelayMechanism => Future[Unit]
)(implicit ec: ExecutionContext): Future[Unit] = {
runWithConfig(participants) { case (maxDeduplicationDuration, minSkew) =>
runWithConfig(participants) { extraWait =>
val anyParticipant = participants.head
testWithDelayMechanism(() => delay(anyParticipant, maxDeduplicationDuration, minSkew))
testWithDelayMechanism(delayMechanism(anyParticipant, extraWait))
}
}

private def delay(
private def delayMechanism(
ledger: ParticipantTestContext,
maxDeduplicationDuration: Duration,
minSkew: Duration,
extraWait: Duration,
)(implicit ec: ExecutionContext) = {
val committerDeduplicationWindow =
maxDeduplicationDuration.plus(minSkew).plus(ledgerWaitInterval)
if (staticTime) {
forwardTimeWithDuration(ledger, committerDeduplicationWindow)
new StaticTimeDelayMechanism(ledger, deduplicationDuration, extraWait)
} else {
Delayed.by(committerDeduplicationWindow)(())
new TimeDelayMechanism(deduplicationDuration, extraWait)
}
}

private def forwardTimeWithDuration(
ledger: ParticipantTestContext,
duration: Duration,
)(implicit ec: ExecutionContext): Future[Unit] =
ledger
.time()
.flatMap(currentTime => {
ledger.setTime(currentTime, currentTime.plusMillis(duration.toMillis))
})

private def runWithConfig(
participants: Seq[ParticipantTestContext]
)(
test: (FiniteDuration, FiniteDuration) => Future[Unit]
)(implicit ec: ExecutionContext): Future[Unit] = {
// deduplication duration is increased by minSkew in the committer so we set the skew to a low value for testing
val minSkew = scaledDuration(1.second).asProtobuf
val anyParticipant = participants.head
anyParticipant
.configuration()
.flatMap(ledgerConfiguration => {
val maxDeduplicationTime = ledgerConfiguration.maxDeduplicationTime
.getOrElse(
throw new IllegalStateException(
"Max deduplication time was not set and our deduplication period depends on it"
)
)
.asScala
// max deduplication should be set to 5 seconds through the --max-deduplication-duration flag
assert(
maxDeduplicationTime <= 5.seconds,
s"Max deduplication time [$maxDeduplicationTime] is too high for the test.",
)
runWithUpdatedTimeModel(
participants,
_.update(_.minSkew := minSkew),
)(timeModel =>
test(
asFiniteDuration(maxDeduplicationTime),
asFiniteDuration(timeModel.getMinSkew.asScala),
)
)
})
)(test: FiniteDuration => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] = {
// deduplication duration is adjusted by min skew and max skew when running using pre-execution
// to account for this we adjust the time model
val skew = scaledDuration(1.second).asProtobuf
runWithUpdatedTimeModel(
participants,
_.update(_.minSkew := skew, _.maxSkew := skew),
)(timeModel =>
test(
asFiniteDuration(timeModel.getMinSkew.asScala + timeModel.getMaxSkew.asScala)
)
)
}

private def runWithUpdatedTimeModel(
Expand All @@ -121,7 +90,7 @@ class KVCommandDeduplicationIT(
time <- participant.time()
_ <- participant
.setTimeModel(
time.plusSeconds(30),
time.plusSeconds(1),
timeModel.configurationGeneration + 1,
timeModel.getTimeModel,
)
Expand All @@ -138,11 +107,10 @@ class KVCommandDeduplicationIT(
(timeModelForTest, participantThatDidTheUpdate) <- tryTimeModelUpdateOnAllParticipants(
participants,
_.setTimeModel(
time.plusSeconds(30),
time.plusSeconds(1),
timeModel.configurationGeneration,
updatedModel,
)
.map(_ => updatedModel),
).map(_ => updatedModel),
)
_ <- test(timeModelForTest)
.transformWith(testResult =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ message DamlCommandDedupKey {

message DamlCommandDedupValue {
reserved 1; // was record_time
// The time until when future commands with the same
// deduplication key will be rejected due to a duplicate submission.
google.protobuf.Timestamp deduplicated_until = 2;
// The submission time of the initial command.
// We use submission time because record time is not available during pre-execution.
google.protobuf.Timestamp submission_time = 3;
oneof time {
// The time until when future commands with the same
// deduplication key will be rejected due to a duplicate submission.
google.protobuf.Timestamp deduplicated_until = 2;
google.protobuf.Timestamp record_time = 3;
PreExecutionDeduplicationBounds record_time_bounds = 4;
}
}
message PreExecutionDeduplicationBounds {
// record_time is not available during pre-execution
google.protobuf.Timestamp max_record_time = 1;
google.protobuf.Timestamp min_record_time = 2;
}

message DamlSubmissionDedupKey {
Expand Down
Loading

0 comments on commit dd88ba2

Please sign in to comment.