Skip to content

Commit

Permalink
kvutils - For duplicate command rejections, add the submission id as …
Browse files Browse the repository at this point in the history
…metadata [KVL-1175] (#11848)

changelog_begin
kvutils - For duplicate command rejections, the submission id of the already accepted transaction is returning as part of the gRPC metadata. The submission id will be included under the key `existing_submission_id`.
changelog_end
  • Loading branch information
nicu-da authored Nov 24, 2021
1 parent 970243d commit 59eb0d2
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,17 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
ErrorCategory.InvalidGivenCurrentSystemStateResourceExists,
) {

case class Reject(override val definiteAnswer: Boolean = false)(implicit
case class Reject(
override val definiteAnswer: Boolean = false,
existingCommandSubmissionId: Option[String],
)(implicit
loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
cause = "A command with the given command id has already been successfully processed"
)
) {
override def context: Map[String, String] =
super.context ++ existingCommandSubmissionId.map("existing_submission_id" -> _).toList
}
}

@Explanation("An input contract has been archived by a concurrent transaction submission.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
v2 = LedgerApiErrors.InternalError.VersionService(message).asGrpcError,
)

def duplicateCommandException(implicit
def duplicateCommandException(existingSubmissionId: Option[String])(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): StatusRuntimeException =
errorCodesVersionSwitcher.choose(
Expand All @@ -183,7 +183,9 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
contextualizedErrorLogger.info(exception.getMessage)
exception
},
v2 = LedgerApiErrors.ConsistencyErrors.DuplicateCommand.Reject().asGrpcError,
v2 = LedgerApiErrors.ConsistencyErrors.DuplicateCommand
.Reject(existingCommandSubmissionId = existingSubmissionId)
.asGrpcError,
)

/** @param expected Expected ledger id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class ErrorFactoriesSpec
}

"return the DuplicateCommandException" in {
assertVersionedError(_.duplicateCommandException)(
assertVersionedError(_.duplicateCommandException(None))(
v1_code = Code.ALREADY_EXISTS,
v1_message = "Duplicate command",
v1_details = Seq(definiteAnswers(false)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException
errorFactories.duplicateCommandException(None)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message InvalidLedgerTime {
// key during its implementation specific deduplication window.
message Duplicate {
string details = 1;
string submission_id = 2;
}

// A party mentioned as a stakeholder or actor has not been on-boarded on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ message DamlCommandDedupValue {
google.protobuf.Timestamp record_time = 3;
PreExecutionDeduplicationBounds record_time_bounds = 4;
}
string submission_id = 5;
}
message PreExecutionDeduplicationBounds {
// record_time is not available during pre-execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ object KeyValueConsumption {
recordTime,
rejectionEntry,
errorVersionSwitch,
None, // Not available for historical entries
)(contextualizedErrorLogger(loggingContext, rejectionEntry))
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[transaction] object CommandDeduplication {
maybeDedupValue,
)
} else {
duplicateRejection(commitContext, transactionEntry.submitterInfo)
duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue)
}
}
}
Expand Down Expand Up @@ -156,22 +156,28 @@ private[transaction] object CommandDeduplication {
buildDuration(rejectionDeduplicationDuration)
)
.build,
maybeDedupValue,
)
case None =>
duplicateRejection(commitContext, transactionEntry.submitterInfo)
duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue)
}
}

private def duplicateRejection(
commitContext: CommitContext,
submitterInfo: DamlSubmitterInfo,
dedupValue: Option[DamlCommandDedupValue],
)(implicit loggingContext: LoggingContext) = {
rejections.reject(
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(submitterInfo)
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
.setDefiniteAnswer(false)
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
.setDuplicateCommand(
Duplicate.newBuilder
.setDetails("")
.setSubmissionId(dedupValue.map(_.getSubmissionId).getOrElse(""))
),
"the command is a duplicate",
commitContext.recordTime,
)
Expand All @@ -186,7 +192,9 @@ private[transaction] object CommandDeduplication {
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
throw Err.InvalidSubmission("Deduplication duration is not set.")
}
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
val commandDedupBuilder = DamlCommandDedupValue.newBuilder.setSubmissionId(
transactionEntry.submitterInfo.getSubmissionId
)
commitContext.recordTime
.map(Conversions.buildTimestamp) match {
case Some(recordTime) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ private[kvutils] object TransactionRejections {
recordTime: Timestamp,
rejectionEntry: DamlTransactionRejectionEntry,
errorVersionSwitch: ValueSwitch,
existingCommandSubmissionId: Option[String],
)(implicit loggingContext: ContextualizedErrorLogger): Update.CommandRejected = {
val definiteAnswer = rejectionEntry.getDefiniteAnswer
Update.CommandRejected(
Expand All @@ -82,7 +83,7 @@ private[kvutils] object TransactionRejections {
reasonTemplate = FinalReason(
errorVersionSwitch.choose(
V1.duplicateCommandsRejectionStatus(definiteAnswer, Code.ALREADY_EXISTS),
V2.duplicateCommandsRejectionStatus(definiteAnswer),
V2.duplicateCommandsRejectionStatus(definiteAnswer, existingCommandSubmissionId),
)
),
)
Expand Down Expand Up @@ -302,15 +303,19 @@ private[kvutils] object TransactionRejections {
def duplicateCommandStatus(
entry: DamlTransactionRejectionEntry,
errorVersionSwitch: ValueSwitch,
)(implicit loggingContext: ContextualizedErrorLogger): Status =
)(implicit loggingContext: ContextualizedErrorLogger): Status = {
val rejectionReason = entry.getDuplicateCommand
errorVersionSwitch.choose(
V1.status(
entry,
Code.ALREADY_EXISTS,
"Duplicate commands",
),
V2.duplicateCommandsRejectionStatus(),
V2.duplicateCommandsRejectionStatus(existingCommandSubmissionId =
Some(rejectionReason.getSubmissionId).filter(_.nonEmpty)
),
)
}

@nowarn("msg=deprecated")
def submitterCannotActViaParticipantStatus(
Expand Down Expand Up @@ -515,11 +520,12 @@ private[kvutils] object TransactionRejections {
.asStatus

def duplicateCommandsRejectionStatus(
definiteAnswer: Boolean = false
definiteAnswer: Boolean = false,
existingCommandSubmissionId: Option[String],
)(implicit loggingContext: ContextualizedErrorLogger): Status =
GrpcStatus.toProto(
LedgerApiErrors.ConsistencyErrors.DuplicateCommand
.Reject(definiteAnswer)
.Reject(definiteAnswer, existingCommandSubmissionId)
.asGrpcStatusFromContext
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,9 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues {
Map.empty,
),
(
_.setDuplicateCommand(Duplicate.newBuilder()),
_.setDuplicateCommand(Duplicate.newBuilder().setSubmissionId("not_used")),
Code.ALREADY_EXISTS,
Map.empty,
Map(),
),
(
_.setSubmitterCannotActViaParticipant(
Expand Down Expand Up @@ -496,6 +496,23 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues {
}
}

"decode duplicate command v2" in {
val finalReason = Conversions
.decodeTransactionRejectionEntry(
DamlTransactionRejectionEntry
.newBuilder()
.setDuplicateCommand(Duplicate.newBuilder().setSubmissionId("submissionId"))
.build(),
v2ErrorSwitch,
)
finalReason.code shouldBe Code.ALREADY_EXISTS.value()
finalReason.definiteAnswer shouldBe false
val actualDetails = finalReasonDetails(finalReason)
actualDetails should contain allElementsOf Map(
"existing_submission_id" -> "submissionId"
)
}

"decode completion info" should {
val recordTime = LfTimestamp.now()
def submitterInfo = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ class CommandDeduplicationSpec
deduplicateStepHasTransactionRejectionEntry(context)
}
}

"include the submission id in the rejection" in {
val submissionId = "submissionId"
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(builder =>
timeSetter(timestamp)(builder.setSubmissionId(submissionId))
)
)
)
val rejection = deduplicateStepHasTransactionRejectionEntry(context)
rejection.getDuplicateCommand.getSubmissionId shouldBe submissionId
}

}
}
}
Expand Down Expand Up @@ -283,6 +297,23 @@ class CommandDeduplicationSpec
) shouldBe recordTime
}

"set the submission id in the dedup value" in {
val submissionId = "submissionId"
val (context, transactionEntrySummary) =
buildContextAndTransaction(
submissionTime,
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration))
.setSubmissionId(submissionId),
Some(timestamp),
)
setDeduplicationEntryStep(context, transactionEntrySummary)
deduplicateValueStoredInContext(context, transactionEntrySummary)
.map(
_.getSubmissionId
)
.value shouldBe submissionId
}

"throw an error for missing record time bounds" in {
val (context, transactionEntrySummary) =
buildContextAndTransaction(
Expand Down

0 comments on commit 59eb0d2

Please sign in to comment.