Skip to content

Commit

Permalink
Improve logging in ledger API server [DPP-231] (#8676)
Browse files Browse the repository at this point in the history
* Added logging incoming requests in API services: Submission, ConfigManagement, PackageManagement, ParticipantPruning and PartyManagement

CHANGELOG_BEGIN
- Logging incoming requests in API services
CHANGELOG_END

* Logging transactions and transaction trees streamed by the ApiTransactionService

CHANGELOG_BEGIN
- Logging transactions and transaction trees returned by the ApiTransactionService
CHANGELOG_END

* Logging storing the db in JdbcLedgerDao

* Changed log severity

* Factored out logging util to ContextualizedLogger

* Review improvements

* Removed unused import

* Formatted changes

* Logging completions stream items

* Fixed log message

* Logging complete transactions and transaction trees

* Removed duplicated keys from logging context

* Formatted changes

* Reduced logging for completions and transactions

* Removed redundant log

* Removed update* prefix in Indexer's logging context

* Minor improvement

* Minor improvement
  • Loading branch information
kamil-da authored Feb 15, 2021
1 parent f02e0fe commit 926fb59
Show file tree
Hide file tree
Showing 13 changed files with 380 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,23 @@ private[apiserver] final class ApiCommandCompletionService private (

completionsService
.getCompletions(offset, request.applicationId, request.parties)
.via(logger.debugStream(completionsLoggable))
.via(logger.logErrorsOnStream)
}

private def completionsLoggable(response: CompletionStreamResponse): String =
s"Responding with completions: ${response.completions.toList
.map(c => singleCompletionLoggable(c.commandId, c.status.map(_.code)))}"

private def singleCompletionLoggable(
commandId: String,
statusCode: Option[Int],
): Map[String, String] =
Map(
logging.commandId(commandId),
"statusCode" -> statusCode.map(_.toString).getOrElse(""),
)

override def getLedgerEnd(ledgerId: domain.LedgerId): Future[LedgerOffset.Absolute] =
completionsService.currentLedgerEnd().andThen(logger.logErrorsOnCall[LedgerOffset.Absolute])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private[apiserver] final class ApiSubmissionService private (

override def submit(request: SubmitRequest): Future[Unit] =
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.debug("Submitting transaction")
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")
ledgerConfigProvider.latestConfiguration
.map(deduplicateAndRecordOnLedger(seedService.nextSeed(), request.commands, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import com.daml.ledger.participant.state.v1.{
WriteConfigService,
}
import com.daml.lf.data.Time
import com.daml.platform.apiserver.services.logging
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.services.admin.ApiConfigManagementService._
import com.daml.platform.configuration.LedgerConfiguration
Expand Down Expand Up @@ -57,11 +59,13 @@ private[apiserver] final class ApiConfigManagementService private (
override def bindService(): ServerServiceDefinition =
ConfigManagementServiceGrpc.bindService(this, executionContext)

override def getTimeModel(request: GetTimeModelRequest): Future[GetTimeModelResponse] =
override def getTimeModel(request: GetTimeModelRequest): Future[GetTimeModelResponse] = {
logger.info("Getting time model")
index
.lookupConfiguration()
.map(_.fold(defaultConfigResponse) { case (_, conf) => configToResponse(conf) })
.andThen(logger.logErrorsOnCall[GetTimeModelResponse])
}

private def configToResponse(config: Configuration): GetTimeModelResponse = {
val tm = config.timeModel
Expand All @@ -77,55 +81,58 @@ private[apiserver] final class ApiConfigManagementService private (
)
}

override def setTimeModel(request: SetTimeModelRequest): Future[SetTimeModelResponse] = {
val response = for {
// Validate and convert the request parameters
params <- validateParameters(request).fold(Future.failed(_), Future.successful)

// Lookup latest configuration to check generation and to extend it with the new time model.
optConfigAndOffset <- index.lookupConfiguration()
ledgerEndBeforeRequest = optConfigAndOffset.map(_._1)
currentConfig = optConfigAndOffset.map(_._2)

// Verify that we're modifying the current configuration.
expectedGeneration = currentConfig
.map(_.generation)
.getOrElse(LedgerConfiguration.NoGeneration)
_ <-
if (request.configurationGeneration != expectedGeneration) {
Future.failed(
ErrorFactories.invalidArgument(
s"Mismatching configuration generation, expected $expectedGeneration, received ${request.configurationGeneration}"
)
override def setTimeModel(request: SetTimeModelRequest): Future[SetTimeModelResponse] =
withEnrichedLoggingContext(logging.submissionId(request.submissionId)) {
implicit loggingContext =>
logger.info("Setting time model")
val response = for {
// Validate and convert the request parameters
params <- validateParameters(request).fold(Future.failed(_), Future.successful)

// Lookup latest configuration to check generation and to extend it with the new time model.
optConfigAndOffset <- index.lookupConfiguration()
ledgerEndBeforeRequest = optConfigAndOffset.map(_._1)
currentConfig = optConfigAndOffset.map(_._2)

// Verify that we're modifying the current configuration.
expectedGeneration = currentConfig
.map(_.generation)
.getOrElse(LedgerConfiguration.NoGeneration)
_ <-
if (request.configurationGeneration != expectedGeneration) {
Future.failed(
ErrorFactories.invalidArgument(
s"Mismatching configuration generation, expected $expectedGeneration, received ${request.configurationGeneration}"
)
)
} else {
Future.unit
}

// Create the new extended configuration.
newConfig = currentConfig
.map(config => config.copy(generation = config.generation + 1))
.getOrElse(ledgerConfiguration.initialConfiguration)
.copy(timeModel = params.newTimeModel)

// Submit configuration to the ledger, and start polling for the result.
submissionId = SubmissionId.assertFromString(request.submissionId)
synchronousResponse = new SynchronousResponse(
new SynchronousResponseStrategy(
writeService,
index,
ledgerEndBeforeRequest,
),
timeToLive = JDuration.ofMillis(params.timeToLive.toMillis),
)
} else {
Future.unit
}

// Create the new extended configuration.
newConfig = currentConfig
.map(config => config.copy(generation = config.generation + 1))
.getOrElse(ledgerConfiguration.initialConfiguration)
.copy(timeModel = params.newTimeModel)

// Submit configuration to the ledger, and start polling for the result.
submissionId = SubmissionId.assertFromString(request.submissionId)
synchronousResponse = new SynchronousResponse(
new SynchronousResponseStrategy(
writeService,
index,
ledgerEndBeforeRequest,
),
timeToLive = JDuration.ofMillis(params.timeToLive.toMillis),
)
entry <- synchronousResponse.submitAndWait(
submissionId,
(params.maximumRecordTime, newConfig),
)(executionContext, materializer)
} yield SetTimeModelResponse(entry.configuration.generation)
entry <- synchronousResponse.submitAndWait(
submissionId,
(params.maximumRecordTime, newConfig),
)(executionContext, materializer)
} yield SetTimeModelResponse(entry.configuration.generation)

response.andThen(logger.logErrorsOnCall[SetTimeModelResponse])
}
response.andThen(logger.logErrorsOnCall[SetTimeModelResponse])
}

private case class SetTimeModelParameters(
newTimeModel: v1.TimeModel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.daml.ledger.participant.state.v1.{SubmissionId, SubmissionResult, Wri
import com.daml.lf.archive.{Dar, DarReader, Decode}
import com.daml.lf.engine.Engine
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.apiserver.services.logging
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.services.admin.ApiPackageManagementService._
import com.daml.platform.server.api.validation.ErrorFactories
Expand Down Expand Up @@ -64,6 +66,7 @@ private[apiserver] final class ApiPackageManagementService private (
override def listKnownPackages(
request: ListKnownPackagesRequest
): Future[ListKnownPackagesResponse] = {
logger.info("Listing known packages")
packagesIndex
.listLfPackages()
.map { pkgs =>
Expand All @@ -81,7 +84,7 @@ private[apiserver] final class ApiPackageManagementService private (

private[this] val darReader = DarReader[Archive] { case (_, x) => Try(Archive.parseFrom(x)) }

def decodeAndValidate(stream: ZipInputStream): Try[Dar[Archive]] =
private def decodeAndValidate(stream: ZipInputStream): Try[Dar[Archive]] =
for {
dar <- darReader.readArchive("package-upload", stream)
packages <- Try(dar.all.iterator.map(Decode.decodeArchive).toMap)
Expand All @@ -92,30 +95,34 @@ private[apiserver] final class ApiPackageManagementService private (
.toTry
} yield dar

override def uploadDarFile(request: UploadDarFileRequest): Future[UploadDarFileResponse] = {
val submissionId =
if (request.submissionId.isEmpty)
SubmissionId.assertFromString(UUID.randomUUID().toString)
else
SubmissionId.assertFromString(request.submissionId)

val darInputStream = new ZipInputStream(request.darFile.newInput())
val response = for {
dar <- decodeAndValidate(darInputStream).fold(
err => Future.failed(ErrorFactories.invalidArgument(err.getMessage)),
Future.successful,
)
_ <- synchronousResponse.submitAndWait(submissionId, dar)(executionContext, materializer)
} yield {
for (archive <- dar.all) {
logger.info(s"Package ${archive.getHash} successfully uploaded")
}
UploadDarFileResponse()
override def uploadDarFile(request: UploadDarFileRequest): Future[UploadDarFileResponse] =
withEnrichedLoggingContext(logging.submissionId(request.submissionId)) {
implicit loggingContext =>
logger.info("Uploading DAR file")
val submissionId =
if (request.submissionId.isEmpty)
SubmissionId.assertFromString(UUID.randomUUID().toString)
else
SubmissionId.assertFromString(request.submissionId)

val darInputStream = new ZipInputStream(request.darFile.newInput())

val response = for {
dar <- decodeAndValidate(darInputStream).fold(
err => Future.failed(ErrorFactories.invalidArgument(err.getMessage)),
Future.successful,
)
_ <- synchronousResponse.submitAndWait(submissionId, dar)(executionContext, materializer)
} yield {
for (archive <- dar.all) {
logger.info(s"Package ${archive.getHash} successfully uploaded")
}
UploadDarFileResponse()
}

response.andThen(logger.logErrorsOnCall[UploadDarFileResponse])
}

response.andThen(logger.logErrorsOnCall[UploadDarFileResponse])
}

}

private[apiserver] object ApiPackageManagementService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.daml.ledger.participant.state.v1.{
WriteParticipantPruningService,
}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.apiserver.services.logging
import com.daml.platform.ApiOffset
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.api.grpc.GrpcApiService
Expand Down Expand Up @@ -50,8 +51,9 @@ final class ApiParticipantPruningService private (
submissionIdOrErr.fold(
Future.failed,
submissionId =>
LoggingContext.withEnrichedLoggingContext("submissionId" -> submissionId) {
LoggingContext.withEnrichedLoggingContext(logging.submissionId(submissionId)) {
implicit logCtx =>
logger.info(s"Pruning up to ${request.pruneUpTo}")
(for {

pruneUpTo <- validateRequest(request: PruneRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.daml.ledger.participant.state.v1
import com.daml.ledger.participant.state.v1.{SubmissionId, SubmissionResult, WritePartyService}
import com.daml.lf.data.Ref
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.apiserver.services.logging
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.services.admin.ApiPartyManagementService._
import com.daml.platform.server.api.validation.ErrorFactories
Expand Down Expand Up @@ -57,6 +59,7 @@ private[apiserver] final class ApiPartyManagementService private (
override def getParticipantId(
request: GetParticipantIdRequest
): Future[GetParticipantIdResponse] = {
logger.info("Getting Participant ID")
partyManagementService
.getParticipantId()
.map(pid => GetParticipantIdResponse(pid.toString))
Expand All @@ -69,52 +72,58 @@ private[apiserver] final class ApiPartyManagementService private (
PartyDetails(details.party, details.displayName.getOrElse(""), details.isLocal)

override def getParties(request: GetPartiesRequest): Future[GetPartiesResponse] =
partyManagementService
.getParties(request.parties.map(Ref.Party.assertFromString))
.map(ps => GetPartiesResponse(ps.map(mapPartyDetails)))
.andThen(logger.logErrorsOnCall[GetPartiesResponse])
withEnrichedLoggingContext(logging.parties(request.parties)) { implicit loggingContext =>
logger.info("Getting parties")
partyManagementService
.getParties(request.parties.map(Ref.Party.assertFromString))
.map(ps => GetPartiesResponse(ps.map(mapPartyDetails)))
.andThen(logger.logErrorsOnCall[GetPartiesResponse])
}

override def listKnownParties(
request: ListKnownPartiesRequest
): Future[ListKnownPartiesResponse] =
): Future[ListKnownPartiesResponse] = {
logger.info("Listing known parties")
partyManagementService
.listKnownParties()
.map(ps => ListKnownPartiesResponse(ps.map(mapPartyDetails)))
.andThen(logger.logErrorsOnCall[ListKnownPartiesResponse])
}

override def allocateParty(request: AllocatePartyRequest): Future[AllocatePartyResponse] = {

val validatedPartyIdentifier =
if (request.partyIdHint.isEmpty) {
Future.successful(None)
} else {
Ref.Party
.fromString(request.partyIdHint)
.fold(
error => Future.failed(ErrorFactories.invalidArgument(error)),
party => Future.successful(Some(party)),
)
}

validatedPartyIdentifier
.flatMap(party => {
val displayName = if (request.displayName.isEmpty) None else Some(request.displayName)
synchronousResponse
.submitAndWait(CreateSubmissionId.withPrefix(party), (party, displayName))
.map { case PartyEntry.AllocationAccepted(_, partyDetails) =>
AllocatePartyResponse(
Some(
PartyDetails(
partyDetails.party,
partyDetails.displayName.getOrElse(""),
partyDetails.isLocal,
override def allocateParty(request: AllocatePartyRequest): Future[AllocatePartyResponse] =
withEnrichedLoggingContext(logging.party(request.partyIdHint)) { implicit loggingContext =>
logger.info("Allocating party")
val validatedPartyIdentifier =
if (request.partyIdHint.isEmpty) {
Future.successful(None)
} else {
Ref.Party
.fromString(request.partyIdHint)
.fold(
error => Future.failed(ErrorFactories.invalidArgument(error)),
party => Future.successful(Some(party)),
)
}

validatedPartyIdentifier
.flatMap(party => {
val displayName = if (request.displayName.isEmpty) None else Some(request.displayName)
synchronousResponse
.submitAndWait(CreateSubmissionId.withPrefix(party), (party, displayName))
.map { case PartyEntry.AllocationAccepted(_, partyDetails) =>
AllocatePartyResponse(
Some(
PartyDetails(
partyDetails.party,
partyDetails.displayName.getOrElse(""),
partyDetails.isLocal,
)
)
)
)
}
})
.andThen(logger.logErrorsOnCall[AllocatePartyResponse])
}
}
})
.andThen(logger.logErrorsOnCall[AllocatePartyResponse])
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ package object logging {
"submissionId" -> id
private[services] def submittedAt(t: Instant): (String, String) =
"submittedAt" -> t.toString
private[services] def transactionId(id: String): (String, String) =
"transactionId" -> id
private[services] def transactionId(id: TransactionId): (String, String) =
"transactionId" -> id.unwrap
private[services] def workflowId(id: String): (String, String) =
"workflowId" -> id
private[services] def workflowId(id: WorkflowId): (String, String) =
"workflowId" -> id.unwrap
private[services] def commands(cmds: Commands): Map[String, String] = {
Expand Down
Loading

0 comments on commit 926fb59

Please sign in to comment.