Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CommandTracker distinguish submissions of the same command using submissionId [KVL-1104] #10868

Merged
merged 7 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
package com.daml.ledger.client.services.commands

import java.time.Duration

import akka.NotUsed
import akka.stream.scaladsl.{Concat, Flow, GraphDSL, Merge, Source}
import akka.stream.{DelayOverflowStrategy, FlowShape, OverflowStrategy}
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.client.services.commands.tracker.CommandTracker
import com.daml.ledger.client.services.commands.tracker.{TrackedCommandKey, CommandTracker}
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
Expand All @@ -32,13 +31,13 @@ object CommandTrackerFlow {

final case class Materialized[SubmissionMat, Context](
submissionMat: SubmissionMat,
trackingMat: Future[immutable.Map[String, Context]],
trackingMat: Future[immutable.Map[TrackedCommandKey, Context]],
)

def apply[Context, SubmissionMat](
commandSubmissionFlow: Flow[
Ctx[(Context, String), CommandSubmission],
Ctx[(Context, String), Try[
Ctx[(Context, TrackedCommandKey), CommandSubmission],
Ctx[(Context, TrackedCommandKey), Try[
Empty
]],
SubmissionMat,
Expand All @@ -62,12 +61,13 @@ object CommandTrackerFlow {
implicit builder => (submissionFlow, tracker) =>
import GraphDSL.Implicits._

val wrapResult = builder.add(Flow[Ctx[(Context, String), Try[Empty]]].map(Left.apply))
val wrapResult =
builder.add(Flow[Ctx[(Context, TrackedCommandKey), Try[Empty]]].map(Left.apply))

val wrapCompletion = builder.add(Flow[CompletionStreamElement].map(Right.apply))

val merge = builder.add(
Merge[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]](
Merge[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
inputPorts = 2,
eagerComplete = false,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.daml.ledger.client.services.commands.tracker

import java.time.{Duration, Instant}

import akka.stream.stage._
import akka.stream.{Attributes, Inlet, Outlet}
import com.daml.grpc.{GrpcException, GrpcStatus}
Expand All @@ -15,6 +13,7 @@ import com.daml.ledger.client.services.commands.tracker.CommandTracker._
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
NotOkResponse,
}
import com.daml.ledger.client.services.commands.{
CommandSubmission,
Expand All @@ -27,6 +26,7 @@ import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
import org.slf4j.LoggerFactory

import java.time.{Duration, Instant}
import scala.annotation.nowarn
import scala.collection.compat._
import scala.collection.{immutable, mutable}
Expand Down Expand Up @@ -57,27 +57,30 @@ private[commands] class CommandTracker[Context](
timeoutDetectionPeriod: FiniteDuration,
) extends GraphStageWithMaterializedValue[
CommandTrackerShape[Context],
Future[Map[String, Context]],
Future[Map[TrackedCommandKey, Context]],
] {

private val logger = LoggerFactory.getLogger(this.getClass.getName)

val submitRequestIn: Inlet[Ctx[Context, CommandSubmission]] =
Inlet[Ctx[Context, CommandSubmission]]("submitRequestIn")
val submitRequestOut: Outlet[Ctx[(Context, String), CommandSubmission]] =
Outlet[Ctx[(Context, String), CommandSubmission]]("submitRequestOut")
val commandResultIn: Inlet[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]] =
Inlet[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]]("commandResultIn")
val submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]] =
Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]]("submitRequestOut")
val commandResultIn
: Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]] =
Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
"commandResultIn"
)
val resultOut: Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] =
Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]("resultOut")
val offsetOut: Outlet[LedgerOffset] =
Outlet[LedgerOffset]("offsetOut")

override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes
): (GraphStageLogic, Future[Map[String, Context]]) = {
): (GraphStageLogic, Future[Map[TrackedCommandKey, Context]]) = {

val promise = Promise[immutable.Map[String, Context]]()
val promise = Promise[immutable.Map[TrackedCommandKey, Context]]()

val logic: TimerGraphStageLogic = new TimerGraphStageLogic(shape) {

Expand All @@ -96,7 +99,7 @@ private[commands] class CommandTracker[Context](
}
}

private val pendingCommands = new mutable.HashMap[String, TrackingData[Context]]()
private val pendingCommands = new mutable.HashMap[TrackedCommandKey, TrackingData[Context]]()

setHandler(
submitRequestOut,
Expand All @@ -116,11 +119,16 @@ private[commands] class CommandTracker[Context](
override def onPush(): Unit = {
val submitRequest = grab(submitRequestIn)
registerSubmission(submitRequest)
logger.trace(
"Submitted command {}",
submitRequest.value.commands.commandId,
val commands = submitRequest.value.commands
val submissionId = commands.submissionId
val commandId = commands.commandId
logger.trace(s"Submitted command $commandId in submission $submissionId.")
push(
submitRequestOut,
submitRequest.enrich((context, _) =>
context -> TrackedCommandKey(submissionId, commandId)
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
),
)
push(submitRequestOut, submitRequest.enrich(_ -> _.commands.commandId))
}

override def onUpstreamFinish(): Unit = {
Expand Down Expand Up @@ -177,7 +185,7 @@ private[commands] class CommandTracker[Context](
)

private def pushResultOrPullCommandResultIn(
compl: Option[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]
compl: immutable.Iterable[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
): Unit = {
// The command tracker detects timeouts outside the regular pull/push
// mechanism of the input/output ports. Basically the timeout
Expand All @@ -186,7 +194,11 @@ private[commands] class CommandTracker[Context](
// even though it hasn't been pulled again in the meantime. Using `emit`
// instead of `push` when a completion arrives makes akka take care of
// handling the signaling properly.
compl.fold(if (!hasBeenPulled(commandResultIn)) pull(commandResultIn))(emit(resultOut, _))
if (compl.isEmpty && !hasBeenPulled(commandResultIn)) {
pull(commandResultIn)
} else {
emitMultiple(resultOut, compl)
}
}

private def completeStageIfTerminal(): Unit = {
Expand All @@ -197,33 +209,38 @@ private[commands] class CommandTracker[Context](

import CommandTracker.nonTerminalCodes

private def handleSubmitResponse(submitResponse: Ctx[(Context, String), Try[Empty]]) = {
val Ctx((_, commandId), value, _) = submitResponse
private def handleSubmitResponse(
submitResponse: Ctx[(Context, TrackedCommandKey), Try[Empty]]
): immutable.Iterable[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
val Ctx((_, commandKey), value, _) = submitResponse
value match {
case Failure(GrpcException(status @ GrpcStatus(code, _), metadata))
if !nonTerminalCodes(code) =>
getOutputForTerminalStatusCode(commandId, GrpcStatus.toProto(status, metadata))
getOutputForTerminalStatusCode(commandKey, GrpcStatus.toProto(status, metadata))
case Failure(throwable) =>
logger.warn(
s"Service responded with error for submitting command with context ${submitResponse.context}. Status of command is unknown. watching for completion...",
throwable,
)
None
immutable.Iterable.empty
case Success(_) =>
logger.trace("Received confirmation that command {} was accepted.", commandId)
None
logger.trace(
s"Received confirmation that command ${commandKey.commandId} from submission ${commandKey.submissionId} was accepted."
)
immutable.Iterable.empty
}
}

@nowarn("msg=deprecated")
private def registerSubmission(submission: Ctx[Context, CommandSubmission]): Unit = {
val commands = submission.value.commands
val submissionId = commands.submissionId
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
val commandId = commands.commandId
logger.trace("Begin tracking of command {}", commandId)
if (pendingCommands.contains(commandId)) {
logger.trace(s"Begin tracking of command $commandId for submission $submissionId.")
if (pendingCommands.contains(TrackedCommandKey(submissionId, commandId))) {
// TODO return an error identical to the server side duplicate command error once that's defined.
throw new IllegalStateException(
s"A command with id $commandId is already being tracked. CommandIds submitted to the CommandTracker must be unique."
s"A command $commandId from a submission $submissionId is already being tracked. CommandIds submitted to the CommandTracker must be unique."
) with NoStackTrace
Comment on lines +239 to 243
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we have any gaps in testing though for this? Wondering how come conformance tests were green considering this code. Shouldn't they throw an error for 2 submissions with the same command id currently?

Also that TODO, should we throw an ApiException for duplicate command from here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback!

Do you think we have any gaps in testing though for this? Wondering how come conformance tests were green considering this code. Shouldn't they throw an error for 2 submissions with the same command id currently?

Not yet sure, I was going to think about this tomorrow, as I'm also not sure about some of these changes at all. Partially because I don't know Akka Streams Graph API 😄 I submitted it mostly as an FYI so far.

Also that TODO, should we throw an ApiException for duplicate command from here?

Was also thinking about this. Will keep you posted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, let me know if you need any help with the Graph API, spent a lot of time writing akka graphs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also that TODO, should we throw an ApiException for duplicate command from here?

Not sure what to do, as this is a Unit method. Maybe let's keep it as is for now and I can do something about this in a separate PR.

}
val commandTimeout = submission.value.timeout match {
Expand All @@ -249,19 +266,20 @@ private[commands] class CommandTracker[Context](
commandTimeout = Instant.now().plus(commandTimeout),
context = submission.context,
)
pendingCommands += commandId -> trackingData
pendingCommands += TrackedCommandKey(submissionId, commandId) -> trackingData
()
}

private def getOutputForTimeout(instant: Instant) = {
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
logger.trace("Checking timeouts at {}", instant)
pendingCommands.view
.flatMap { case (commandId, trackingData) =>
.flatMap { case (commandKey, trackingData) =>
if (trackingData.commandTimeout.isBefore(instant)) {
pendingCommands -= commandId
pendingCommands -= commandKey
logger.info(
s"Command {} (command timeout {}) timed out at checkpoint {}.",
commandId,
s"Command {} from submission {} (command timeout {}) timed out at checkpoint {}.",
commandKey.commandId,
commandKey.submissionId,
trackingData.commandTimeout,
instant,
)
Expand All @@ -275,39 +293,91 @@ private[commands] class CommandTracker[Context](
Nil
}
}
.to(immutable.Seq)
.to(immutable.Iterable)
}

private def getOutputForCompletion(completion: Completion) = {
val (commandId, errorText) = {
completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
completion.commandId -> "successful completion of command"
case _ =>
completion.commandId -> "failed completion of command"
}
private def getOutputForCompletion(
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
completion: Completion
): immutable.Iterable[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
val completionDescription = completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
"successful completion of command"
case _ => "failed completion of command"
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
}

logger.trace("Handling {} {}", errorText, completion.commandId: Any)
pendingCommands.remove(commandId).map { t =>
Ctx(t.context, tracker.CompletionResponse(completion))
val commandId = completion.commandId
logger.trace(
"Handling {} {} from submission {}.",
completionDescription,
commandId,
completion.submissionId,
)

val maybeSubmissionId = Option(completion.submissionId).collect {
case id if id.nonEmpty => id
}
val trackedCommands =
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
pendingCommandKeys(maybeSubmissionId, commandId).flatMap(pendingCommands.remove(_).toList)

if (trackedCommands.size > 1) {
trackedCommands.map { trackingData =>
Ctx(
trackingData.context,
Left(
NotOkResponse(
commandId,
StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands for the id $commandId. This can only happen for the mutating schema.",
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
Seq.empty,
),
)
),
)
}
} else {
trackedCommands.map(trackingData =>
Ctx(trackingData.context, tracker.CompletionResponse(completion))
)
}
}

private def getOutputForTerminalStatusCode(
private def pendingCommandKeys(
submissionId: Option[String],
commandId: String,
) =
submissionId.map(id => immutable.Iterable(TrackedCommandKey(id, commandId))).getOrElse {
pendingCommands.keys.filter(_.commandId == commandId).to(immutable.Iterable)
}

private def getOutputForTerminalStatusCode(
commandKey: TrackedCommandKey,
status: StatusProto,
): Option[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
logger.trace("Handling failure of command {}", commandId)
): immutable.Iterable[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
logger.trace(
s"Handling failure of command ${commandKey.commandId} from submission ${commandKey.submissionId}."
)
pendingCommands
.remove(commandId)
.remove(commandKey)
.map { t =>
Ctx(t.context, tracker.CompletionResponse(Completion(commandId, Some(status))))
Ctx(
t.context,
tracker.CompletionResponse(
Completion(
commandKey.commandId,
Some(status),
submissionId = commandKey.submissionId,
)
),
)
}
.orElse {
logger.trace("Platform signaled failure for unknown command {}", commandId)
logger.trace(
s"Platform signaled failure for unknown command ${commandKey.commandId} from submission ${commandKey.submissionId}."
)
None
}
.toList
}

override def postStop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import scala.util.Try

private[tracker] final case class CommandTrackerShape[Context](
submitRequestIn: Inlet[Ctx[Context, CommandSubmission]],
submitRequestOut: Outlet[Ctx[(Context, String), CommandSubmission]],
commandResultIn: Inlet[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]],
submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]],
commandResultIn: Inlet[
Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]
],
resultOut: Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]],
offsetOut: Outlet[LedgerOffset],
) extends Shape {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.client.services.commands.tracker

case class TrackedCommandKey(submissionId: String, commandId: String)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.daml.ledger.api.validation.CommandsValidator
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.configuration.CommandClientConfiguration
import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized
import com.daml.ledger.client.services.commands.tracker.TrackedCommandKey
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
Expand Down Expand Up @@ -149,7 +150,7 @@ private[daml] final class CommandClient(
.via(commandUpdaterFlow[Context](ledgerIdToUse))
.viaMat(
CommandTrackerFlow[Context, NotUsed](
commandSubmissionFlow = CommandSubmissionFlow[(Context, String)](
commandSubmissionFlow = CommandSubmissionFlow[(Context, TrackedCommandKey)](
submit(token),
config.maxParallelSubmissions,
),
Expand Down
Loading