Skip to content

Commit

Permalink
factor common structure in RawBatch.Event (digital-asset#6216)
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
S11001001 authored Jun 4, 2020
1 parent 2c3efc6 commit 8e3e296
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,24 @@ private[events] trait EventsTableInsert { this: EventsTable =>
}

private case class AccumulatingBatches(
creates: Vector[RawBatch.Event.Created],
exercises: Vector[RawBatch.Event.Exercised],
creates: Vector[RawBatch.Event[RawBatch.Event.Created]],
exercises: Vector[RawBatch.Event[RawBatch.Event.Exercised]],
archives: Vector[Vector[NamedParameter]],
) {

def add(create: RawBatch.Event.Created): AccumulatingBatches =
def add(create: RawBatch.Event[RawBatch.Event.Created]): AccumulatingBatches =
copy(creates = creates :+ create)

def add(exercise: RawBatch.Event.Exercised): AccumulatingBatches =
def add(exercise: RawBatch.Event[RawBatch.Event.Exercised])(
implicit dummy: DummyImplicit): AccumulatingBatches =
copy(exercises = exercises :+ exercise)

def add(archive: Vector[NamedParameter]): AccumulatingBatches =
copy(archives = archives :+ archive)

private def prepareRawNonEmpty(
query: String,
params: Vector[RawBatch.Event],
params: Vector[RawBatch.Event[_]],
): Option[RawBatch] =
if (params.nonEmpty) Some(new RawBatch(query, params)) else None

Expand Down Expand Up @@ -167,42 +168,29 @@ private[events] trait EventsTableInsert { this: EventsTable =>
transaction: GenTransaction.WithTxValue[Nid, ContractId],
flatWitnesses: WitnessRelation[Nid],
treeWitnesses: WitnessRelation[Nid],
): RawBatches =
): RawBatches = {
def event[Sp <: RawBatch.Event.Specific](nodeId: Nid, sp: Sp) =
new RawBatch.Event(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
specific = sp,
)

transaction
.fold(AccumulatingBatches.empty) {
case (batches, (nodeId, node: Create)) =>
batches.add(
new RawBatch.Event.Created(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
create = node,
)
)
batches.add(event(nodeId, new RawBatch.Event.Created(node)))
case (batches, (nodeId, node: Exercise)) =>
val batchWithExercises =
batches.add(
new RawBatch.Event.Exercised(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
exercise = node,
)
)
batches.add(event(nodeId, new RawBatch.Event.Exercised(node)))
if (node.consuming) {
batchWithExercises.add(
archive(
Expand All @@ -217,5 +205,6 @@ private[events] trait EventsTableInsert { this: EventsTable =>
batches // ignore any event which is neither a create nor an exercise
}
.prepare
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.daml.platform.store.dao.events
import java.time.Instant

import anorm.NamedParameter
import com.daml.ledger.EventId
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.{ApplicationId, CommandId, TransactionId, WorkflowId}
import com.daml.platform.events.EventIdFormatter.fromTransactionId
Expand Down Expand Up @@ -51,7 +52,10 @@ private[events] object RawBatch {
partial :+ lfValueTranslation.serialize(contractId, createArgument)
}

sealed abstract class Event(
// this unfortunate upper bound is here to get access to
// [[Event.Specific#applySerialization]]; we would do away with it if
// [[PartialParameters#applySerialization]] was statically determined
final case class Event[+Specific <: Event.Specific](
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
Expand All @@ -62,9 +66,10 @@ private[events] object RawBatch {
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
specific: Specific,
) extends PartialParameters {
final protected val eventId = fromTransactionId(transactionId, nodeId)
final protected val base: Vector[NamedParameter] =
private[this] val eventId = fromTransactionId(transactionId, nodeId)
private[this] val base: Vector[NamedParameter] =
Vector[NamedParameter](
"event_id" -> eventId,
"event_offset" -> offset,
Expand All @@ -78,87 +83,60 @@ private[events] object RawBatch {
"flat_event_witnesses" -> Party.Array(flatWitnesses.toSeq: _*),
"tree_event_witnesses" -> Party.Array(treeWitnesses.toSeq: _*),
)

override def applySerialization(
lfValueTranslation: LfValueTranslation): Vector[NamedParameter] =
base ++ specific.applySerialization(transactionId, eventId, lfValueTranslation)
}

object Event {

sealed abstract class Specific {
private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter]
}

final class Created(
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
transactionId: TransactionId,
nodeId: NodeId,
submitter: Option[Party],
ledgerEffectiveTime: Instant,
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
create: Create,
) extends Event(
applicationId = applicationId,
workflowId = workflowId,
commandId = commandId,
transactionId = transactionId,
nodeId = nodeId,
submitter = submitter,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses,
treeWitnesses = treeWitnesses,
) {
val partial: Vector[NamedParameter] =
base ++ Vector[NamedParameter](
) extends Specific {
private val partial: Vector[NamedParameter] =
Vector[NamedParameter](
"contract_id" -> create.coid.coid,
"template_id" -> create.coinst.template,
"create_signatories" -> create.signatories.toArray[String],
"create_observers" -> create.stakeholders.diff(create.signatories).toArray[String],
"create_agreement_text" -> Some(create.coinst.agreementText).filter(_.nonEmpty),
)
override def applySerialization(
override private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, create)
}

final class Exercised(
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
transactionId: TransactionId,
nodeId: NodeId,
submitter: Option[Party],
ledgerEffectiveTime: Instant,
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
exercise: Exercise,
) extends Event(
applicationId = applicationId,
workflowId = workflowId,
commandId = commandId,
transactionId = transactionId,
nodeId = nodeId,
submitter = submitter,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses,
treeWitnesses = treeWitnesses,
) {
val partial: Vector[NamedParameter] =
base ++ Vector[NamedParameter](
) extends Specific {
private val partial: Vector[NamedParameter] =
Vector[NamedParameter](
"contract_id" -> exercise.targetCoid,
"template_id" -> exercise.templateId,
"exercise_consuming" -> exercise.consuming,
"exercise_choice" -> exercise.choiceId,
"exercise_actors" -> exercise.actingParties.toArray[String],
"exercise_child_event_ids" -> exercise.children
.map(fromTransactionId(transactionId, _))
.toArray[String],
)
override def applySerialization(
override private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, exercise)
(partial :+ ("exercise_child_event_ids" -> exercise.children
.map(fromTransactionId(transactionId, _))
.toArray[String]: NamedParameter)) ++ lfValueTranslation.serialize(eventId, exercise)
}

}
Expand Down

0 comments on commit 8e3e296

Please sign in to comment.