From 3e16e5ec98b78503035cfc243ac93d0047e44f77 Mon Sep 17 00:00:00 2001 From: tudor-da Date: Wed, 26 Jan 2022 19:48:00 +0100 Subject: [PATCH] Handle ledger bridge Queue errors (#12510) changelog_begin changelog_end --- .../error/definitions/LedgerApiErrors.scala | 2 +- .../api/validation/ErrorFactories.scala | 6 +-- .../ledger/sandbox/BridgeWriteService.scala | 45 ++++++++++--------- .../ledger/sandbox/domain/Submission.scala | 11 ++++- .../sandbox/BridgeWriteServiceTest.scala | 2 +- 5 files changed, 38 insertions(+), 28 deletions(-) diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index a3fe584fc2ab..b4552c31f642 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -591,7 +591,7 @@ object LedgerApiErrors extends LedgerApiErrorGroup { throwableO = Some(t), ) - case class CommandTrackerInternalError( + case class Generic( message: String, override val throwableO: Option[Throwable] = None, )(implicit diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala index 625b3b277d05..8cb307bc6943 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala @@ -43,7 +43,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch GrpcStatus.buildStatus(Map.empty, statusBuilder) }, v2 = LedgerApiErrors.InternalError - .CommandTrackerInternalError( + .Generic( message = s"$message: ${t.getClass.getSimpleName}: ${t.getMessage}", throwableO = Some(t), ) @@ -93,7 +93,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch .build() }, v2 = LedgerApiErrors.InternalError - .CommandTrackerInternalError( + .Generic( "Missing status in completion response.", throwableO = None, ) @@ -535,7 +535,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch .setMessage(msg) grpcError(builder.build()) }, - v2 = LedgerApiErrors.InternalError.CommandTrackerInternalError(msg).asGrpcError, + v2 = LedgerApiErrors.InternalError.Generic(msg).asGrpcError, ) private def invalidArgumentV1( diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala index 1b3cad926bef..9f737ca26767 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/BridgeWriteService.scala @@ -7,6 +7,8 @@ import akka.NotUsed import akka.stream.scaladsl.Sink import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult} import com.daml.daml_lf_dev.DamlLf.Archive +import com.daml.error.definitions.LedgerApiErrors +import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset @@ -18,8 +20,6 @@ import com.daml.lf.transaction.SubmittedTransaction import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.InstrumentedSource import com.daml.telemetry.TelemetryContext -import com.google.rpc.code.Code -import com.google.rpc.status.Status import java.util.concurrent.{CompletableFuture, CompletionStage} @@ -134,7 +134,7 @@ class BridgeWriteService( } private def submit(submission: Submission): CompletionStage[SubmissionResult] = - toSubmissionResult(queue.offer(submission)) + toSubmissionResult(submission.submissionId, queue.offer(submission)) override def close(): Unit = { logger.info("Shutting down BridgeLedgerFactory.") @@ -146,36 +146,39 @@ object BridgeWriteService { private[this] val logger = ContextualizedLogger.get(getClass) def toSubmissionResult( - queueOfferResult: QueueOfferResult - )(implicit loggingContext: LoggingContext): CompletableFuture[SubmissionResult] = + submissionId: Ref.SubmissionId, + queueOfferResult: QueueOfferResult, + )(implicit + loggingContext: LoggingContext + ): CompletableFuture[SubmissionResult] = { + implicit val errorLogger: ContextualizedErrorLogger = + new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) + CompletableFuture.completedFuture( queueOfferResult match { case QueueOfferResult.Enqueued => SubmissionResult.Acknowledged case QueueOfferResult.Dropped => - logger.warn( - "Buffer overflow: new submission is not added, signalized `Overloaded` for caller." - ) SubmissionResult.SynchronousError( - Status( - Code.ABORTED.value // TODO SoX: Use error codes - ) + LedgerApiErrors.ParticipantBackpressure + .Rejection("Sandbox-on-X ledger bridge submission buffer is full") + .rpcStatus(Some(submissionId)) ) case QueueOfferResult.Failure(throwable) => - logger.error("Error enqueueing new submission.", throwable) SubmissionResult.SynchronousError( - Status( - Code.INTERNAL.value, - throwable.getMessage, - ) + LedgerApiErrors.InternalError + .Generic( + message = s"Failed to enqueue submission in the Sandbox-on-X ledger bridge", + throwableO = Some(throwable), + ) + .rpcStatus(Some(submissionId)) ) case QueueOfferResult.QueueClosed => - logger.error("Error enqueueing new submission: queue is closed.") SubmissionResult.SynchronousError( - Status( - Code.INTERNAL.value, - "Queue is closed", - ) + LedgerApiErrors.ServiceNotRunning + .Reject("Sandbox-on-X ledger bridge") + .rpcStatus(None) ) } ) + } } diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/domain/Submission.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/domain/Submission.scala index afea4d049380..41b0d1668842 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/domain/Submission.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/domain/Submission.scala @@ -6,11 +6,13 @@ package com.daml.ledger.sandbox.domain import com.daml.daml_lf_dev.DamlLf.Archive import com.daml.ledger.configuration.Configuration import com.daml.ledger.participant.state.v2.{SubmitterInfo, TransactionMeta} +import com.daml.lf.data.Ref.SubmissionId import com.daml.lf.data.{Ref, Time} import com.daml.lf.transaction.SubmittedTransaction import com.daml.logging.LoggingContext private[sandbox] sealed trait Submission extends Product with Serializable { + def submissionId: Ref.SubmissionId def loggingContext: LoggingContext } @@ -21,7 +23,13 @@ private[sandbox] object Submission { transaction: SubmittedTransaction, estimatedInterpretationCost: Long, )(implicit val loggingContext: LoggingContext) - extends Submission + extends Submission { + val submissionId: SubmissionId = { + // TODO SoX production-ready: Make the submissionId non-optional + // .get deemed safe since no transaction submission should have the submission id empty + submitterInfo.submissionId.get + } + } final case class Config( maxRecordTime: Time.Timestamp, @@ -29,7 +37,6 @@ private[sandbox] object Submission { config: Configuration, )(implicit val loggingContext: LoggingContext) extends Submission - final case class AllocateParty( hint: Option[Ref.Party], displayName: Option[String], diff --git a/ledger/sandbox-on-x/src/test/suite/scala/com/daml/ledger/sandbox/BridgeWriteServiceTest.scala b/ledger/sandbox-on-x/src/test/suite/scala/com/daml/ledger/sandbox/BridgeWriteServiceTest.scala index 33f34eee2d4e..fb7a9f048772 100644 --- a/ledger/sandbox-on-x/src/test/suite/scala/com/daml/ledger/sandbox/BridgeWriteServiceTest.scala +++ b/ledger/sandbox-on-x/src/test/suite/scala/com/daml/ledger/sandbox/BridgeWriteServiceTest.scala @@ -50,7 +50,7 @@ class BridgeWriteServiceTest extends AnyFlatSpec with MockitoSugar with Matchers applicationId = Ref.ApplicationId.assertFromString("a0"), commandId = Ref.CommandId.assertFromString("c0"), deduplicationPeriod = DeduplicationPeriod.DeduplicationOffset(Offset.beforeBegin), - submissionId = None, + submissionId = Some(Ref.SubmissionId.assertFromString("some-submission-id")), ledgerConfiguration = Configuration.reasonableInitialConfiguration, )