diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index efcb56c2148d..ff8e080e6f85 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -173,6 +173,7 @@ conformance_test( ], test_tool_args = [ "--verbose", + "--exclude=CommandDeduplicationIT", "--exclude=ConfigManagementServiceIT", ], ) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index 7dc009dd6951..9b6fe3152613 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -325,8 +325,27 @@ object KeyValueConsumption { val wrappedLogEntry = outOfTimeBoundsEntry.getEntry wrappedLogEntry.getPayloadCase match { + case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY if deduplicated => + val transactionRejectionEntry = wrappedLogEntry.getTransactionRejectionEntry + Some( + Update.CommandRejected( + recordTime = recordTime, + completionInfo = parseCompletionInfo( + Conversions.parseInstant(recordTime), + transactionRejectionEntry.getSubmitterInfo, + ), + reasonTemplate = FinalReason( + Status.of( + Code.ALREADY_EXISTS.value, + "Duplicate commands", + Seq.empty, + ) + ), + ) + ) + case _ if deduplicated => - // We don't emit updates for deduplicated submissions. + // We only emit updates for duplicate transaction submissions. None case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY if invalidRecordTime => diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala index 4f5abd04c5e6..fb5038c867ad 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala @@ -322,7 +322,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i } } - "reject duplicate commands" ignore participantState.use { ps => + "reject duplicate commands" in participantState.use { ps => val firstCommandId = "X1" val secondCommandId = "X2" for { @@ -337,7 +337,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i ) .toScala (offset2, update2) <- waitForNextUpdate(ps, Some(offset1)) - // Below submission is a duplicate, should get dropped. + // Below submission is a duplicate. result3 <- ps .submitTransaction( submitterInfo(alice, firstCommandId), @@ -346,6 +346,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i DefaultInterpretationCost, ) .toScala + (offset3, update3) <- waitForNextUpdate(ps, Some(offset2)) result4 <- ps .submitTransaction( submitterInfo(alice, secondCommandId), @@ -354,10 +355,10 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i DefaultInterpretationCost, ) .toScala - (offset3, update3) <- waitForNextUpdate(ps, Some(offset2)) + (offset4, update4) <- waitForNextUpdate(ps, Some(offset3)) results = Seq(result1, result2, result3, result4) _ = all(results) should be(SubmissionResult.Acknowledged) - updates = Seq(update1, update2, update3) + updates = Seq(update1, update2, update3, update4) } yield { all(updates.map(_.recordTime)) should be >= rt @@ -367,8 +368,13 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i offset2 should be(toOffset(2)) matchTransaction(update2, firstCommandId) - offset3 should be(toOffset(4)) - matchTransaction(update3, secondCommandId) + offset3 should be(toOffset(3)) + inside(update3) { case CommandRejected(_, _, FinalReason(status)) => + status.code should be(Code.ALREADY_EXISTS.value) + } + + offset4 should be(toOffset(4)) + matchTransaction(update4, secondCommandId) } } diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala index a626ded024fb..ee6061d2f767 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala @@ -16,10 +16,12 @@ import com.daml.ledger.participant.state.kvutils.KeyValueConsumption.{ } import com.daml.ledger.participant.state.kvutils.api.LedgerReader import com.daml.ledger.participant.state.v2.Update +import com.daml.ledger.participant.state.v2.Update.CommandRejected import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason import com.daml.lf.data.Time.Timestamp import com.google.protobuf.{ByteString, Empty} import com.google.rpc.code.Code +import org.scalatest.Inside.inside import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks._ import org.scalatest.prop.TableFor4 @@ -85,13 +87,26 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { ) "outOfTimeBoundsEntryToUpdate" should { - "not generate an update for deduplicated entries" in { + "generate update only for rejected and deduplicated transaction" in { val testCases = Table( ("Time Bounds", "Record Time", "Log Entry Type", "Assertions"), ( TimeBounds(deduplicateUntil = Some(aRecordTime)), aRecordTime, TRANSACTION_REJECTION_ENTRY, + Assertions(update => + inside(update) { case Some(CommandRejected(_, _, FinalReason(status))) => + status.code shouldBe Code.ALREADY_EXISTS.value + () + } + ), + ), + ( + TimeBounds( + deduplicateUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1))) + ), + aRecordTime, + TRANSACTION_REJECTION_ENTRY, Assertions(), ), (