Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move logging value definitions alongside their objects. #10439

Merged
merged 4 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

package com.daml.lf.data

import com.daml.lf.data.Ref.{Identifier, Party}
import com.daml.lf.data.Ref.{Identifier, LedgerString, Party}
import com.daml.logging.entries.{LoggingKey, LoggingValue, ToLoggingKey, ToLoggingValue}

package object logging {

implicit val `LedgerString to LoggingValue`: ToLoggingValue[LedgerString] =
ToLoggingValue.ToStringToLoggingValue

implicit val `Identifier to LoggingValue`: ToLoggingValue[Identifier] =
ToLoggingValue.ToStringToLoggingValue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.daml.ledger.configuration.Configuration
import com.daml.lf.command.{Commands => LfCommands}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.LedgerString.ordering
import com.daml.lf.data.logging._
import com.daml.lf.value.{Value => Lf}
import com.daml.logging.entries.{LoggingValue, ToLoggingValue}
import scalaz.syntax.tag._
Expand Down Expand Up @@ -288,9 +289,28 @@ object domain {
lazy val deduplicateUntil: Instant = submittedAt.plus(deduplicationDuration)
}

/** @param party The stable unique identifier of a Daml party.
object Commands {

import Logging._

implicit val `Commands to LoggingValue`: ToLoggingValue[Commands] = commands =>
LoggingValue.Nested.fromEntries(
"ledgerId" -> commands.ledgerId,
"workflowId" -> commands.workflowId,
"applicationId" -> commands.applicationId,
"commandId" -> commands.commandId,
"actAs" -> commands.actAs,
"readAs" -> commands.readAs,
"submittedAt" -> commands.submittedAt,
"deduplicationDuration" -> commands.deduplicationDuration,
)
}

/** Represents a party with additional known information.
*
* @param party The stable unique identifier of a Daml party.
* @param displayName Human readable name associated with the party. Might not be unique.
* @param isLocal True if party is hosted by the backing participant.
* @param isLocal True if party is hosted by the backing participant.
*/
case class PartyDetails(party: Ref.Party, displayName: Option[String], isLocal: Boolean)

Expand Down Expand Up @@ -338,4 +358,9 @@ object domain {
reason: String,
) extends PackageEntry
}

object Logging {
implicit def `tagged value to LoggingValue`[T: ToLoggingValue, Tag]: ToLoggingValue[T @@ Tag] =
value => value.unwrap
}
Comment on lines +361 to +365
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not completely understand the code here yet but I have the feeling this could be quite useful for the json api too 🤔

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It says that if we have a type T, a type Tag, and an instance of ToLoggingValue[T], then we also have an instance of ToLoggingValue[T @@ Tag] (which just discards the tag).

For example, if we have a value of type Ref.TransactionId @@ TransactionIdTag, we can convert it to a logging value, because we know how to convert Ref.TransactionId to a logging value.

Happy to discuss further offline if you'd like to understand more.

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.daml.lf.data.Ref
import com.daml.lf.engine.{Error => LfError}
import com.daml.lf.interpretation.{Error => InterpretationError}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext.{withEnrichedLoggingContext, withEnrichedLoggingContextFrom}
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.api.grpc.GrpcApiService
Expand Down Expand Up @@ -107,7 +107,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
override def submit(
request: SubmitRequest
)(implicit telemetryContext: TelemetryContext): Future[Unit] =
withEnrichedLoggingContextFrom(logging.commands(request.commands)) { implicit loggingContext =>
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")
ledgerConfigProvider.latestConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@

package com.daml.platform.apiserver.services

import java.time.{Duration, Instant}

import com.daml.ledger.api.domain.{
ApplicationId,
CommandId,
Commands,
EventId,
LedgerId,
LedgerOffset,
TransactionFilter,
TransactionId,
WorkflowId,
}
import com.daml.lf.data.Ref.Party
import com.daml.lf.data.logging._
Expand Down Expand Up @@ -63,18 +58,9 @@ package object logging {
private[services] def ledgerId(id: LedgerId): LoggingEntry =
"ledgerId" -> id.unwrap

private[services] def applicationId(id: ApplicationId): LoggingEntry =
"applicationId" -> id.unwrap

private[services] def commandId(id: String): LoggingEntry =
"commandId" -> id

private[services] def commandId(id: CommandId): LoggingEntry =
"commandId" -> id.unwrap

private[services] def deduplicationDuration(duration: Duration): LoggingEntry =
"deduplicationDuration" -> duration

private[services] def eventId(id: EventId): LoggingEntry =
"eventId" -> id.unwrap

Expand All @@ -95,9 +81,6 @@ package object logging {
private[services] def submissionId(id: String): LoggingEntry =
"submissionId" -> id

private[services] def submittedAt(t: Instant): LoggingEntry =
"submittedAt" -> t

private[services] def transactionId(id: String): LoggingEntry =
"transactionId" -> id

Expand All @@ -107,20 +90,8 @@ package object logging {
private[services] def workflowId(id: String): LoggingEntry =
"workflowId" -> id

private[services] def workflowId(id: WorkflowId): LoggingEntry =
"workflowId" -> id.unwrap

private[services] def commands(cmds: Commands): LoggingEntries = {
val context = LoggingEntries(
commandId(cmds.commandId),
deduplicationDuration(cmds.deduplicationDuration),
applicationId(cmds.applicationId),
submittedAt(cmds.submittedAt),
actAs(cmds.actAs),
readAs(cmds.readAs),
)
cmds.workflowId.fold(context)(context :+ workflowId(_))
}
private[services] def commands(cmds: Commands): LoggingEntry =
"commands" -> cmds

private[services] def verbose(v: Boolean): LoggingEntry =
"verbose" -> v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@

package com.daml.platform.indexer

import java.time.Duration

import akka.NotUsed
import akka.stream.scaladsl.Flow
import com.codahale.metrics.Timer
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext.withEnrichedLoggingContextFrom
import com.daml.logging.entries.{LoggingEntries, LoggingEntry}
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.indexer.ExecuteUpdate.ExecuteUpdateFlow
Expand Down Expand Up @@ -75,7 +70,6 @@ object ExecuteUpdate {

trait ExecuteUpdate {
import state.Update._
import state._

private val logger = ContextualizedLogger.get(this.getClass)

Expand Down Expand Up @@ -228,146 +222,6 @@ trait ExecuteUpdate {
)
}
}

private[indexer] def loggingEntriesFor(
offset: Offset,
update: Update,
): LoggingEntries =
loggingEntriesFor(update) :+
"updateRecordTime" -> update.recordTime.toInstant :+
"updateOffset" -> offset

private def loggingEntriesFor(update: Update): LoggingEntries =
update match {
case ConfigurationChanged(_, submissionId, participantId, newConfiguration) =>
LoggingEntries(
Logging.submissionId(submissionId),
Logging.participantId(participantId),
Logging.configGeneration(newConfiguration.generation),
Logging.maxDeduplicationTime(newConfiguration.maxDeduplicationTime),
)
case ConfigurationChangeRejected(
_,
submissionId,
participantId,
proposedConfiguration,
rejectionReason,
) =>
LoggingEntries(
Logging.submissionId(submissionId),
Logging.participantId(participantId),
Logging.configGeneration(proposedConfiguration.generation),
Logging.maxDeduplicationTime(proposedConfiguration.maxDeduplicationTime),
Logging.rejectionReason(rejectionReason),
)
case PartyAddedToParticipant(party, displayName, participantId, _, submissionId) =>
LoggingEntries(
Logging.submissionIdOpt(submissionId),
Logging.participantId(participantId),
Logging.party(party),
Logging.displayName(displayName),
)
case PartyAllocationRejected(submissionId, participantId, _, rejectionReason) =>
LoggingEntries(
Logging.submissionId(submissionId),
Logging.participantId(participantId),
Logging.rejectionReason(rejectionReason),
)
case PublicPackageUpload(_, sourceDescription, _, submissionId) =>
LoggingEntries(
Logging.submissionIdOpt(submissionId),
Logging.sourceDescriptionOpt(sourceDescription),
)
case PublicPackageUploadRejected(submissionId, _, rejectionReason) =>
LoggingEntries(
Logging.submissionId(submissionId),
Logging.rejectionReason(rejectionReason),
)
case TransactionAccepted(optSubmitterInfo, transactionMeta, _, transactionId, _, _, _) =>
LoggingEntries(
Logging.transactionId(transactionId),
Logging.ledgerTime(transactionMeta.ledgerEffectiveTime),
Logging.workflowIdOpt(transactionMeta.workflowId),
Logging.submissionTime(transactionMeta.submissionTime),
) ++ optSubmitterInfo
.map(info =>
LoggingEntries(
Logging.submitter(info.actAs),
Logging.applicationId(info.applicationId),
Logging.commandId(info.commandId),
Logging.deduplicationPeriod(info.optDeduplicationPeriod),
)
)
.getOrElse(LoggingEntries.empty)
case CommandRejected(_, completionInfo, reason) =>
LoggingEntries(
Logging.submitter(completionInfo.actAs),
Logging.applicationId(completionInfo.applicationId),
Logging.commandId(completionInfo.commandId),
Logging.deduplicationPeriod(completionInfo.optDeduplicationPeriod),
Logging.rejectionReason(reason),
)
}

private object Logging {
import com.daml.lf.data.logging._

def submissionId(id: Ref.SubmissionId): LoggingEntry =
"submissionId" -> id

def submissionIdOpt(id: Option[Ref.SubmissionId]): LoggingEntry =
"submissionId" -> id

def participantId(id: Ref.ParticipantId): LoggingEntry =
"participantId" -> id

def commandId(id: Ref.CommandId): LoggingEntry =
"commandId" -> id

def party(party: Ref.Party): LoggingEntry =
"party" -> party

def transactionId(id: Ref.TransactionId): LoggingEntry =
"transactionId" -> id

def applicationId(id: Ref.ApplicationId): LoggingEntry =
"applicationId" -> id

def workflowIdOpt(id: Option[Ref.WorkflowId]): LoggingEntry =
"workflowId" -> id

def ledgerTime(time: Timestamp): LoggingEntry =
"ledgerTime" -> time.toInstant

def submissionTime(time: Timestamp): LoggingEntry =
"submissionTime" -> time.toInstant

def configGeneration(generation: Long): LoggingEntry =
"configGeneration" -> generation

def maxDeduplicationTime(time: Duration): LoggingEntry =
"maxDeduplicationTime" -> time

def deduplicationPeriod(period: Option[state.DeduplicationPeriod]): LoggingEntry =
"deduplicationPeriod" -> period

def rejectionReason(rejectionReason: String): LoggingEntry =
"rejectionReason" -> rejectionReason

def rejectionReason(
rejectionReasonTemplate: state.Update.CommandRejected.RejectionReasonTemplate
): LoggingEntry =
"rejectionReason" -> rejectionReasonTemplate

def displayName(name: String): LoggingEntry =
"displayName" -> name

def sourceDescriptionOpt(description: Option[String]): LoggingEntry =
"sourceDescription" -> description

def submitter(parties: List[Ref.Party]): LoggingEntry =
"submitter" -> parties
}
}

class PipelinedExecuteUpdate(
Expand All @@ -377,8 +231,6 @@ class PipelinedExecuteUpdate(
private[indexer] val updatePreparationParallelism: Int,
)(implicit val executionContext: ExecutionContext, val loggingContext: LoggingContext)
extends ExecuteUpdate {
import state.Update._

private def insertTransactionState(
timedPipelinedUpdate: PipelinedUpdateWithTimer
): Future[PipelinedUpdateWithTimer] = timedPipelinedUpdate.preparedUpdate match {
Expand Down Expand Up @@ -407,8 +259,9 @@ class PipelinedExecuteUpdate(
timedPipelinedUpdate: PipelinedUpdateWithTimer
): Future[PersistenceResponse] = {
val pipelinedUpdate = timedPipelinedUpdate.preparedUpdate
withEnrichedLoggingContextFrom(
loggingEntriesFor(pipelinedUpdate.offsetStep.offset, pipelinedUpdate.update)
withEnrichedLoggingContext(
"offset" -> pipelinedUpdate.offsetStep.offset,
"update" -> pipelinedUpdate.update,
) { implicit loggingContext =>
Timed.future(
metrics.daml.indexer.stateUpdateProcessing, {
Expand All @@ -429,7 +282,7 @@ class PipelinedExecuteUpdate(

private def completeTransactionInsertion(
offsetStep: OffsetStep,
tx: TransactionAccepted,
tx: state.Update.TransactionAccepted,
pipelinedInsertTimer: Timer.Context,
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
Timed
Expand Down Expand Up @@ -493,13 +346,11 @@ class AtomicExecuteUpdate(
private[indexer] implicit val loggingContext: LoggingContext,
private[indexer] val executionContext: ExecutionContext,
) extends ExecuteUpdate {
import state.Update._

private[indexer] val flow: ExecuteUpdateFlow =
Flow[OffsetUpdate]
.mapAsync(updatePreparationParallelism)(prepareUpdate)
.mapAsync(1) { case offsetUpdate @ OffsetUpdate(offsetStep, update) =>
withEnrichedLoggingContextFrom(loggingEntriesFor(offsetStep.offset, update)) {
withEnrichedLoggingContext("offset" -> offsetStep.offset, "update" -> update) {
implicit loggingContext =>
Timed.future(
metrics.daml.indexer.stateUpdateProcessing,
Expand All @@ -515,7 +366,7 @@ class AtomicExecuteUpdate(
preparedUpdate match {
case PreparedTransactionInsert(
offsetStep,
TransactionAccepted(
state.Update.TransactionAccepted(
optCompletionInfo,
transactionMeta,
transaction,
Expand Down
Loading