Skip to content

Commit

Permalink
[ledger-api] Include the completion offset in the command_service.pro…
Browse files Browse the repository at this point in the history
…to responses [KVL-1171] (#11658)

CHANGELOG_BEGIN
[ledger-api] - Include the completion offset in the responses from the command_service.proto
CHANGELOG_END
  • Loading branch information
nicu-da authored Nov 18, 2021
1 parent e891180 commit 8f458d8
Show file tree
Hide file tree
Showing 26 changed files with 231 additions and 83 deletions.
12 changes: 12 additions & 0 deletions compatibility/bazel_tools/testing.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,18 @@ excluded_test_tool_tests = [
},
],
},
{
# Completion offset included in the CommandService responses
"start": "1.18.0-snapshot.20211111.8349.0.d938a44c",
"platform_ranges": [
{
"end": "1.18.0-snapshot.20211111.8349.0.d938a44c ",
"exclusions": [
"CommandServiceIT:CSsubmitAndWaitCompletionOffset",
],
},
],
},
]

def in_range(version, range):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,21 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with
) =>
// Return a completion based on the input status code only on the first submission.
if (nrOfRetries == 0) {
Ctx(context, CompletionResponse(Completion(commands.commandId, Some(status))))
Ctx(
context,
CompletionResponse(
completion = Completion(commands.commandId, Some(status)),
checkpoint = None,
),
)
} else {
Ctx(
context,
Right(
CompletionResponse.CompletionSuccess(Completion(commands.commandId, Some(status)))
CompletionResponse.CompletionSuccess(
completion = Completion(commands.commandId, Some(status)),
checkpoint = None,
)
),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,25 @@ message SubmitAndWaitForTransactionIdResponse {
// Must be a valid LedgerString (as described in ``value.proto``).
// Required
string transaction_id = 1;
// The format of this field is described in ``ledger_offset.proto``.
// Optional
string completion_offset = 2;
}

message SubmitAndWaitForTransactionResponse {
// The flat transaction that resulted from the submitted command.
// Required
Transaction transaction = 1;
// The format of this field is described in ``ledger_offset.proto``.
// Optional
string completion_offset = 2;
}

message SubmitAndWaitForTransactionTreeResponse {
// The transaction tree that resulted from the submitted command.
// Required
TransactionTree transaction = 1;
// The format of this field is described in ``ledger_offset.proto``.
// Optional
string completion_offset = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ final class CommandClientIT
): Future[(Set[String], Set[String])] =
readExpectedElements(
client.completionSource(submittingPartyList, checkpoint).collect {
case CompletionStreamElement.CompletionElement(c) => c.commandId
case CompletionStreamElement.CompletionElement(c, _) => c.commandId
},
expected,
timeLimit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.daml.ledger.api.v1.command_completion_service.{
CompletionStreamRequest,
CompletionStreamResponse,
}
import com.daml.ledger.api.v1.completion.Completion
import io.grpc.stub.StreamObserver

import scala.collection.immutable
Expand All @@ -22,7 +23,11 @@ object CommandCompletionSource {
): immutable.Iterable[CompletionStreamElement] = {

val completions: Vector[CompletionStreamElement] =
response.completions.view.map(CompletionStreamElement.CompletionElement).toVector
response.completions.view
.map((completion: Completion) =>
CompletionStreamElement.CompletionElement(completion, response.checkpoint)
)
.toVector
response.checkpoint.fold(completions)(cp =>
completions :+ CompletionStreamElement.CheckpointElement(cp)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ object CompletionStreamElement {

final case class CheckpointElement(checkpoint: Checkpoint) extends CompletionStreamElement

final case class CompletionElement(completion: Completion) extends CompletionStreamElement
final case class CompletionElement(completion: Completion, checkpoint: Option[Checkpoint])
extends CompletionStreamElement

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import com.google.protobuf.empty.Empty
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
import org.slf4j.LoggerFactory

import java.time.{Duration, Instant}

import com.daml.ledger.api.v1.command_completion_service.Checkpoint

import scala.annotation.nowarn
import scala.collection.compat._
import scala.collection.{immutable, mutable}
Expand Down Expand Up @@ -162,8 +164,8 @@ private[commands] class CommandTracker[Context](
case Left(submitResponse) =>
pushResultOrPullCommandResultIn(handleSubmitResponse(submitResponse))

case Right(CompletionStreamElement.CompletionElement(completion)) =>
pushResultOrPullCommandResultIn(getResponseForCompletion(completion))
case Right(CompletionStreamElement.CompletionElement(completion, checkpoint)) =>
pushResultOrPullCommandResultIn(getResponseForCompletion(completion, checkpoint))

case Right(CompletionStreamElement.CheckpointElement(checkpoint)) =>
if (!hasBeenPulled(commandResultIn)) pull(commandResultIn)
Expand Down Expand Up @@ -306,7 +308,8 @@ private[commands] class CommandTracker[Context](
}

private def getResponseForCompletion(
completion: Completion
completion: Completion,
checkpoint: Option[Checkpoint],
): Option[ContextualizedCompletionResponse[Context]] = {
val commandId = completion.commandId
val maybeSubmissionId = Option(completion.submissionId).filter(_.nonEmpty)
Expand All @@ -324,7 +327,10 @@ private[commands] class CommandTracker[Context](
val key = TrackedCommandKey(submissionId, completion.commandId)
val trackedCommandForCompletion = pendingCommands.remove(key)
trackedCommandForCompletion.map(trackingData =>
Ctx(trackingData.context, tracker.CompletionResponse(completion))
Ctx(
trackingData.context,
tracker.CompletionResponse(completion = completion, checkpoint = checkpoint),
)
)
}
.getOrElse {
Expand All @@ -346,11 +352,12 @@ private[commands] class CommandTracker[Context](
Ctx(
t.context,
tracker.CompletionResponse(
Completion(
completion = Completion(
commandKey.commandId,
Some(status),
submissionId = commandKey.submissionId,
)
),
checkpoint = None,
),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.ledger.client.services.commands.tracker

import com.daml.error.ContextualizedErrorLogger
import com.daml.grpc.GrpcStatus
import com.daml.ledger.api.v1.command_completion_service.Checkpoint
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.platform.server.api.validation.ErrorFactories
Expand All @@ -18,7 +19,8 @@ object CompletionResponse {
/** Represents failures from executing submissions through gRPC.
*/
sealed trait CompletionFailure
final case class NotOkResponse(completion: Completion) extends CompletionFailure {
final case class NotOkResponse(completion: Completion, checkpoint: Option[Checkpoint])
extends CompletionFailure {
val commandId: String = completion.commandId
val grpcStatus: StatusProto = completion.getStatus
def metadata: Map[String, String] = Map(
Expand All @@ -27,7 +29,8 @@ object CompletionResponse {
}
final case class TimeoutResponse(commandId: String) extends CompletionFailure

final case class NoStatusInResponse(completion: Completion) extends CompletionFailure {
final case class NoStatusInResponse(completion: Completion, checkpoint: Option[Checkpoint])
extends CompletionFailure {
val commandId: String = completion.commandId
}

Expand All @@ -47,21 +50,25 @@ object CompletionResponse {
extends TrackedCompletionFailure

final case class CompletionSuccess(
completion: Completion
completion: Completion,
checkpoint: Option[Checkpoint],
) {
val commandId: String = completion.commandId
val transactionId: String = completion.transactionId
val originalStatus: StatusProto = completion.getStatus
}

def apply(completion: Completion): Either[CompletionFailure, CompletionSuccess] =
def apply(
completion: Completion,
checkpoint: Option[Checkpoint],
): Either[CompletionFailure, CompletionSuccess] =
completion.status match {
case Some(grpcStatus) if Code.OK.value() == grpcStatus.code =>
Right(CompletionSuccess(completion))
Right(CompletionSuccess(completion, checkpoint))
case Some(_) =>
Left(NotOkResponse(completion))
Left(NotOkResponse(completion, checkpoint))
case None =>
Left(NoStatusInResponse(completion))
Left(NoStatusInResponse(completion, checkpoint))
}

/** For backwards compatibility, clients that are too coupled to [[Completion]] as a type can convert back from [[Either[CompletionFailure, CompletionSuccess]]]
Expand All @@ -70,14 +77,14 @@ object CompletionResponse {
response match {
case Left(failure) =>
failure match {
case NotOkResponse(completion) =>
case NotOkResponse(completion, _) =>
completion
case TimeoutResponse(commandId) =>
Completion(
commandId = commandId,
status = Some(StatusProto(Code.ABORTED.value(), "Timeout")),
)
case NoStatusInResponse(completion) =>
case NoStatusInResponse(completion, _) =>
completion
}
case Right(success) =>
Expand Down Expand Up @@ -112,7 +119,7 @@ object CompletionResponse {
GrpcStatus.buildStatus(metadata, statusBuilder)
case CompletionResponse.TimeoutResponse(_) =>
errorFactories.SubmissionQueueErrors.timedOutOnAwaitingForCommandCompletion()
case CompletionResponse.NoStatusInResponse(_) =>
case CompletionResponse.NoStatusInResponse(_, _) =>
errorFactories.SubmissionQueueErrors.noStatusInCompletionResponse()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ class CommandTrackerFlowTest

results.expectNoMessage(3.seconds)

completionStreamMock.send(CompletionStreamElement.CompletionElement(abortedCompletion))
completionStreamMock.send(
CompletionStreamElement.CompletionElement(abortedCompletion, None)
)
results.requestNext().value shouldEqual Left(
failureCompletion(Code.ABORTED)
)
Expand All @@ -272,7 +274,9 @@ class CommandTrackerFlowTest

submissions.sendNext(submission)

completionStreamMock.send(CompletionStreamElement.CompletionElement(abortedCompletion))
completionStreamMock.send(
CompletionStreamElement.CompletionElement(abortedCompletion, None)
)
results.requestNext().value shouldEqual Left(
failureCompletion(Code.ABORTED)
)
Expand Down Expand Up @@ -479,7 +483,7 @@ class CommandTrackerFlowTest
Some(status),
submissionId = submissionId,
)
completionStreamMock.send(CompletionStreamElement.CompletionElement(failedCompletion))
completionStreamMock.send(CompletionStreamElement.CompletionElement(failedCompletion, None))

results.expectNext(
Ctx(
Expand Down Expand Up @@ -640,7 +644,8 @@ class CommandTrackerFlowTest
submissionId: String = submissionId,
) =
CompletionResponse.CompletionSuccess(
Completion(commandId, Some(successStatus), submissionId = submissionId)
Completion(commandId, Some(successStatus), submissionId = submissionId),
None,
)

private def failureCompletion(
Expand All @@ -653,7 +658,8 @@ class CommandTrackerFlowTest
commandId = commandId,
status = Some(StatusProto(code.value, message)),
submissionId = submissionId,
)
),
None,
)

private def commandWithIds(submissionId: String, commandId: String) = {
Expand All @@ -665,7 +671,8 @@ class CommandTrackerFlowTest

private def successfulStreamCompletion(submissionId: String, commandId: String) =
CompletionStreamElement.CompletionElement(
Completion(commandId, Some(successStatus), submissionId = submissionId)
Completion(commandId, Some(successStatus), submissionId = submissionId),
None,
)

private def checkPoint(ledgerOffset: LedgerOffset) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ class CompletionResponseTest extends AnyWordSpec with Matchers {

"match successful completion" in {
val completionWithTransactionId = completion.update(_.transactionId := "transactionId")
val response = CompletionResponse(completionWithTransactionId)
val response = CompletionResponse(completionWithTransactionId, None)
response shouldBe a[Right[_, _]]
CompletionResponse.toCompletion(response) shouldEqual completionWithTransactionId
}

"match not ok status" in {
val failedCodeCompletion = completion.update(_.status.code := Code.INTERNAL.value())
val response =
CompletionResponse(failedCodeCompletion)
CompletionResponse(failedCodeCompletion, None)
response should matchPattern { case Left(_: NotOkResponse) => }
CompletionResponse.toCompletion(response) shouldEqual failedCodeCompletion
}

"handle missing status" in {
val noStatusCodeCompletion = completion.update(_.optionalStatus := None)
val response =
CompletionResponse(noStatusCodeCompletion)
CompletionResponse(noStatusCodeCompletion, None)
response should matchPattern { case Left(_: NoStatusInResponse) => }
CompletionResponse.toCompletion(response) shouldEqual noStatusCodeCompletion

Expand Down Expand Up @@ -121,7 +121,8 @@ class CompletionResponseTest extends AnyWordSpec with Matchers {
details = Seq.empty,
)
),
)
),
None,
)
),
errorFactories,
Expand Down Expand Up @@ -150,7 +151,8 @@ class CompletionResponseTest extends AnyWordSpec with Matchers {
),
)
),
)
),
None,
)
),
errorFactories,
Expand Down Expand Up @@ -179,7 +181,8 @@ class CompletionResponseTest extends AnyWordSpec with Matchers {
),
)
),
)
),
None,
)
),
errorFactories,
Expand Down
3 changes: 2 additions & 1 deletion ledger/ledger-api-test-tool-on-canton/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ conformance_test(
"--exclude=ConfigManagementServiceIT,LedgerConfigurationServiceIT" + # dynamic config management not supported by Canton
",ParticipantPruningIT" + # see "conformance-test-participant-pruning" below
",ClosedWorldIT" + # Canton currently fails this test with a different error (missing namespace in "unallocated" party id)
",CommandServiceIT:CSsubmitAndWaitCompletionOffset" + # Canton does not fill the offsets
# Excluding tests that require contract key uniqueness and RWArchiveVsFailedLookupByKey (finding a lookup failure after contract creation)
",RaceConditionIT:WWDoubleNonTransientCreate,RaceConditionIT:WWArchiveVsNonTransientCreate,RaceConditionIT:RWTransientCreateVsNonTransientCreate,RaceConditionIT:RWArchiveVsFailedLookupByKey" +
",RaceConditionIT:RWArchiveVsLookupByKey,RaceConditionIT:RWArchiveVsNonConsumingChoice,RaceConditionIT:RWArchiveVsFetch,RaceConditionIT:WWDoubleArchive" +
Expand All @@ -96,11 +97,11 @@ conformance_test(
",CompletionDeduplicationInfoITCommandSubmissionService" +
",CommandDeduplicationParallelIT" +
",ContractIdIT" +
",KVCommandDeduplicationIT" + # only for KV-utils
",MultiPartySubmissionIT" +
",ParticipantPruningIT" +
",TLSOnePointThreeIT" +
",TLSAtLeastOnePointTwoIT" +
",KVCommandDeduplicationIT" + # only for KV-utils
",MonotonicRecordTimeIT", # KV-utils specific
],
) if not is_windows else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ object Features {
Features(selfServiceErrorCodesFeature.toList)
}
}

case class Features(features: Seq[Feature]) {
val selfServiceErrorCodes: Boolean = SelfServiceErrorCodes.enabled(features)
}
Expand Down
Loading

0 comments on commit 8f458d8

Please sign in to comment.