Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port LedgerApiServer rate limiter interface from canton #11577

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ import com.daml.platform.services.time.TimeProviderType
import io.grpc.BindableService
import io.grpc.protobuf.services.ProtoReflectionService
import scalaz.syntax.tag._

import java.time.Duration

import com.daml.telemetry.TelemetryContext

import scala.collection.immutable
import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -95,6 +97,7 @@ private[daml] object ApiServices {
seedService: SeedService,
managementServiceTimeout: Duration,
enableSelfServiceErrorCodes: Boolean,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
)(implicit
materializer: Materializer,
esf: ExecutionSequencerFactory,
Expand Down Expand Up @@ -131,7 +134,11 @@ private[daml] object ApiServices {
configurationLoadTimeout = ScalaDuration.fromNanos(configurationLoadTimeout.toNanos),
)
services <- Resource(
Future(createServices(ledgerId, currentLedgerConfiguration)(servicesExecutionContext))
Future(
createServices(ledgerId, currentLedgerConfiguration, checkOverloaded)(
servicesExecutionContext
)
)
)(services =>
Future {
services.foreach {
Expand All @@ -146,6 +153,7 @@ private[daml] object ApiServices {
private def createServices(
ledgerId: LedgerId,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
)(implicit executionContext: ExecutionContext): List[BindableService] = {
val apiTransactionService =
ApiTransactionService.create(ledgerId, transactionsService, metrics, errorsVersionsSwitcher)
Expand Down Expand Up @@ -191,6 +199,7 @@ private[daml] object ApiServices {
ledgerConfigurationSubscription,
apiCompletionService,
apiTransactionService,
checkOverloaded,
)

val apiReflectionService = ProtoReflectionService.newInstance()
Expand All @@ -217,6 +226,7 @@ private[daml] object ApiServices {
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
apiCompletionService: GrpcCommandCompletionService,
apiTransactionService: GrpcTransactionService,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
)(implicit executionContext: ExecutionContext): List[BindableService] = {
optWriteService.toList.flatMap { writeService =>
val commandExecutor = new TimedCommandExecutor(
Expand Down Expand Up @@ -245,6 +255,7 @@ private[daml] object ApiServices {
ledgerConfigurationSubscription,
seedService,
commandExecutor,
checkOverloaded,
ApiSubmissionService.Configuration(
partyConfig.implicitPartyAllocation,
submissionConfig.enableDeduplication,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import com.daml.platform.store.LfValueTranslationCache
import com.daml.ports.{Port, PortFiles}
import io.grpc.{BindableService, ServerInterceptor}
import scalaz.{-\/, \/-}

import java.io.File
import java.time.Clock

import com.daml.telemetry.TelemetryContext

import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success, Try}
Expand All @@ -56,6 +58,8 @@ final class StandaloneApiServer(
engine: Engine,
servicesExecutionContext: ExecutionContextExecutor,
lfValueTranslationCache: LfValueTranslationCache.Cache,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult] =
_ => None, // Used for Canton rate-limiting
)(implicit actorSystem: ActorSystem, materializer: Materializer, loggingContext: LoggingContext)
extends ResourceOwner[ApiServer] {

Expand Down Expand Up @@ -127,6 +131,7 @@ final class StandaloneApiServer(
seedService = SeedService(config.seeding),
managementServiceTimeout = config.managementServiceTimeout,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
checkOverloaded = checkOverloaded,
)(materializer, executionSequencerFactory, loggingContext)
.map(_.withServices(otherServices))
apiServer <- new LedgerApiServer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private[apiserver] object ApiSubmissionService {
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
seedService: SeedService,
commandExecutor: CommandExecutor,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
configuration: ApiSubmissionService.Configuration,
metrics: Metrics,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
Expand All @@ -73,6 +74,7 @@ private[apiserver] object ApiSubmissionService {
ledgerConfigurationSubscription,
seedService,
commandExecutor,
checkOverloaded,
configuration,
metrics,
errorCodesVersionSwitcher,
Expand Down Expand Up @@ -102,6 +104,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
seedService: SeedService,
commandExecutor: CommandExecutor,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
configuration: ApiSubmissionService.Configuration,
metrics: Metrics,
val errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
Expand Down Expand Up @@ -221,12 +224,20 @@ private[apiserver] final class ApiSubmissionService private[services] (
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[state.SubmissionResult] =
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
transactionInfo <- handleCommandExecutionResult(result)
partyAllocationResults <- allocateMissingInformees(transactionInfo.transaction)
submissionResult <- submitTransaction(transactionInfo, partyAllocationResults, ledgerConfig)
} yield submissionResult
checkOverloaded(telemetryContext) match {
case Some(submissionResult) => Future.successful(submissionResult)
Comment on lines +227 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: another alternative may be to use .map/.getOrElse.

    checkOverloaded(telemetryContext)
      .map(Future.successful)
      .getOrElse {
        ...
      }

case None =>
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
transactionInfo <- handleCommandExecutionResult(result)
partyAllocationResults <- allocateMissingInformees(transactionInfo.transaction)
submissionResult <- submitTransaction(
transactionInfo,
partyAllocationResults,
ledgerConfig,
)
} yield submissionResult
}
oliverse-da marked this conversation as resolved.
Show resolved Hide resolved

// Takes the whole transaction to ensure to traverse it only if necessary
private[services] def allocateMissingInformees(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.daml.ledger.participant.state.index.v2.{
IndexPartyManagementService,
IndexSubmissionService,
}
import com.daml.ledger.participant.state.v2.WriteService
import com.daml.ledger.participant.state.v2.{SubmissionResult, WriteService}
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf
import com.daml.lf.command.{Commands => LfCommands}
Expand All @@ -35,15 +35,15 @@ import com.daml.platform.apiserver.services.ApiSubmissionServiceSpec._
import com.daml.platform.apiserver.SeedService
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
import com.google.rpc.status.{Status => RpcStatus}
import io.grpc.Status
import io.grpc.{Status, StatusRuntimeException}
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{Assertion, Inside}

import java.time.Duration
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -413,6 +413,32 @@ class ApiSubmissionServiceSpec
Success(succeed)
})
}

it should "rate-limit when configured to do so" in {
val grpcError = RpcStatus.of(Status.Code.ABORTED.value(), s"Quota Exceeded", Seq.empty)

val service =
newSubmissionService(
mock[state.WriteService],
mock[IndexPartyManagementService],
implicitPartyAllocation = true,
deduplicationEnabled = false,
mockIndexSubmissionService = mock[IndexSubmissionService],
commandExecutor = mock[CommandExecutor],
checkOverloaded = _ => Some(SubmissionResult.SynchronousError(grpcError)),
)

val submitRequest = newSubmitRequest()
service
.submit(submitRequest)
.transform {
case Failure(e: StatusRuntimeException)
if e.getStatus.getCode.value == grpcError.code && e.getStatus.getDescription == grpcError.message =>
Success(succeed)
case result =>
Try(fail(s"Expected submission to be aborted, but got ${result}"))
}
}
}

object ApiSubmissionServiceSpec {
Expand Down Expand Up @@ -449,6 +475,7 @@ object ApiSubmissionServiceSpec {
deduplicationEnabled: Boolean = true,
mockIndexSubmissionService: IndexSubmissionService = mock[IndexSubmissionService],
useSelfServiceErrorCodes: Boolean = false,
checkOverloaded: TelemetryContext => Option[SubmissionResult] = _ => None,
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
Expand Down Expand Up @@ -489,6 +516,7 @@ object ApiSubmissionServiceSpec {
errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(
enableSelfServiceErrorCodes = useSelfServiceErrorCodes
),
checkOverloaded = checkOverloaded,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ final class SandboxServer(
seedService = seedingService,
managementServiceTimeout = config.managementServiceTimeout,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
checkOverloaded = _ => None,
)(materializer, executionSequencerFactory, loggingContext)
.map(_.withServices(List(resetService)))
apiServer <- new LedgerApiServer(
Expand Down