Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

factor common structure in RawBatch.Event #6216

Merged
merged 1 commit into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,24 @@ private[events] trait EventsTableInsert { this: EventsTable =>
}

private case class AccumulatingBatches(
creates: Vector[RawBatch.Event.Created],
exercises: Vector[RawBatch.Event.Exercised],
creates: Vector[RawBatch.Event[RawBatch.Event.Created]],
exercises: Vector[RawBatch.Event[RawBatch.Event.Exercised]],
archives: Vector[Vector[NamedParameter]],
) {

def add(create: RawBatch.Event.Created): AccumulatingBatches =
def add(create: RawBatch.Event[RawBatch.Event.Created]): AccumulatingBatches =
copy(creates = creates :+ create)

def add(exercise: RawBatch.Event.Exercised): AccumulatingBatches =
def add(exercise: RawBatch.Event[RawBatch.Event.Exercised])(
implicit dummy: DummyImplicit): AccumulatingBatches =
stefanobaghino-da marked this conversation as resolved.
Show resolved Hide resolved
copy(exercises = exercises :+ exercise)

def add(archive: Vector[NamedParameter]): AccumulatingBatches =
copy(archives = archives :+ archive)

private def prepareRawNonEmpty(
query: String,
params: Vector[RawBatch.Event],
params: Vector[RawBatch.Event[_]],
): Option[RawBatch] =
if (params.nonEmpty) Some(new RawBatch(query, params)) else None

Expand Down Expand Up @@ -167,42 +168,29 @@ private[events] trait EventsTableInsert { this: EventsTable =>
transaction: GenTransaction.WithTxValue[Nid, ContractId],
flatWitnesses: WitnessRelation[Nid],
treeWitnesses: WitnessRelation[Nid],
): RawBatches =
): RawBatches = {
def event[Sp <: RawBatch.Event.Specific](nodeId: Nid, sp: Sp) =
new RawBatch.Event(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
specific = sp,
)

transaction
.fold(AccumulatingBatches.empty) {
case (batches, (nodeId, node: Create)) =>
batches.add(
new RawBatch.Event.Created(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
create = node,
)
)
batches.add(event(nodeId, new RawBatch.Event.Created(node)))
case (batches, (nodeId, node: Exercise)) =>
val batchWithExercises =
batches.add(
new RawBatch.Event.Exercised(
applicationId = submitterInfo.map(_.applicationId),
workflowId = workflowId,
commandId = submitterInfo.map(_.commandId),
transactionId = transactionId,
nodeId = nodeId,
submitter = submitterInfo.map(_.submitter),
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses getOrElse (nodeId, Set.empty),
treeWitnesses = treeWitnesses getOrElse (nodeId, Set.empty),
exercise = node,
)
)
batches.add(event(nodeId, new RawBatch.Event.Exercised(node)))
if (node.consuming) {
batchWithExercises.add(
archive(
Expand All @@ -217,5 +205,6 @@ private[events] trait EventsTableInsert { this: EventsTable =>
batches // ignore any event which is neither a create nor an exercise
}
.prepare
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.daml.platform.store.dao.events
import java.time.Instant

import anorm.NamedParameter
import com.daml.ledger.EventId
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.{ApplicationId, CommandId, TransactionId, WorkflowId}
import com.daml.platform.events.EventIdFormatter.fromTransactionId
Expand Down Expand Up @@ -51,7 +52,10 @@ private[events] object RawBatch {
partial :+ lfValueTranslation.serialize(contractId, createArgument)
}

sealed abstract class Event(
// this unfortunate upper bound is here to get access to
// [[Event.Specific#applySerialization]]; we would do away with it if
// [[PartialParameters#applySerialization]] was statically determined
final case class Event[+Specific <: Event.Specific](
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
Expand All @@ -62,9 +66,10 @@ private[events] object RawBatch {
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
specific: Specific,
) extends PartialParameters {
final protected val eventId = fromTransactionId(transactionId, nodeId)
final protected val base: Vector[NamedParameter] =
private[this] val eventId = fromTransactionId(transactionId, nodeId)
private[this] val base: Vector[NamedParameter] =
Vector[NamedParameter](
"event_id" -> eventId,
"event_offset" -> offset,
Expand All @@ -78,87 +83,60 @@ private[events] object RawBatch {
"flat_event_witnesses" -> Party.Array(flatWitnesses.toSeq: _*),
"tree_event_witnesses" -> Party.Array(treeWitnesses.toSeq: _*),
)

override def applySerialization(
lfValueTranslation: LfValueTranslation): Vector[NamedParameter] =
base ++ specific.applySerialization(transactionId, eventId, lfValueTranslation)
}

object Event {

sealed abstract class Specific {
private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter]
}

final class Created(
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
transactionId: TransactionId,
nodeId: NodeId,
submitter: Option[Party],
ledgerEffectiveTime: Instant,
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
create: Create,
) extends Event(
applicationId = applicationId,
workflowId = workflowId,
commandId = commandId,
transactionId = transactionId,
nodeId = nodeId,
submitter = submitter,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses,
treeWitnesses = treeWitnesses,
) {
val partial: Vector[NamedParameter] =
base ++ Vector[NamedParameter](
) extends Specific {
private val partial: Vector[NamedParameter] =
Vector[NamedParameter](
"contract_id" -> create.coid.coid,
"template_id" -> create.coinst.template,
"create_signatories" -> create.signatories.toArray[String],
"create_observers" -> create.stakeholders.diff(create.signatories).toArray[String],
"create_agreement_text" -> Some(create.coinst.agreementText).filter(_.nonEmpty),
)
override def applySerialization(
override private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, create)
}

final class Exercised(
applicationId: Option[ApplicationId],
workflowId: Option[WorkflowId],
commandId: Option[CommandId],
transactionId: TransactionId,
nodeId: NodeId,
submitter: Option[Party],
ledgerEffectiveTime: Instant,
offset: Offset,
flatWitnesses: Set[Party],
treeWitnesses: Set[Party],
exercise: Exercise,
) extends Event(
applicationId = applicationId,
workflowId = workflowId,
commandId = commandId,
transactionId = transactionId,
nodeId = nodeId,
submitter = submitter,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
flatWitnesses = flatWitnesses,
treeWitnesses = treeWitnesses,
) {
val partial: Vector[NamedParameter] =
base ++ Vector[NamedParameter](
) extends Specific {
private val partial: Vector[NamedParameter] =
Vector[NamedParameter](
"contract_id" -> exercise.targetCoid,
"template_id" -> exercise.templateId,
"exercise_consuming" -> exercise.consuming,
"exercise_choice" -> exercise.choiceId,
"exercise_actors" -> exercise.actingParties.toArray[String],
"exercise_child_event_ids" -> exercise.children
.map(fromTransactionId(transactionId, _))
.toArray[String],
)
override def applySerialization(
override private[Event] def applySerialization(
transactionId: TransactionId,
eventId: EventId,
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, exercise)
(partial :+ ("exercise_child_event_ids" -> exercise.children
.map(fromTransactionId(transactionId, _))
.toArray[String]: NamedParameter)) ++ lfValueTranslation.serialize(eventId, exercise)
}

}
Expand Down