Skip to content

Commit

Permalink
Fix the Completions handling for the mutating schema
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
hubert-da committed Sep 14, 2021
1 parent 158b0b7 commit a8c575e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package com.daml.ledger.client.services.commands.tracker

import java.time.{Duration, Instant}

import akka.stream.stage._
import akka.stream.{Attributes, Inlet, Outlet}
import com.daml.grpc.{GrpcException, GrpcStatus}
Expand Down Expand Up @@ -299,7 +298,7 @@ private[commands] class CommandTracker[Context](
}

private def getOutputForCompletion(completion: Completion) = {
val (commandKey, errorText) = {
val (potentialCommandKey, errorText) = {
completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
TrackedCommandKey(
Expand All @@ -314,7 +313,25 @@ private[commands] class CommandTracker[Context](
}
}

logger.trace("Handling {} {}", errorText, completion.commandId: Any)
val commandKey = if (potentialCommandKey.submissionId.isEmpty) {
val potentialKeys =
pendingCommands.keys.filter(_.commandId == potentialCommandKey.commandId)
if (potentialKeys.size > 1) {
throw new IllegalStateException(
s"There are multiple pending commands for the id ${potentialCommandKey.commandId}. This can only happen for the mutating schema."
) with NoStackTrace
}
potentialKeys.head
} else {
potentialCommandKey
}

logger.trace(
"Handling {} {} from submission {}",
errorText,
completion.commandId,
completion.submissionId,
)
pendingCommands.remove(commandKey).map { t =>
Ctx(t.context, tracker.CompletionResponse(completion))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,58 @@ class CommandTrackerFlowTest
}
}

"a completion without submission id arrives" should {
"fail if there are multiple pending commands with the same command id" in {
val Handle(submissions, _, unhandledF, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)

submissions.sendNext(newSubmission("submissionId", "commandId"))
submissions.sendNext(newSubmission("anotherSubmissionId", "commandId"))

val completionWithoutSubmissionId =
Completion(
commandId,
Some(successStatus),
submissionId = "",
)
completionStreamMock.send(
CompletionStreamElement.CompletionElement(completionWithoutSubmissionId)
)

whenReady(unhandledF) { unhandled =>
unhandled should have size 2
unhandled should contain(
TrackedCommandKey("submissionId", "commandId") -> submission.context
)
unhandled should contain(
TrackedCommandKey("anotherSubmissionId", "commandId") -> submission.context
)
}
}

"output the completion" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)

submissions.sendNext(submission)

val completionWithoutSubmissionId =
Completion(
commandId,
Some(successStatus),
submissionId = "",
)
completionStreamMock.send(
CompletionStreamElement.CompletionElement(completionWithoutSubmissionId)
)

results.expectNext(
Ctx(context, Right(CompletionResponse.CompletionSuccess(commandId, "", successStatus)))
)
succeed
}
}

"a multitude of successful completions arrive for submitted commands" should {

"output all expected values" in {
Expand Down Expand Up @@ -552,7 +604,7 @@ class CommandTrackerFlowTest
_ <- checkOffset(LedgerOffset(Boundary(LEDGER_BEGIN)))
_ <- breakUntilOffsetArrives()
_ <- checkOffset(checkPointOffset)
_ <- sendCommand("submission-1", "command-2")
_ <- sendCommand("submission-2", "command-2")
} yield {
succeed
}
Expand Down

0 comments on commit a8c575e

Please sign in to comment.