Skip to content

Commit

Permalink
Handle ledger bridge Queue errors (#12510)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
tudor-da authored Jan 26, 2022
1 parent b1a9175 commit 3e16e5e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
throwableO = Some(t),
)

case class CommandTrackerInternalError(
case class Generic(
message: String,
override val throwableO: Option[Throwable] = None,
)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
GrpcStatus.buildStatus(Map.empty, statusBuilder)
},
v2 = LedgerApiErrors.InternalError
.CommandTrackerInternalError(
.Generic(
message = s"$message: ${t.getClass.getSimpleName}: ${t.getMessage}",
throwableO = Some(t),
)
Expand Down Expand Up @@ -93,7 +93,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
.build()
},
v2 = LedgerApiErrors.InternalError
.CommandTrackerInternalError(
.Generic(
"Missing status in completion response.",
throwableO = None,
)
Expand Down Expand Up @@ -535,7 +535,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
.setMessage(msg)
grpcError(builder.build())
},
v2 = LedgerApiErrors.InternalError.CommandTrackerInternalError(msg).asGrpcError,
v2 = LedgerApiErrors.InternalError.Generic(msg).asGrpcError,
)

private def invalidArgumentV1(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import akka.NotUsed
import akka.stream.scaladsl.Sink
import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.definitions.LedgerApiErrors
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
Expand All @@ -18,8 +20,6 @@ import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.InstrumentedSource
import com.daml.telemetry.TelemetryContext
import com.google.rpc.code.Code
import com.google.rpc.status.Status

import java.util.concurrent.{CompletableFuture, CompletionStage}

Expand Down Expand Up @@ -134,7 +134,7 @@ class BridgeWriteService(
}

private def submit(submission: Submission): CompletionStage[SubmissionResult] =
toSubmissionResult(queue.offer(submission))
toSubmissionResult(submission.submissionId, queue.offer(submission))

override def close(): Unit = {
logger.info("Shutting down BridgeLedgerFactory.")
Expand All @@ -146,36 +146,39 @@ object BridgeWriteService {
private[this] val logger = ContextualizedLogger.get(getClass)

def toSubmissionResult(
queueOfferResult: QueueOfferResult
)(implicit loggingContext: LoggingContext): CompletableFuture[SubmissionResult] =
submissionId: Ref.SubmissionId,
queueOfferResult: QueueOfferResult,
)(implicit
loggingContext: LoggingContext
): CompletableFuture[SubmissionResult] = {
implicit val errorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))

CompletableFuture.completedFuture(
queueOfferResult match {
case QueueOfferResult.Enqueued => SubmissionResult.Acknowledged
case QueueOfferResult.Dropped =>
logger.warn(
"Buffer overflow: new submission is not added, signalized `Overloaded` for caller."
)
SubmissionResult.SynchronousError(
Status(
Code.ABORTED.value // TODO SoX: Use error codes
)
LedgerApiErrors.ParticipantBackpressure
.Rejection("Sandbox-on-X ledger bridge submission buffer is full")
.rpcStatus(Some(submissionId))
)
case QueueOfferResult.Failure(throwable) =>
logger.error("Error enqueueing new submission.", throwable)
SubmissionResult.SynchronousError(
Status(
Code.INTERNAL.value,
throwable.getMessage,
)
LedgerApiErrors.InternalError
.Generic(
message = s"Failed to enqueue submission in the Sandbox-on-X ledger bridge",
throwableO = Some(throwable),
)
.rpcStatus(Some(submissionId))
)
case QueueOfferResult.QueueClosed =>
logger.error("Error enqueueing new submission: queue is closed.")
SubmissionResult.SynchronousError(
Status(
Code.INTERNAL.value,
"Queue is closed",
)
LedgerApiErrors.ServiceNotRunning
.Reject("Sandbox-on-X ledger bridge")
.rpcStatus(None)
)
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package com.daml.ledger.sandbox.domain
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.v2.{SubmitterInfo, TransactionMeta}
import com.daml.lf.data.Ref.SubmissionId
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext

private[sandbox] sealed trait Submission extends Product with Serializable {
def submissionId: Ref.SubmissionId
def loggingContext: LoggingContext
}

Expand All @@ -21,15 +23,20 @@ private[sandbox] object Submission {
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
)(implicit val loggingContext: LoggingContext)
extends Submission
extends Submission {
val submissionId: SubmissionId = {
// TODO SoX production-ready: Make the submissionId non-optional
// .get deemed safe since no transaction submission should have the submission id empty
submitterInfo.submissionId.get
}
}

final case class Config(
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit val loggingContext: LoggingContext)
extends Submission

final case class AllocateParty(
hint: Option[Ref.Party],
displayName: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BridgeWriteServiceTest extends AnyFlatSpec with MockitoSugar with Matchers
applicationId = Ref.ApplicationId.assertFromString("a0"),
commandId = Ref.CommandId.assertFromString("c0"),
deduplicationPeriod = DeduplicationPeriod.DeduplicationOffset(Offset.beforeBegin),
submissionId = None,
submissionId = Some(Ref.SubmissionId.assertFromString("some-submission-id")),
ledgerConfiguration = Configuration.reasonableInitialConfiguration,
)

Expand Down

0 comments on commit 3e16e5e

Please sign in to comment.