diff --git a/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala b/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala index 1467635db48e..b60c9a1205b1 100644 --- a/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala +++ b/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala @@ -10,7 +10,7 @@ import com.daml.auth.TokenHolder import com.daml.extractor.Types._ import com.daml.extractor.config.{ExtractorConfig, SnapshotEndSetting} import com.daml.extractor.helpers.FutureUtil.toFuture -import com.daml.extractor.helpers.{TemplateIds, TransactionTreeTrimmer} +import com.daml.extractor.helpers.{TemplateIds} import com.daml.extractor.ledger.types.TransactionTree import com.daml.extractor.ledger.types.TransactionTree._ import com.daml.extractor.writers.Writer @@ -19,6 +19,7 @@ import com.daml.grpc.GrpcException import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory} import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.api.v1.transaction_filter.{Filters, TransactionFilter} +import com.daml.ledger.api.v1.value.Identifier import com.daml.ledger.api.{v1 => api} import com.daml.ledger.client.LedgerClient import com.daml.ledger.client.configuration._ @@ -166,9 +167,6 @@ class Extractor[T](config: ExtractorConfig, target: T)( val transactionFilter = selectTransactions(config.parties) logger.info(s"Setting transaction filter: ${transactionFilter}") - val trim: api.transaction.TransactionTree => api.transaction.TransactionTree = - TransactionTreeTrimmer.trim(parties, requestedTemplateIds) - RestartSource .onFailuresWithBackoff( minBackoff = 3.seconds, @@ -186,9 +184,8 @@ class Extractor[T](config: ExtractorConfig, target: T)( tokenHolder.flatMap(_.token) ) .via(killSwitch.flow) - .map(trim) .collect { - case t if nonEmpty(t) => convertTransactionTree(t) + Function.unlift(convertTransactionTree(parties, requestedTemplateIds)) } .mapAsync(parallelism = 1) { t => writer @@ -206,10 +203,15 @@ class Extractor[T](config: ExtractorConfig, target: T)( .void } - private def nonEmpty(t: api.transaction.TransactionTree): Boolean = t.eventsById.nonEmpty - - private def convertTransactionTree(t: api.transaction.TransactionTree): TransactionTree = - t.convert.fold(e => throw DataIntegrityError(e), identity) + private def convertTransactionTree(parties: Set[String], templateIds: Set[Identifier])( + t: api.transaction.TransactionTree): Option[TransactionTree] = { + val tree = t.convert(parties, templateIds).fold(e => throw DataIntegrityError(e), identity) + if (tree.events.nonEmpty) { + Some(tree) + } else { + None + } + } /** * We encountered a transaction that reference a previously not witnessed type. diff --git a/extractor/src/main/scala/com/digitalasset/extractor/helpers/TransactionTreeTrimmer.scala b/extractor/src/main/scala/com/digitalasset/extractor/helpers/TransactionTreeTrimmer.scala index 211482a8aa63..06b6423fb173 100644 --- a/extractor/src/main/scala/com/digitalasset/extractor/helpers/TransactionTreeTrimmer.scala +++ b/extractor/src/main/scala/com/digitalasset/extractor/helpers/TransactionTreeTrimmer.scala @@ -4,24 +4,14 @@ package com.daml.extractor.helpers import com.daml.ledger.api.v1.transaction.TreeEvent.Kind -import com.daml.ledger.api.v1.transaction.{TransactionTree, TreeEvent} +import com.daml.ledger.api.v1.transaction.TreeEvent import com.daml.ledger.api.v1.value.Identifier object TransactionTreeTrimmer { - def trim( - parties: Set[String], - templateIds: Set[Identifier]): TransactionTree => TransactionTree = { - val shouldKeep: TreeEvent.Kind => Boolean = event => - (templateIds.isEmpty || containsTemplateId(templateIds.map(asTuple))(event)) && - exerciseEventOrStakeholder(parties)(event) - transactionTree: TransactionTree => - { - val eventsById = transactionTree.eventsById.filter(kv => shouldKeep(kv._2.kind)) - val eventIds = eventsById.keySet - val rootEventIds = transactionTree.rootEventIds.filter(eventIds) - transactionTree.copy(eventsById = eventsById, rootEventIds = rootEventIds) - } - } + def shouldKeep(parties: Set[String], templateIds: Set[Identifier])( + event: TreeEvent.Kind): Boolean = + (templateIds.isEmpty || containsTemplateId(templateIds.map(asTuple))(event)) && + exerciseEventOrStakeholder(parties)(event) private def containsTemplateId( templateIds: Set[(String, String, String)]): TreeEvent.Kind => Boolean = { diff --git a/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/Event.scala b/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/Event.scala index 9861aab2e451..fe99a5cd5c1e 100644 --- a/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/Event.scala +++ b/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/Event.scala @@ -31,7 +31,7 @@ final case class ExercisedEvent( actingParties: Set[String], consuming: Boolean, witnessParties: Set[String], - childEventIds: Set[String] + childEventIds: Seq[String] ) extends Event object Event { @@ -88,7 +88,7 @@ object Event { apiEvent.actingParties.toSet, apiEvent.consuming, apiEvent.witnessParties.toSet, - apiEvent.childEventIds.toSet + apiEvent.childEventIds ) } } diff --git a/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/TransactionTree.scala b/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/TransactionTree.scala index f86cb867939f..fc52e85f31d5 100644 --- a/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/TransactionTree.scala +++ b/extractor/src/main/scala/com/digitalasset/extractor/ledger/types/TransactionTree.scala @@ -12,14 +12,24 @@ import java.time.Instant import scalaz._ import Scalaz._ import api.transaction.TreeEvent.Kind +import com.daml.extractor.helpers.TransactionTreeTrimmer +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer + +// This is the state _after_ applying template and party filtering. +// This means that this is no longer a valid subtransaction or tree. +// In particular, there can be events that are not reachable from any +// root node (e.g. because the root nodes were all for different templates) +// and the child event ids can point to nodes that have been filtered out. final case class TransactionTree( transactionId: String, workflowId: String, effectiveAt: Instant, offset: String, - events: Map[String, Event], - rootEventIds: Set[String] + // Ordered in pre-order (the order in which the events happen on the ledger). + events: List[(String, Event)], + rootEventIds: Seq[String], ) object TransactionTree { @@ -28,21 +38,59 @@ object TransactionTree { final implicit class ApiTransactionOps(val apiTransaction: api.transaction.TransactionTree) extends AnyVal { - def convert: String \/ TransactionTree = + private def filteredEvents( + parties: Set[String], + templateIds: Set[api.value.Identifier]): List[(String, api.transaction.TreeEvent.Kind)] = { + val events = ListBuffer.empty[(String, api.transaction.TreeEvent.Kind)] + foreach { + case (id, ev) if TransactionTreeTrimmer.shouldKeep(parties, templateIds)(ev) => + events += ((id, ev)) + case _ => + } + events.result + } + def convert( + parties: Set[String], + templateIds: Set[api.value.Identifier]): String \/ TransactionTree = for { apiEffectiveAt <- effectiveAtLens(apiTransaction) effectiveAt = TimestampConversion.toInstant(apiEffectiveAt) - events <- apiTransaction.eventsById.toList.traverse(kv => - kv._2.kind.convert.map(kv._1 -> _)) + events <- ApiTransactionOps(apiTransaction) + .filteredEvents(parties, templateIds) + .traverse(kv => kv._2.kind.convert.map(kv._1 -> _)) + kept = events.map(_._1).toSet } yield TransactionTree( apiTransaction.transactionId, apiTransaction.workflowId, effectiveAt, apiTransaction.offset, - events.toMap, - apiTransaction.rootEventIds.toSet + events, + apiTransaction.rootEventIds.filter(kept) ) + // pre-order traversal over the transaction tree. This is the equivalent of + // the traversal in com.daml.lf.transaction for the client side. + private def foreach(f: (String, api.transaction.TreeEvent.Kind) => Unit): Unit = { + @tailrec + def go(toVisit: List[String]): Unit = toVisit match { + case id :: toVisit => + apiTransaction.eventsById.get(id) match { + case None => + throw new IllegalArgumentException(s"Missing event id in transaction tree: $id") + case Some(node) => + f(id, node.kind) + node.kind match { + case Kind.Exercised(e) => + go(List(e.childEventIds: _*) ++ toVisit) + case Kind.Created(_) => + go(toVisit) + case Kind.Empty => + } + } + case Nil => + } + go(List(apiTransaction.rootEventIds: _*)) + } } final implicit class TreeEventKindOps(val kind: api.transaction.TreeEvent.Kind) extends AnyVal { diff --git a/extractor/src/main/scala/com/digitalasset/extractor/writers/PostgreSQLWriter.scala b/extractor/src/main/scala/com/digitalasset/extractor/writers/PostgreSQLWriter.scala index 0e33e3893cef..3e567b7bc405 100644 --- a/extractor/src/main/scala/com/digitalasset/extractor/writers/PostgreSQLWriter.scala +++ b/extractor/src/main/scala/com/digitalasset/extractor/writers/PostgreSQLWriter.scala @@ -170,47 +170,28 @@ class PostgreSQLWriter(config: ExtractorConfig, target: PostgreSQLTarget, ledger val insertIO = insertTransaction(transaction).update.run.void - val createdEvents: List[CreatedEvent] = transaction.events.values.collect { - case e @ CreatedEvent(_, _, _, _, _) => e - }(scala.collection.breakOut) + val events = transaction.events.map(_._2) - val exercisedEvents: List[ExercisedEvent] = transaction.events.values.collect { - case e @ ExercisedEvent(_, _, _, _, _, _, _, _, _) => e - }(scala.collection.breakOut) - - logger.trace(s"Create events: ${com.daml.extractor.pformat(createdEvents)}") - logger.trace(s"Exercise events: ${com.daml.extractor.pformat(exercisedEvents)}") + logger.trace(s"Events events: ${com.daml.extractor.pformat(events)}") (for { - archiveIOsMulti <- if (useMultiTableFormat) - exercisedEvents.traverse( - multiTableFormat.handleExercisedEvent(multiTableState, transaction, _) - ) - else - List.empty[ConnectionIO[Unit]].right - createIOsMulti <- if (useMultiTableFormat) - createdEvents.traverse( - multiTableFormat.handleCreatedEvent(multiTableState, transaction, _) - ) - else - List.empty[ConnectionIO[Unit]].right - archiveIOsSingle <- if (useSingleTableFormat) - exercisedEvents.traverse( - singleTableFormat.handleExercisedEvent(SingleTableState, transaction, _) - ) - else - List.empty[ConnectionIO[Unit]].right - createIOsSingle <- if (useSingleTableFormat) - createdEvents.traverse( - singleTableFormat.handleCreatedEvent(SingleTableState, transaction, _) - ) - else - List.empty[ConnectionIO[Unit]].right + statements <- events.traverse { + case e: CreatedEvent => + if (useMultiTableFormat) { + multiTableFormat.handleCreatedEvent(multiTableState, transaction, e) + } else { + singleTableFormat.handleCreatedEvent(SingleTableState, transaction, e) + } + case e: ExercisedEvent => + if (useMultiTableFormat) { + multiTableFormat.handleExercisedEvent(multiTableState, transaction, e) + } else { + singleTableFormat.handleExercisedEvent(SingleTableState, transaction, e) + } + } } yield { val sqlTransaction = - (archiveIOsMulti ++ createIOsMulti ++ archiveIOsSingle ++ createIOsSingle) - .foldLeft(insertIO)(_ *> _) - + statements.foldLeft(insertIO)(_ *> _) sqlTransaction.transact(xa).unsafeToFuture() }).sequence } diff --git a/extractor/src/main/scala/com/digitalasset/extractor/writers/PrinterFunctionWriter.scala b/extractor/src/main/scala/com/digitalasset/extractor/writers/PrinterFunctionWriter.scala index b87947237107..57cb0a4b95fe 100644 --- a/extractor/src/main/scala/com/digitalasset/extractor/writers/PrinterFunctionWriter.scala +++ b/extractor/src/main/scala/com/digitalasset/extractor/writers/PrinterFunctionWriter.scala @@ -40,7 +40,7 @@ trait PrinterFunctionWriter { self: Writer => def handleTransaction(transaction: TransactionTree): Future[RefreshPackages \/ Unit] = { printer(s"Handling transaction #${transaction.transactionId}...") printer(s"Events:") - transaction.events.values.foreach(printEvent) + transaction.events.map(_._2).foreach(printEvent) Future.successful(().right) } diff --git a/extractor/src/test/resources/damls/TransactionExample.daml b/extractor/src/test/resources/damls/TransactionExample.daml index 0a55b309e3f4..38be396280fc 100644 --- a/extractor/src/test/resources/damls/TransactionExample.daml +++ b/extractor/src/test/resources/damls/TransactionExample.daml @@ -63,6 +63,16 @@ example = scenario do submit bob do exercise offer Accept + -- create a transient RightOfUseOffer and archive it immediately. + submit bob do + createAndExercise + RightOfUseOffer with + landlord = bob + tenant = alice + address = "fobar" + expirationDate = date 2020 Jan 1 + Archive + templateFilterTest = scenario do bob <- getParty "Bob" alice <- getParty "Alice" diff --git a/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionMultiTableSpec.scala b/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionMultiTableSpec.scala index 378f2d8aa022..1e6e93d1b404 100644 --- a/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionMultiTableSpec.scala +++ b/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionMultiTableSpec.scala @@ -36,7 +36,7 @@ class TransactionMultiTableSpec override protected def outputFormat: String = "multi-table" "Transactions" should "be extracted" in { - getTransactions should have length 2 + getTransactions should have length 3 } it should "be valid transactions" in { @@ -63,45 +63,62 @@ class TransactionMultiTableSpec it should "be transactions with different ids" in { val transactions = getTransactions - transactions.map(_.transaction_id).toSet should have size 2 - transactions.map(_.workflow_id).toSet should have size 2 - transactions.map(_.seq).toSet should have size 2 - transactions.map(_.ledger_offset).toSet should have size 2 + transactions.map(_.transaction_id).toSet should have size 3 + transactions.map(_.workflow_id).toSet should have size 3 + transactions.map(_.seq).toSet should have size 3 + transactions.map(_.ledger_offset).toSet should have size 3 } "Exercises" should "be extracted" in { - getExercises should have length 1 + getExercises should have length 2 } "All the data" should "represent what went down in the scenario" in { // `transaction1` created `contract1`, then // `transaction2` created `exercise`, which archived `contract1` and resulted `contract2` - val List(transaction1, transaction2) = getTransactions.sortBy(_.seq) - val List(exercise) = getExercises - val List((archived_by_event_id1, transaction_id1, archived_by_transaction_id1)) = + val List(transaction1, transaction2, transaction3) = getTransactions.sortBy(_.seq) + val List(exercise1, exercise2) = getExercises + val List( + (archived_by_event_id_offer1, transaction_id_offer1, archived_by_transaction_id_offer1), + (archived_by_event_id_offer2, transaction_id_offer2, archived_by_transaction_id_offer2)) = getResultList[(Option[String], String, Option[String])]( sql"SELECT _archived_by_event_id, _transaction_id, _archived_by_transaction_id FROM template.transactionexample_rightofuseoffer") - val List((event_id2, archived_by_event_id2, transaction_id2, archived_by_transaction_id2)) = + val List( + ( + event_id_accept, + archived_by_event_id_accept, + transaction_id_accept, + archived_by_transaction_id_accept)) = getResultList[(String, Option[String], String, Option[String])]( sql"SELECT _event_id, _archived_by_event_id, _transaction_id, _archived_by_transaction_id FROM template.transactionexample_rightofuseagreement") - // `transaction1` created `contract1`, then - transaction_id1 shouldEqual transaction1.transaction_id + // `transaction1` created `contract1` (first offer), then + transaction_id_offer1 shouldEqual transaction1.transaction_id - // `transaction2` created `exercise` - exercise.transaction_id shouldEqual transaction2.transaction_id + // `transaction2` created `exercise1` + exercise1.transaction_id shouldEqual transaction2.transaction_id - // `exercised` archived `contract1` - archived_by_transaction_id1 shouldEqual Some(transaction2.transaction_id) - archived_by_event_id1 shouldEqual Some(exercise.event_id) + // `exercise1` archived `contract1` (first offer) + archived_by_transaction_id_offer1 shouldEqual Some(transaction2.transaction_id) + archived_by_event_id_offer1 shouldEqual Some(exercise1.event_id) - // ... while it resulted in `contract2` - exercise.child_event_ids.asArray.toList.toVector.flatten should contain(event_id2.asJson) - transaction_id2 shouldEqual transaction2.transaction_id + // ... while it resulted in `contract2` (first accept) + exercise1.child_event_ids.asArray.toList.toVector.flatten should contain(event_id_accept.asJson) + transaction_id_accept shouldEqual transaction2.transaction_id // which is not archived - archived_by_transaction_id2 shouldEqual None - archived_by_event_id2 shouldEqual None + archived_by_transaction_id_accept shouldEqual None + archived_by_event_id_accept shouldEqual None + + // `transaction3` (second containing an offer) created `contract3` (second offer); then + transaction_id_offer2 shouldEqual transaction3.transaction_id + + // `transaction3` created `exercise2` + exercise2.transaction_id shouldEqual transaction3.transaction_id + + // `exercise2` archived `contract3` (second offer) + archived_by_transaction_id_offer2 shouldEqual Some(transaction3.transaction_id) + archived_by_event_id_offer2 shouldEqual Some(exercise2.event_id) } } diff --git a/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionSingleTableSpec.scala b/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionSingleTableSpec.scala index 9035743dab48..47fe9f3602e2 100644 --- a/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionSingleTableSpec.scala +++ b/extractor/src/test/suite/scala/com/digitalasset/extractor/TransactionSingleTableSpec.scala @@ -33,7 +33,7 @@ class TransactionSingleTableSpec override def scenario: Option[String] = Some("TransactionExample:example") "Transactions" should "be extracted" in { - getTransactions should have length 2 + getTransactions should have length 3 } it should "be valid transactions" in { @@ -60,45 +60,53 @@ class TransactionSingleTableSpec it should "be transactions with different ids" in { val transactions = getTransactions - transactions.map(_.transaction_id).toSet should have size 2 - transactions.map(_.workflow_id).toSet should have size 2 - transactions.map(_.seq).toSet should have size 2 - transactions.map(_.ledger_offset).toSet should have size 2 + transactions.map(_.transaction_id).toSet should have size 3 + transactions.map(_.workflow_id).toSet should have size 3 + transactions.map(_.seq).toSet should have size 3 + transactions.map(_.ledger_offset).toSet should have size 3 } "Exercises" should "be extracted" in { - getExercises should have length 1 + getExercises should have length 2 } "Contracts" should "be extracted" in { - getContracts should have length 2 + getContracts should have length 3 } "All the data" should "represent what went down in the scenario" in { // `transaction1` created `contract1`, then // `transaction2` created `exercise`, which archived `contract1` and resulted `contract2` - val List(transaction1, transaction2) = getTransactions.sortBy(_.seq) - val List(exercise) = getExercises - val List(contract1, contract2) = getContracts + val List(transaction1, transaction2, transaction3) = getTransactions.sortBy(_.seq) + val List(exercise1, exercise2) = getExercises + val List(contract1, contract2, contract3) = getContracts // `transaction1` created `contract1`, then contract1.transaction_id shouldEqual transaction1.transaction_id - // `transaction2` created `exercise` - exercise.transaction_id shouldEqual transaction2.transaction_id + // `transaction2` created `exercise1` + exercise1.transaction_id shouldEqual transaction2.transaction_id - // `exercised` archived `contract1` + // `exercise1` archived `contract1` contract1.archived_by_transaction_id shouldEqual Some(transaction2.transaction_id) - contract1.archived_by_event_id shouldEqual Some(exercise.event_id) + contract1.archived_by_event_id shouldEqual Some(exercise1.event_id) // ... while it resulted in `contract2` - exercise.child_event_ids.asArray.toList.toVector.flatten should contain( + exercise1.child_event_ids.asArray.toList.toVector.flatten should contain( contract2.event_id.asJson) contract2.transaction_id shouldEqual transaction2.transaction_id // which is not archived contract2.archived_by_transaction_id shouldEqual None contract2.archived_by_event_id shouldEqual None + + // `transaction3` created `contract3` + contract3.transaction_id shouldEqual transaction3.transaction_id + // `transaction3` created `exercise2` + exercise2.transaction_id shouldEqual transaction3.transaction_id + // `exercise2` archived `contract3` + contract3.archived_by_transaction_id shouldEqual Some(transaction3.transaction_id) + contract3.archived_by_event_id shouldEqual Some(exercise2.event_id) } }