Skip to content

Commit

Permalink
Fix handling of transient contracts in extractor (#7210)
Browse files Browse the repository at this point in the history
* Fix handling of transient contracts in extractor

fixes #7201

As described in the issue, the problem was that we completely ignored
the order of events and always processed exercises before creates
which falls apart as soon as they are in the same transaction.

I still think that in the current state doing creates before exercises
would be fine but this seems like the kind of assumption that breaks
very quickly during refactoring (e.g. let’s say you start creating
foreign key references) so instead this PR fixes this properly by
iterating over the transaction tree in pre-order.

This turns out to be a bit more complex than I expected it to be since
we filter the tree on the client side and afterwards we have lost
enough information so that we cannot traverse the tree instead
anymore. Therefore, the “tree” type in extractor now contains a list
of events in pre-order and a comment that it’s not actually a valid
tree or subtransaction.

changelog_begin

- [Extractor] Fix a bug that resulted in transient contracts (created
  and archived within the same transaction) to not be displayed as
  archived. See #7201 for details.

changelog_end

* Update extractor/src/main/scala/com/digitalasset/extractor/ledger/types/TransactionTree.scala

Co-authored-by: Andreas Herrmann <42969706+aherrmann-da@users.noreply.github.com>

* Suffix with template name

changelog_begin
changelog_end

Co-authored-by: Andreas Herrmann <42969706+aherrmann-da@users.noreply.github.com>
  • Loading branch information
cocreature and aherrmann-da authored Aug 24, 2020
1 parent 0e9616f commit c997f5d
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 108 deletions.
22 changes: 12 additions & 10 deletions extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.daml.auth.TokenHolder
import com.daml.extractor.Types._
import com.daml.extractor.config.{ExtractorConfig, SnapshotEndSetting}
import com.daml.extractor.helpers.FutureUtil.toFuture
import com.daml.extractor.helpers.{TemplateIds, TransactionTreeTrimmer}
import com.daml.extractor.helpers.{TemplateIds}
import com.daml.extractor.ledger.types.TransactionTree
import com.daml.extractor.ledger.types.TransactionTree._
import com.daml.extractor.writers.Writer
Expand All @@ -19,6 +19,7 @@ import com.daml.grpc.GrpcException
import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
import com.daml.ledger.api.v1.value.Identifier
import com.daml.ledger.api.{v1 => api}
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.configuration._
Expand Down Expand Up @@ -166,9 +167,6 @@ class Extractor[T](config: ExtractorConfig, target: T)(
val transactionFilter = selectTransactions(config.parties)
logger.info(s"Setting transaction filter: ${transactionFilter}")

val trim: api.transaction.TransactionTree => api.transaction.TransactionTree =
TransactionTreeTrimmer.trim(parties, requestedTemplateIds)

RestartSource
.onFailuresWithBackoff(
minBackoff = 3.seconds,
Expand All @@ -186,9 +184,8 @@ class Extractor[T](config: ExtractorConfig, target: T)(
tokenHolder.flatMap(_.token)
)
.via(killSwitch.flow)
.map(trim)
.collect {
case t if nonEmpty(t) => convertTransactionTree(t)
Function.unlift(convertTransactionTree(parties, requestedTemplateIds))
}
.mapAsync(parallelism = 1) { t =>
writer
Expand All @@ -206,10 +203,15 @@ class Extractor[T](config: ExtractorConfig, target: T)(
.void
}

private def nonEmpty(t: api.transaction.TransactionTree): Boolean = t.eventsById.nonEmpty

private def convertTransactionTree(t: api.transaction.TransactionTree): TransactionTree =
t.convert.fold(e => throw DataIntegrityError(e), identity)
private def convertTransactionTree(parties: Set[String], templateIds: Set[Identifier])(
t: api.transaction.TransactionTree): Option[TransactionTree] = {
val tree = t.convert(parties, templateIds).fold(e => throw DataIntegrityError(e), identity)
if (tree.events.nonEmpty) {
Some(tree)
} else {
None
}
}

/**
* We encountered a transaction that reference a previously not witnessed type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,14 @@
package com.daml.extractor.helpers

import com.daml.ledger.api.v1.transaction.TreeEvent.Kind
import com.daml.ledger.api.v1.transaction.{TransactionTree, TreeEvent}
import com.daml.ledger.api.v1.transaction.TreeEvent
import com.daml.ledger.api.v1.value.Identifier

object TransactionTreeTrimmer {
def trim(
parties: Set[String],
templateIds: Set[Identifier]): TransactionTree => TransactionTree = {
val shouldKeep: TreeEvent.Kind => Boolean = event =>
(templateIds.isEmpty || containsTemplateId(templateIds.map(asTuple))(event)) &&
exerciseEventOrStakeholder(parties)(event)
transactionTree: TransactionTree =>
{
val eventsById = transactionTree.eventsById.filter(kv => shouldKeep(kv._2.kind))
val eventIds = eventsById.keySet
val rootEventIds = transactionTree.rootEventIds.filter(eventIds)
transactionTree.copy(eventsById = eventsById, rootEventIds = rootEventIds)
}
}
def shouldKeep(parties: Set[String], templateIds: Set[Identifier])(
event: TreeEvent.Kind): Boolean =
(templateIds.isEmpty || containsTemplateId(templateIds.map(asTuple))(event)) &&
exerciseEventOrStakeholder(parties)(event)

private def containsTemplateId(
templateIds: Set[(String, String, String)]): TreeEvent.Kind => Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final case class ExercisedEvent(
actingParties: Set[String],
consuming: Boolean,
witnessParties: Set[String],
childEventIds: Set[String]
childEventIds: Seq[String]
) extends Event

object Event {
Expand Down Expand Up @@ -88,7 +88,7 @@ object Event {
apiEvent.actingParties.toSet,
apiEvent.consuming,
apiEvent.witnessParties.toSet,
apiEvent.childEventIds.toSet
apiEvent.childEventIds
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@ import java.time.Instant
import scalaz._
import Scalaz._
import api.transaction.TreeEvent.Kind
import com.daml.extractor.helpers.TransactionTreeTrimmer

import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

// This is the state _after_ applying template and party filtering.
// This means that this is no longer a valid subtransaction or tree.
// In particular, there can be events that are not reachable from any
// root node (e.g. because the root nodes were all for different templates)
// and the child event ids can point to nodes that have been filtered out.
final case class TransactionTree(
transactionId: String,
workflowId: String,
effectiveAt: Instant,
offset: String,
events: Map[String, Event],
rootEventIds: Set[String]
// Ordered in pre-order (the order in which the events happen on the ledger).
events: List[(String, Event)],
rootEventIds: Seq[String],
)

object TransactionTree {
Expand All @@ -28,21 +38,59 @@ object TransactionTree {

final implicit class ApiTransactionOps(val apiTransaction: api.transaction.TransactionTree)
extends AnyVal {
def convert: String \/ TransactionTree =
private def filteredEvents(
parties: Set[String],
templateIds: Set[api.value.Identifier]): List[(String, api.transaction.TreeEvent.Kind)] = {
val events = ListBuffer.empty[(String, api.transaction.TreeEvent.Kind)]
foreach {
case (id, ev) if TransactionTreeTrimmer.shouldKeep(parties, templateIds)(ev) =>
events += ((id, ev))
case _ =>
}
events.result
}
def convert(
parties: Set[String],
templateIds: Set[api.value.Identifier]): String \/ TransactionTree =
for {
apiEffectiveAt <- effectiveAtLens(apiTransaction)
effectiveAt = TimestampConversion.toInstant(apiEffectiveAt)
events <- apiTransaction.eventsById.toList.traverse(kv =>
kv._2.kind.convert.map(kv._1 -> _))
events <- ApiTransactionOps(apiTransaction)
.filteredEvents(parties, templateIds)
.traverse(kv => kv._2.kind.convert.map(kv._1 -> _))
kept = events.map(_._1).toSet
} yield
TransactionTree(
apiTransaction.transactionId,
apiTransaction.workflowId,
effectiveAt,
apiTransaction.offset,
events.toMap,
apiTransaction.rootEventIds.toSet
events,
apiTransaction.rootEventIds.filter(kept)
)
// pre-order traversal over the transaction tree. This is the equivalent of
// the traversal in com.daml.lf.transaction for the client side.
private def foreach(f: (String, api.transaction.TreeEvent.Kind) => Unit): Unit = {
@tailrec
def go(toVisit: List[String]): Unit = toVisit match {
case id :: toVisit =>
apiTransaction.eventsById.get(id) match {
case None =>
throw new IllegalArgumentException(s"Missing event id in transaction tree: $id")
case Some(node) =>
f(id, node.kind)
node.kind match {
case Kind.Exercised(e) =>
go(List(e.childEventIds: _*) ++ toVisit)
case Kind.Created(_) =>
go(toVisit)
case Kind.Empty =>
}
}
case Nil =>
}
go(List(apiTransaction.rootEventIds: _*))
}
}

final implicit class TreeEventKindOps(val kind: api.transaction.TreeEvent.Kind) extends AnyVal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,47 +170,28 @@ class PostgreSQLWriter(config: ExtractorConfig, target: PostgreSQLTarget, ledger

val insertIO = insertTransaction(transaction).update.run.void

val createdEvents: List[CreatedEvent] = transaction.events.values.collect {
case e @ CreatedEvent(_, _, _, _, _) => e
}(scala.collection.breakOut)
val events = transaction.events.map(_._2)

val exercisedEvents: List[ExercisedEvent] = transaction.events.values.collect {
case e @ ExercisedEvent(_, _, _, _, _, _, _, _, _) => e
}(scala.collection.breakOut)

logger.trace(s"Create events: ${com.daml.extractor.pformat(createdEvents)}")
logger.trace(s"Exercise events: ${com.daml.extractor.pformat(exercisedEvents)}")
logger.trace(s"Events events: ${com.daml.extractor.pformat(events)}")

(for {
archiveIOsMulti <- if (useMultiTableFormat)
exercisedEvents.traverse(
multiTableFormat.handleExercisedEvent(multiTableState, transaction, _)
)
else
List.empty[ConnectionIO[Unit]].right
createIOsMulti <- if (useMultiTableFormat)
createdEvents.traverse(
multiTableFormat.handleCreatedEvent(multiTableState, transaction, _)
)
else
List.empty[ConnectionIO[Unit]].right
archiveIOsSingle <- if (useSingleTableFormat)
exercisedEvents.traverse(
singleTableFormat.handleExercisedEvent(SingleTableState, transaction, _)
)
else
List.empty[ConnectionIO[Unit]].right
createIOsSingle <- if (useSingleTableFormat)
createdEvents.traverse(
singleTableFormat.handleCreatedEvent(SingleTableState, transaction, _)
)
else
List.empty[ConnectionIO[Unit]].right
statements <- events.traverse {
case e: CreatedEvent =>
if (useMultiTableFormat) {
multiTableFormat.handleCreatedEvent(multiTableState, transaction, e)
} else {
singleTableFormat.handleCreatedEvent(SingleTableState, transaction, e)
}
case e: ExercisedEvent =>
if (useMultiTableFormat) {
multiTableFormat.handleExercisedEvent(multiTableState, transaction, e)
} else {
singleTableFormat.handleExercisedEvent(SingleTableState, transaction, e)
}
}
} yield {
val sqlTransaction =
(archiveIOsMulti ++ createIOsMulti ++ archiveIOsSingle ++ createIOsSingle)
.foldLeft(insertIO)(_ *> _)

statements.foldLeft(insertIO)(_ *> _)
sqlTransaction.transact(xa).unsafeToFuture()
}).sequence
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait PrinterFunctionWriter { self: Writer =>
def handleTransaction(transaction: TransactionTree): Future[RefreshPackages \/ Unit] = {
printer(s"Handling transaction #${transaction.transactionId}...")
printer(s"Events:")
transaction.events.values.foreach(printEvent)
transaction.events.map(_._2).foreach(printEvent)

Future.successful(().right)
}
Expand Down
10 changes: 10 additions & 0 deletions extractor/src/test/resources/damls/TransactionExample.daml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ example = scenario do
submit bob do
exercise offer Accept

-- create a transient RightOfUseOffer and archive it immediately.
submit bob do
createAndExercise
RightOfUseOffer with
landlord = bob
tenant = alice
address = "fobar"
expirationDate = date 2020 Jan 1
Archive

templateFilterTest = scenario do
bob <- getParty "Bob"
alice <- getParty "Alice"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TransactionMultiTableSpec
override protected def outputFormat: String = "multi-table"

"Transactions" should "be extracted" in {
getTransactions should have length 2
getTransactions should have length 3
}

it should "be valid transactions" in {
Expand All @@ -63,45 +63,62 @@ class TransactionMultiTableSpec
it should "be transactions with different ids" in {
val transactions = getTransactions

transactions.map(_.transaction_id).toSet should have size 2
transactions.map(_.workflow_id).toSet should have size 2
transactions.map(_.seq).toSet should have size 2
transactions.map(_.ledger_offset).toSet should have size 2
transactions.map(_.transaction_id).toSet should have size 3
transactions.map(_.workflow_id).toSet should have size 3
transactions.map(_.seq).toSet should have size 3
transactions.map(_.ledger_offset).toSet should have size 3
}

"Exercises" should "be extracted" in {
getExercises should have length 1
getExercises should have length 2
}

"All the data" should "represent what went down in the scenario" in {
// `transaction1` created `contract1`, then
// `transaction2` created `exercise`, which archived `contract1` and resulted `contract2`

val List(transaction1, transaction2) = getTransactions.sortBy(_.seq)
val List(exercise) = getExercises
val List((archived_by_event_id1, transaction_id1, archived_by_transaction_id1)) =
val List(transaction1, transaction2, transaction3) = getTransactions.sortBy(_.seq)
val List(exercise1, exercise2) = getExercises
val List(
(archived_by_event_id_offer1, transaction_id_offer1, archived_by_transaction_id_offer1),
(archived_by_event_id_offer2, transaction_id_offer2, archived_by_transaction_id_offer2)) =
getResultList[(Option[String], String, Option[String])](
sql"SELECT _archived_by_event_id, _transaction_id, _archived_by_transaction_id FROM template.transactionexample_rightofuseoffer")
val List((event_id2, archived_by_event_id2, transaction_id2, archived_by_transaction_id2)) =
val List(
(
event_id_accept,
archived_by_event_id_accept,
transaction_id_accept,
archived_by_transaction_id_accept)) =
getResultList[(String, Option[String], String, Option[String])](
sql"SELECT _event_id, _archived_by_event_id, _transaction_id, _archived_by_transaction_id FROM template.transactionexample_rightofuseagreement")

// `transaction1` created `contract1`, then
transaction_id1 shouldEqual transaction1.transaction_id
// `transaction1` created `contract1` (first offer), then
transaction_id_offer1 shouldEqual transaction1.transaction_id

// `transaction2` created `exercise`
exercise.transaction_id shouldEqual transaction2.transaction_id
// `transaction2` created `exercise1`
exercise1.transaction_id shouldEqual transaction2.transaction_id

// `exercised` archived `contract1`
archived_by_transaction_id1 shouldEqual Some(transaction2.transaction_id)
archived_by_event_id1 shouldEqual Some(exercise.event_id)
// `exercise1` archived `contract1` (first offer)
archived_by_transaction_id_offer1 shouldEqual Some(transaction2.transaction_id)
archived_by_event_id_offer1 shouldEqual Some(exercise1.event_id)

// ... while it resulted in `contract2`
exercise.child_event_ids.asArray.toList.toVector.flatten should contain(event_id2.asJson)
transaction_id2 shouldEqual transaction2.transaction_id
// ... while it resulted in `contract2` (first accept)
exercise1.child_event_ids.asArray.toList.toVector.flatten should contain(event_id_accept.asJson)
transaction_id_accept shouldEqual transaction2.transaction_id
// which is not archived
archived_by_transaction_id2 shouldEqual None
archived_by_event_id2 shouldEqual None
archived_by_transaction_id_accept shouldEqual None
archived_by_event_id_accept shouldEqual None

// `transaction3` (second containing an offer) created `contract3` (second offer); then
transaction_id_offer2 shouldEqual transaction3.transaction_id

// `transaction3` created `exercise2`
exercise2.transaction_id shouldEqual transaction3.transaction_id

// `exercise2` archived `contract3` (second offer)
archived_by_transaction_id_offer2 shouldEqual Some(transaction3.transaction_id)
archived_by_event_id_offer2 shouldEqual Some(exercise2.event_id)
}

}
Loading

0 comments on commit c997f5d

Please sign in to comment.