Skip to content

Commit

Permalink
Add logging context information to ledger services (digital-asset#4205)
Browse files Browse the repository at this point in the history
* Add logging context information to ledger services

Rolls back that AllowedType implicit, which made the logging context
less usable and didn't provide that much of an advantage. Replaced with
helpers where it makes sense that take care of turning the value safely
into a string.

Closes digital-asset#3699

CHANGELOG_BEGIN
CHANGELOG_END

* Remove unused import

* Address digital-asset#4205 (comment)
  • Loading branch information
stefanobaghino-da authored and mergify[bot] committed Jan 24, 2020
1 parent 135abd1 commit 243746d
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 235 deletions.
2 changes: 2 additions & 0 deletions ledger/sandbox/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ da_scala_library(
"@maven//:io_netty_netty_handler",
"@maven//:io_netty_netty_transport",
"@maven//:io_zipkin_brave_brave",
"@maven//:net_logstash_logback_logstash_logback_encoder",
"@maven//:org_flywaydb_flyway_core",
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
"@maven//:org_scalaz_scalaz_core_2_12",
Expand Down Expand Up @@ -154,6 +155,7 @@ da_scala_library(
"@maven//:io_netty_netty_handler",
"@maven//:io_netty_netty_transport",
"@maven//:io_zipkin_brave_brave",
"@maven//:net_logstash_logback_logstash_logback_encoder",
"@maven//:org_flywaydb_flyway_core",
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
"@maven//:org_scalaz_scalaz_core_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.v1.active_contracts_service.ActiveContractsServiceGrpc.ActiveContractsService
import com.digitalasset.ledger.api.v1.active_contracts_service._
import com.digitalasset.ledger.api.v1.event.CreatedEvent
import com.digitalasset.ledger.api.v1.transaction_filter.TransactionFilter
import com.digitalasset.ledger.api.validation.TransactionFilterValidator
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.logging.LoggingContext._
import com.digitalasset.logging.LoggingContext.withEnrichedLoggingContext
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.participant.util.LfEngineToApi
import com.digitalasset.platform.server.api.validation.ActiveContractsServiceValidation
Expand All @@ -40,59 +39,58 @@ final class ApiActiveContractsService private (

private val logger = ContextualizedLogger.get(this.getClass)

private def partiesIn(filter: Option[TransactionFilter]): Array[String] =
filter.fold(Array.empty[String])(_.filtersByParty.keysIterator.toArray)

override protected def getActiveContractsSource(
request: GetActiveContractsRequest): Source[GetActiveContractsResponse, NotUsed] =
withEnrichedLoggingContext("parties" -> partiesIn(request.filter)) { implicit logCtx =>
logger.trace("Serving an Active Contracts request...")
request: GetActiveContractsRequest): Source[GetActiveContractsResponse, NotUsed] = {
logger.trace("Serving an Active Contracts request...")

TransactionFilterValidator
.validate(request.getFilter, "filter")
.fold(
Source.failed, { filter =>
Source
.future(backend.getActiveContractSetSnapshot(filter))
.flatMapConcat {
case ActiveContractSetSnapshot(offset, acsStream) =>
acsStream
.map {
case (wfId, create) =>
GetActiveContractsResponse(
workflowId = wfId.map(_.unwrap).getOrElse(""),
activeContracts = List(
CreatedEvent(
create.eventId.unwrap,
create.contractId.coid,
Some(LfEngineToApi.toApiIdentifier(create.templateId)),
create.contractKey.map(
ck =>
TransactionFilterValidator
.validate(request.getFilter, "filter")
.fold(
Source.failed, { filter =>
withEnrichedLoggingContext(logging.parties(filter.filtersByParty.keys)) {
implicit logCtx =>
Source
.future(backend.getActiveContractSetSnapshot(filter))
.flatMapConcat {
case ActiveContractSetSnapshot(offset, acsStream) =>
acsStream
.map {
case (wfId, create) =>
GetActiveContractsResponse(
workflowId = wfId.map(_.unwrap).getOrElse(""),
activeContracts = List(
CreatedEvent(
create.eventId.unwrap,
create.contractId.coid,
Some(LfEngineToApi.toApiIdentifier(create.templateId)),
create.contractKey.map(ck =>
LfEngineToApi.assertOrRuntimeEx(
"converting stored contract",
LfEngineToApi
.lfVersionedValueToApiValue(verbose = request.verbose, ck))),
Some(
LfEngineToApi.assertOrRuntimeEx(
"converting stored contract",
LfEngineToApi
.lfValueToApiRecord(
verbose = request.verbose,
create.argument.value))),
create.stakeholders.toSeq,
signatories = create.signatories.map(_.toString)(collection.breakOut),
observers = create.observers.map(_.toString)(collection.breakOut),
agreementText = Some(create.agreementText)
Some(
LfEngineToApi.assertOrRuntimeEx(
"converting stored contract",
LfEngineToApi
.lfValueToApiRecord(
verbose = request.verbose,
create.argument.value))),
create.stakeholders.toSeq,
signatories =
create.signatories.map(_.toString)(collection.breakOut),
observers = create.observers.map(_.toString)(collection.breakOut),
agreementText = Some(create.agreementText)
)
)
)
)
}
.concat(Source.single(GetActiveContractsResponse(offset = offset.value)))
}
}
.concat(Source.single(GetActiveContractsResponse(offset = offset.value)))
}
}
)
.via(logger.logErrorsOnStream)
}
}
)
.via(logger.logErrorsOnStream)
}

override def bindService(): ServerServiceDefinition =
ActiveContractsServiceGrpc.bindService(this, DirectExecutionContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import com.digitalasset.ledger.api.domain.{CompletionEvent, LedgerId, LedgerOffs
import com.digitalasset.ledger.api.messages.command.completion.CompletionStreamRequest
import com.digitalasset.ledger.api.v1.command_completion_service._
import com.digitalasset.ledger.api.validation.PartyNameChecker
import com.digitalasset.logging.LoggingContext.withEnrichedLoggingContext
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.server.api.services.domain.CommandCompletionService
Expand All @@ -36,24 +37,26 @@ final class ApiCommandCompletionService private (completionsService: IndexComple
private val subscriptionIdCounter = new AtomicLong()

override def completionStreamSource(
request: CompletionStreamRequest): Source[CompletionEvent, NotUsed] = {
request: CompletionStreamRequest): Source[CompletionEvent, NotUsed] =
withEnrichedLoggingContext(logging.parties(request.parties), logging.offset(request.offset)) {
implicit logCtx =>
val subscriptionId = subscriptionIdCounter.getAndIncrement().toString
logger.debug(s"Received request for completion subscription $subscriptionId: $request")

val subscriptionId = subscriptionIdCounter.getAndIncrement().toString
logger.debug(s"Received request for completion subscription $subscriptionId: $request")
val offset = request.offset.getOrElse(LedgerOffset.LedgerEnd)

val offset = request.offset.getOrElse(LedgerOffset.LedgerEnd)

completionsService
.getCompletions(offset, request.applicationId, request.parties)
.via(logger.logErrorsOnStream)
}
completionsService
.getCompletions(offset, request.applicationId, request.parties)
.via(logger.logErrorsOnStream)
}

override def getLedgerEnd(ledgerId: domain.LedgerId): Future[LedgerOffset.Absolute] =
completionsService.currentLedgerEnd().andThen(logger.logErrorsOnCall[LedgerOffset.Absolute])

}

object ApiCommandCompletionService {

def create(ledgerId: LedgerId, completionsService: IndexCompletionsService)(
implicit ec: ExecutionContext,
mat: Materializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.digitalasset.ledger.client.services.commands.{
CommandTrackerFlow
}
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.logging.LoggingContext.withEnrichedLoggingContext
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.apiserver.services.ApiCommandService.LowLevelCommandServiceAccess
import com.digitalasset.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap}
Expand Down Expand Up @@ -74,92 +75,86 @@ final class ApiCommandService private (
}

@SuppressWarnings(Array("org.wartremover.warts.Any"))
private def submitAndWaitInternal(request: SubmitAndWaitRequest): Future[Completion] = {

val appId = request.getCommands.applicationId
val submitter = TrackerMap.Key(application = appId, party = request.getCommands.party)

if (running) {
submissionTracker
.track(submitter, request) {
for {
trackingFlow <- {
lowLevelCommandServiceAccess match {
case LowLevelCommandServiceAccess.LocalServices(
submissionFlow,
getCompletionSource,
getCompletionEnd,
_,
_) =>
for {
ledgerEnd <- getCompletionEnd().map(_.getOffset)
} yield {
val tracker =
CommandTrackerFlow[Promise[Completion], NotUsed](
submissionFlow,
offset =>
getCompletionSource(
CompletionStreamRequest(
configuration.ledgerId.unwrap,
appId,
List(submitter.party),
Some(offset)))
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd
)

if (configuration.limitMaxCommandsInFlight)
MaxInFlight(configuration.maxCommandsInFlight).joinMat(tracker)(Keep.right)
else tracker
}
private def submitAndWaitInternal(request: SubmitAndWaitRequest): Future[Completion] =
withEnrichedLoggingContext(
logging.commandId(request.getCommands.commandId),
logging.party(request.getCommands.party)) { implicit logCtx =>
val appId = request.getCommands.applicationId
val submitter = TrackerMap.Key(application = appId, party = request.getCommands.party)

if (running) {
submissionTracker
.track(submitter, request) {
for {
trackingFlow <- {
lowLevelCommandServiceAccess match {
case LowLevelCommandServiceAccess.LocalServices(
submissionFlow,
getCompletionSource,
getCompletionEnd,
_,
_) =>
for {
ledgerEnd <- getCompletionEnd().map(_.getOffset)
} yield {
val tracker =
CommandTrackerFlow[Promise[Completion], NotUsed](
submissionFlow,
offset =>
getCompletionSource(
CompletionStreamRequest(
configuration.ledgerId.unwrap,
appId,
List(submitter.party),
Some(offset)))
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd
)

if (configuration.limitMaxCommandsInFlight)
MaxInFlight(configuration.maxCommandsInFlight).joinMat(tracker)(Keep.right)
else tracker
}
}
}
} yield {
TrackerImpl(trackingFlow, configuration.inputBufferSize, configuration.historySize)
}
} yield {
TrackerImpl(trackingFlow, configuration.inputBufferSize, configuration.historySize)
}
}
} else {
Future.failed(
new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down.")))
} else {
Future.failed(
new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down.")))
}.andThen(logger.logErrorsOnCall[Completion])
}
}

override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] =
submitAndWaitInternal(request)
.map(_ => Empty.defaultInstance)
.andThen(logger.logErrorsOnCall[Empty])
submitAndWaitInternal(request).map(_ => Empty.defaultInstance)

override def submitAndWaitForTransactionId(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionIdResponse] =
submitAndWaitInternal(request)
.map { compl =>
SubmitAndWaitForTransactionIdResponse(compl.transactionId)
}
.andThen(logger.logErrorsOnCall[SubmitAndWaitForTransactionIdResponse])
submitAndWaitInternal(request).map { compl =>
SubmitAndWaitForTransactionIdResponse(compl.transactionId)
}

override def submitAndWaitForTransaction(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionResponse] =
submitAndWaitInternal(request)
.flatMap { resp =>
val txRequest = GetTransactionByIdRequest(
request.getCommands.ledgerId,
resp.transactionId,
List(request.getCommands.party))
flatById(txRequest).map(resp => SubmitAndWaitForTransactionResponse(resp.transaction))
}
.andThen(logger.logErrorsOnCall[SubmitAndWaitForTransactionResponse])
submitAndWaitInternal(request).flatMap { resp =>
val txRequest = GetTransactionByIdRequest(
request.getCommands.ledgerId,
resp.transactionId,
List(request.getCommands.party))
flatById(txRequest).map(resp => SubmitAndWaitForTransactionResponse(resp.transaction))
}

override def submitAndWaitForTransactionTree(
request: SubmitAndWaitRequest): Future[SubmitAndWaitForTransactionTreeResponse] =
submitAndWaitInternal(request)
.flatMap { resp =>
val txRequest = GetTransactionByIdRequest(
request.getCommands.ledgerId,
resp.transactionId,
List(request.getCommands.party))
treeById(txRequest).map(resp => SubmitAndWaitForTransactionTreeResponse(resp.transaction))
}
.andThen(logger.logErrorsOnCall[SubmitAndWaitForTransactionTreeResponse])
submitAndWaitInternal(request).flatMap { resp =>
val txRequest = GetTransactionByIdRequest(
request.getCommands.ledgerId,
resp.transactionId,
List(request.getCommands.party))
treeById(txRequest).map(resp => SubmitAndWaitForTransactionTreeResponse(resp.transaction))
}

override def toString: String = ApiCommandService.getClass.getSimpleName

Expand Down
Loading

0 comments on commit 243746d

Please sign in to comment.