Skip to content

Commit

Permalink
[DPP-611][Self-service error codes] Adapt ApiCommandService (#11325)
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
pbatko-da authored Oct 22, 2021
1 parent a89079b commit e8d0ccb
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ private[daml] object ApiServices {
timeProvider = timeProvider,
ledgerConfigurationSubscription = ledgerConfigurationSubscription,
metrics = metrics,
errorsVersionsSwitcher,
)

val apiPartyManagementService = ApiPartyManagementService.createApiService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Source}
import com.daml.api.util.TimeProvider
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_completion_service.{
Expand Down Expand Up @@ -59,6 +59,7 @@ import scala.util.Try
private[apiserver] final class ApiCommandService private[services] (
transactionServices: TransactionServices,
submissionTracker: Tracker,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
Expand All @@ -67,6 +68,8 @@ private[apiserver] final class ApiCommandService private[services] (

private val logger = ContextualizedLogger.get(this.getClass)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)

@volatile private var running = true

override def close(): Unit = {
Expand All @@ -93,7 +96,7 @@ private[apiserver] final class ApiCommandService private[services] (
submissionTracker.track(CommandSubmission(commands, timeout))
} else {
Future.failed(
ErrorFactories.serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
)
}.andThen(logger.logErrorsOnCall[Completion])
}
Expand Down Expand Up @@ -179,6 +182,7 @@ private[apiserver] object ApiCommandService {
timeProvider: TimeProvider,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
metrics: Metrics,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit
materializer: Materializer,
executionContext: ExecutionContext,
Expand All @@ -191,7 +195,8 @@ private[apiserver] object ApiCommandService {
trackerCleanupInterval,
)
new GrpcCommandService(
service = new ApiCommandService(transactionServices, submissionTracker),
service =
new ApiCommandService(transactionServices, submissionTracker, errorCodesVersionSwitcher),
ledgerId = configuration.ledgerId,
currentLedgerTime = () => timeProvider.getCurrentTime,
currentUtcTime = () => Instant.now,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

package com.daml.platform.apiserver.services

import com.daml.error.ErrorCodesVersionSwitcher

import java.time.{Duration, Instant}
import java.util.UUID
import java.util.concurrent.TimeUnit

import com.daml.grpc.{GrpcException, GrpcStatus}
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
import com.daml.ledger.api.v1.command_service.{CommandServiceGrpc, SubmitAndWaitRequest}
Expand Down Expand Up @@ -36,7 +37,9 @@ class ApiCommandServiceSpec
private implicit val resourceContext: ResourceContext = ResourceContext(executionContext)
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

"the command service" should {
val errorCodesVersionSwitcher = mock[ErrorCodesVersionSwitcher]

s"the command service" should {
val completionSuccess = CompletionResponse.CompletionSuccess(
Completion(
commandId = "command ID",
Expand All @@ -48,22 +51,30 @@ class ApiCommandServiceSpec
val commands = someCommands()
val submissionTracker = mock[Tracker]
when(
submissionTracker.track(any[CommandSubmission])(any[ExecutionContext], any[LoggingContext])
submissionTracker.track(any[CommandSubmission])(
any[ExecutionContext],
any[LoggingContext],
)
).thenReturn(
Future.successful(
Right(completionSuccess)
)
)

openChannel(
new ApiCommandService(UnimplementedTransactionServices, submissionTracker)
new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
)
).use { stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
stub.submitAndWaitForTransactionId(request).map { response =>
response.transactionId should be("transaction ID")
verify(submissionTracker).track(
eqTo(CommandSubmission(commands))
)(any[ExecutionContext], any[LoggingContext])
verifyZeroInteractions(errorCodesVersionSwitcher)
succeed
}
}
Expand All @@ -79,15 +90,22 @@ class ApiCommandServiceSpec
val commands = someCommands()
val submissionTracker = mock[Tracker]
when(
submissionTracker.track(any[CommandSubmission])(any[ExecutionContext], any[LoggingContext])
submissionTracker.track(any[CommandSubmission])(
any[ExecutionContext],
any[LoggingContext],
)
).thenReturn(
Future.successful(
Right(completionSuccess)
)
)

openChannel(
new ApiCommandService(UnimplementedTransactionServices, submissionTracker),
new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
),
deadlineTicker,
).use { stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
Expand All @@ -99,6 +117,7 @@ class ApiCommandServiceSpec
verify(submissionTracker).track(
eqTo(CommandSubmission(commands, timeout = Some(Duration.ofSeconds(30))))
)(any[ExecutionContext], any[LoggingContext])
verifyZeroInteractions(errorCodesVersionSwitcher)
succeed
}
}
Expand All @@ -108,7 +127,10 @@ class ApiCommandServiceSpec
val commands = someCommands()
val submissionTracker = mock[Tracker]
when(
submissionTracker.track(any[CommandSubmission])(any[ExecutionContext], any[LoggingContext])
submissionTracker.track(any[CommandSubmission])(
any[ExecutionContext],
any[LoggingContext],
)
).thenReturn(
Future.successful(
Left(
Expand All @@ -119,20 +141,32 @@ class ApiCommandServiceSpec
)
)

openChannel(new ApiCommandService(UnimplementedTransactionServices, submissionTracker)).use {
stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
stub.submitAndWaitForTransactionId(request).failed.map { exception =>
exception should matchPattern { case GrpcException(GrpcStatus.ABORTED(), _) => }
}
openChannel(
new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
)
).use { stub =>
val request = SubmitAndWaitRequest.of(Some(commands))
stub.submitAndWaitForTransactionId(request).failed.map { exception =>
exception should matchPattern { case GrpcException(GrpcStatus.ABORTED(), _) => }
verifyZeroInteractions(errorCodesVersionSwitcher)
succeed
}
}
}

"close the supplied tracker when closed" in {
val submissionTracker = mock[Tracker]
val service = new ApiCommandService(UnimplementedTransactionServices, submissionTracker)
val service = new ApiCommandService(
UnimplementedTransactionServices,
submissionTracker,
errorCodesVersionSwitcher,
)

verifyZeroInteractions(submissionTracker)
verifyZeroInteractions(errorCodesVersionSwitcher)

service.close()
verify(submissionTracker).close()
Expand Down

0 comments on commit e8d0ccb

Please sign in to comment.