Skip to content

Commit

Permalink
participant-integration-api: Move TrackerMap code around. [KVL-1009] (
Browse files Browse the repository at this point in the history
#10653)

* participant-integration-api: Make `TrackerMap` a `Tracker`.

Mostly by moving parameters around.

CHANGELOG_BEGIN
CHANGELOG_END

* participant-integration-api: Clean up `TrackerMap` a little more.

* participant-integration-api: Make `TrackerMap` generic.

* participant-integration-api: Move `Tracker.WithLastSubmission`.

It's only used by `TrackerMap`, so now it's an internal class there.

* participant-integration-api: Only provide the key to `newTracker`.

* participant-integration-api: Subsume the `TrackerMap` cleanup schedule.

* participant-integration-api: Construct the tracker map outside.

* ledger-api-common: Remove some unnecessary braces.

* participant-integration-api: Prettify `TrackerMap` some more.

* participant-integration-api: Make `TrackerMap.selfCleaning` a class.

* participant-integration-api: Add some tests for TrackerMap.

* participant-integration-api: Convert a method to `Runnable`, 2.12-style.

Apparently underscores aren't good enough.

* ledger-api-client: Delete CompletionSuccess#unapply.

It doesn't work on Scala 2.12.
  • Loading branch information
SamirTalwar authored Aug 25, 2021
1 parent 640fb68 commit ef239fd
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,6 @@ object CompletionResponse {
originalStatus: StatusProto,
)

object CompletionSuccess {

/** In most cases we're not interested in the original grpc status, as it's used only to keep backwards compatibility
*/
def unapply(success: CompletionSuccess): Option[(String, String)] = Some(
success.commandId -> success.transactionId
)
}

def apply(completion: Completion): Either[CompletionFailure, CompletionSuccess] =
completion.status match {
case Some(grpcStatus) if Code.OK.value() == grpcStatus.code =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,17 @@ object CommandsValidator {
}

def effectiveSubmitters(commands: ProtoCommands): Submitters[String] = {
val effectiveActAs =
if (commands.party.isEmpty)
commands.actAs.toSet
else
commands.actAs.toSet + commands.party
val effectiveReadAs = commands.readAs.toSet -- effectiveActAs
Submitters(effectiveActAs, effectiveReadAs)
val actAs = effectiveActAs(commands)
val readAs = commands.readAs.toSet -- actAs
Submitters(actAs, readAs)
}

def effectiveActAs(commands: ProtoCommands): Set[String] =
if (commands.party.isEmpty)
commands.actAs.toSet
else
commands.actAs.toSet + commands.party

val noSubmitters: Submitters[String] = Submitters(Set.empty, Set.empty)

def validateSubmitters(
Expand Down
4 changes: 4 additions & 0 deletions ledger/participant-integration-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ da_scala_test_suite(
jvm_flags = [
"-Djava.security.debug=\"certpath ocsp\"", # This facilitates debugging of the OCSP checks mechanism
],
plugins = [
silencer_plugin,
],
resources = glob(["src/test/resources/**/*"]),
scala_deps = [
"@maven//:com_typesafe_akka_akka_actor",
Expand All @@ -241,6 +244,7 @@ da_scala_test_suite(
"@maven//:org_mockito_mockito_scala",
"@maven//:org_playframework_anorm_anorm",
"@maven//:org_playframework_anorm_anorm_tokenizer",
"@maven//:org_scala_lang_modules_scala_collection_compat",
"@maven//:org_scalacheck_scalacheck",
"@maven//:org_scalactic_scalactic",
"@maven//:org_scalatest_scalatest_core",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package com.daml.platform.apiserver.services
import java.time.Instant

import akka.NotUsed
import akka.actor.Cancellable
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Source}
import com.daml.api.util.TimeProvider
Expand All @@ -19,6 +18,7 @@ import com.daml.ledger.api.v1.command_completion_service.{
}
import com.daml.ledger.api.v1.command_service._
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
Expand All @@ -39,44 +39,34 @@ import com.daml.metrics.Metrics
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription
import com.daml.platform.apiserver.services.ApiCommandService._
import com.daml.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap}
import com.daml.platform.apiserver.services.tracking.{Tracker, TrackerImpl, TrackerMap}
import com.daml.platform.server.api.ApiException
import com.daml.platform.server.api.services.grpc.GrpcCommandService
import com.daml.util.Ctx
import com.daml.util.akkastreams.MaxInFlight
import com.google.protobuf.empty.Empty
import io.grpc._
import io.grpc.Status
import scalaz.syntax.tag._

import scala.concurrent.duration._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

private[apiserver] final class ApiCommandService private (
services: LocalServices,
configuration: ApiCommandService.Configuration,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
metrics: Metrics,
submissionTracker: Tracker,
)(implicit
materializer: Materializer,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
) extends CommandServiceGrpc.CommandService
with AutoCloseable {

private val logger = ContextualizedLogger.get(this.getClass)

private val submissionTracker: TrackerMap = TrackerMap(configuration.retentionPeriod)
private val staleCheckerInterval: FiniteDuration = 30.seconds

private val trackerCleanupJob: Cancellable = materializer.system.scheduler
.scheduleAtFixedRate(staleCheckerInterval, staleCheckerInterval)(submissionTracker.cleanup)

@volatile private var running = true

override def close(): Unit = {
logger.info("Shutting down Command Service")
trackerCleanupJob.cancel()
running = false
submissionTracker.close()
}
Expand All @@ -91,63 +81,14 @@ private[apiserver] final class ApiCommandService private (
logging.readAsStrings(request.getCommands.readAs),
) { implicit loggingContext =>
if (running) {
track(request)
submissionTracker.track(request)
} else {
Future.failed(
new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down."))
)
}.andThen(logger.logErrorsOnCall[Completion])
}

private def track(
request: SubmitAndWaitRequest
)(implicit
loggingContext: LoggingContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val appId = request.getCommands.applicationId
// Note: command completions are returned as long as at least one of the original submitters
// is specified in the command completion request.
val parties = CommandsValidator.effectiveSubmitters(request.getCommands).actAs
val submitter = TrackerMap.Key(application = appId, parties = parties)
// Use just name of first party for open-ended metrics to avoid unbounded metrics name for multiple parties
val metricsPrefixFirstParty = parties.toList.min
submissionTracker.track(submitter, request) {
for {
ledgerEnd <- services.getCompletionEnd().map(_.getOffset)
} yield {
val tracker =
CommandTrackerFlow[Promise[Either[CompletionFailure, CompletionSuccess]], NotUsed](
services.submissionFlow,
offset =>
services
.getCompletionSource(
CompletionStreamRequest(
configuration.ledgerId.unwrap,
appId,
parties.toList,
Some(offset),
)
)
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd,
() => ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime),
)
val trackingFlow = MaxInFlight(
configuration.maxCommandsInFlight,
capacityCounter = metrics.daml.commands.maxInFlightCapacity(metricsPrefixFirstParty),
lengthCounter = metrics.daml.commands.maxInFlightLength(metricsPrefixFirstParty),
).joinMat(tracker)(Keep.right)
TrackerImpl(
trackingFlow,
configuration.inputBufferSize,
capacityCounter = metrics.daml.commands.inputBufferCapacity(metricsPrefixFirstParty),
lengthCounter = metrics.daml.commands.inputBufferLength(metricsPrefixFirstParty),
delayTimer = metrics.daml.commands.inputBufferDelay(metricsPrefixFirstParty),
)
}
}
}

override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] =
submitAndWaitInternal(request).map(
_.fold(
Expand Down Expand Up @@ -205,12 +146,12 @@ private[apiserver] final class ApiCommandService private (
},
)
}

override def toString: String = ApiCommandService.getClass.getSimpleName
}

private[apiserver] object ApiCommandService {

private val trackerCleanupInterval = 30.seconds

def create(
configuration: Configuration,
services: LocalServices,
Expand All @@ -221,16 +162,23 @@ private[apiserver] object ApiCommandService {
materializer: Materializer,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): CommandServiceGrpc.CommandService with GrpcApiService =
): CommandServiceGrpc.CommandService with GrpcApiService = {
val submissionTracker = new TrackerMap.SelfCleaning(
configuration.retentionPeriod,
Tracking.getTrackerKey,
Tracking.newTracker(configuration, services, ledgerConfigurationSubscription, metrics),
trackerCleanupInterval,
)
new GrpcCommandService(
new ApiCommandService(services, configuration, ledgerConfigurationSubscription, metrics),
service = new ApiCommandService(services, submissionTracker),
ledgerId = configuration.ledgerId,
currentLedgerTime = () => timeProvider.getCurrentTime,
currentUtcTime = () => Instant.now,
maxDeduplicationTime = () =>
ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime),
generateSubmissionId = SubmissionIdGenerator.Random,
)
}

final case class Configuration(
ledgerId: LedgerId,
Expand All @@ -251,4 +199,65 @@ private[apiserver] object ApiCommandService {
getFlatTransactionById: GetTransactionByIdRequest => Future[GetFlatTransactionResponse],
)

private object Tracking {
final case class Key(applicationId: String, parties: Set[String])

def getTrackerKey(commands: Commands): Tracking.Key = {
val parties = CommandsValidator.effectiveActAs(commands)
Tracking.Key(commands.applicationId, parties)
}

def newTracker(
configuration: Configuration,
services: LocalServices,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
metrics: Metrics,
)(
key: Tracking.Key
)(implicit
materializer: Materializer,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Tracker] = {
// Note: command completions are returned as long as at least one of the original submitters
// is specified in the command completion request.
// Use just name of first party for open-ended metrics to avoid unbounded metrics name for multiple parties
val metricsPrefixFirstParty = key.parties.min
for {
ledgerEnd <- services.getCompletionEnd().map(_.getOffset)
} yield {
val commandTrackerFlow =
CommandTrackerFlow[Promise[Either[CompletionFailure, CompletionSuccess]], NotUsed](
services.submissionFlow,
offset =>
services
.getCompletionSource(
CompletionStreamRequest(
configuration.ledgerId.unwrap,
key.applicationId,
key.parties.toList,
Some(offset),
)
)
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd,
() => ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime),
)
val trackingFlow = MaxInFlight(
configuration.maxCommandsInFlight,
capacityCounter = metrics.daml.commands.maxInFlightCapacity(metricsPrefixFirstParty),
lengthCounter = metrics.daml.commands.maxInFlightLength(metricsPrefixFirstParty),
).joinMat(commandTrackerFlow)(Keep.right)

TrackerImpl(
trackingFlow,
configuration.inputBufferSize,
capacityCounter = metrics.daml.commands.inputBufferCapacity(metricsPrefixFirstParty),
lengthCounter = metrics.daml.commands.inputBufferLength(metricsPrefixFirstParty),
delayTimer = metrics.daml.commands.inputBufferDelay(metricsPrefixFirstParty),
)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,11 @@ import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

private[tracking] trait Tracker extends AutoCloseable {
trait Tracker extends AutoCloseable {

def track(request: SubmitAndWaitRequest)(implicit
ec: ExecutionContext,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]]

}

private[tracking] object Tracker {

class WithLastSubmission(delegate: Tracker) extends Tracker {

override def close(): Unit = delegate.close()

@volatile private var lastSubmission = System.nanoTime()

def getLastSubmission: Long = lastSubmission

override def track(request: SubmitAndWaitRequest)(implicit
ec: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
lastSubmission = System.nanoTime()
delegate.track(request)
}
}

object WithLastSubmission {
def apply(delegate: Tracker): WithLastSubmission = new WithLastSubmission(delegate)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[services] final class TrackerImpl(
import TrackerImpl.logger

override def track(request: SubmitAndWaitRequest)(implicit
ec: ExecutionContext,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
logger.trace("Tracking command")
Expand Down
Loading

0 comments on commit ef239fd

Please sign in to comment.