Skip to content

Commit

Permalink
participant-state: Re-enable integration test for command deduplicati…
Browse files Browse the repository at this point in the history
…on [KVL-1089] (#10751)

* Re-write command deduplication integration test for participant state to be valid for the deduplication changes.

The test now accounts for completions being generated for duplicate commands.

CHANGELOG_BEGIN

CHANGELOG_END

* Update check for successfull transaction

* Emit command rejected completions for duplicate commands rejections when using pre-execution

CHANGELOG_BEGIN
participant-state: Emit completions (CommandRejected) for duplicate commands when using pre-execution
CHANGELOG_END

* Update test for emitted duplicate command update

* Added test for new case.

* Revert redundant test case.

This reverts commit 8ef4364

* Test corner case of dropping not deduplicated transaction rejection.

* Code tidying.

* Reformatted.

Co-authored-by: Miklos Erdelyi <miklos.erdelyi@digitalasset.com>
  • Loading branch information
nicu-da and miklos-da authored Sep 3, 2021
1 parent 793253c commit 303ba90
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 8 deletions.
1 change: 1 addition & 0 deletions ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ conformance_test(
],
test_tool_args = [
"--verbose",
"--exclude=CommandDeduplicationIT",
"--exclude=ConfigManagementServiceIT",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,27 @@ object KeyValueConsumption {

val wrappedLogEntry = outOfTimeBoundsEntry.getEntry
wrappedLogEntry.getPayloadCase match {
case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY if deduplicated =>
val transactionRejectionEntry = wrappedLogEntry.getTransactionRejectionEntry
Some(
Update.CommandRejected(
recordTime = recordTime,
completionInfo = parseCompletionInfo(
Conversions.parseInstant(recordTime),
transactionRejectionEntry.getSubmitterInfo,
),
reasonTemplate = FinalReason(
Status.of(
Code.ALREADY_EXISTS.value,
"Duplicate commands",
Seq.empty,
)
),
)
)

case _ if deduplicated =>
// We don't emit updates for deduplicated submissions.
// We only emit updates for duplicate transaction submissions.
None

case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY if invalidRecordTime =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i
}
}

"reject duplicate commands" ignore participantState.use { ps =>
"reject duplicate commands" in participantState.use { ps =>
val firstCommandId = "X1"
val secondCommandId = "X2"
for {
Expand All @@ -337,7 +337,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i
)
.toScala
(offset2, update2) <- waitForNextUpdate(ps, Some(offset1))
// Below submission is a duplicate, should get dropped.
// Below submission is a duplicate.
result3 <- ps
.submitTransaction(
submitterInfo(alice, firstCommandId),
Expand All @@ -346,6 +346,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i
DefaultInterpretationCost,
)
.toScala
(offset3, update3) <- waitForNextUpdate(ps, Some(offset2))
result4 <- ps
.submitTransaction(
submitterInfo(alice, secondCommandId),
Expand All @@ -354,10 +355,10 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i
DefaultInterpretationCost,
)
.toScala
(offset3, update3) <- waitForNextUpdate(ps, Some(offset2))
(offset4, update4) <- waitForNextUpdate(ps, Some(offset3))
results = Seq(result1, result2, result3, result4)
_ = all(results) should be(SubmissionResult.Acknowledged)
updates = Seq(update1, update2, update3)
updates = Seq(update1, update2, update3, update4)
} yield {
all(updates.map(_.recordTime)) should be >= rt

Expand All @@ -367,8 +368,13 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i
offset2 should be(toOffset(2))
matchTransaction(update2, firstCommandId)

offset3 should be(toOffset(4))
matchTransaction(update3, secondCommandId)
offset3 should be(toOffset(3))
inside(update3) { case CommandRejected(_, _, FinalReason(status)) =>
status.code should be(Code.ALREADY_EXISTS.value)
}

offset4 should be(toOffset(4))
matchTransaction(update4, secondCommandId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import com.daml.ledger.participant.state.kvutils.KeyValueConsumption.{
}
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.participant.state.v2.Update.CommandRejected
import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason
import com.daml.lf.data.Time.Timestamp
import com.google.protobuf.{ByteString, Empty}
import com.google.rpc.code.Code
import org.scalatest.Inside.inside
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.prop.TableFor4
Expand Down Expand Up @@ -85,13 +87,26 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
)

"outOfTimeBoundsEntryToUpdate" should {
"not generate an update for deduplicated entries" in {
"generate update only for rejected and deduplicated transaction" in {
val testCases = Table(
("Time Bounds", "Record Time", "Log Entry Type", "Assertions"),
(
TimeBounds(deduplicateUntil = Some(aRecordTime)),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions(update =>
inside(update) { case Some(CommandRejected(_, _, FinalReason(status))) =>
status.code shouldBe Code.ALREADY_EXISTS.value
()
}
),
),
(
TimeBounds(
deduplicateUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1)))
),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions(),
),
(
Expand Down

0 comments on commit 303ba90

Please sign in to comment.