Skip to content

Commit

Permalink
Add two phase retrieval merge function [DPP-718] (#11571)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored Nov 10, 2021
1 parent 743ee46 commit a6f745f
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.Future

Expand Down Expand Up @@ -135,4 +136,172 @@ private[events] object FilterTableACSReader {
}
}
}

/** A stateful merge function to be used in akka-streams statefulMapConcat.
* This merge function receives a stream of id-ranges where each range is associated with a "task",
* and creates an evenly batched stream of ordered and deduplicated ids.
*
* @param tasks The initial set of tasks. This function expects for each task to receive:
* - the input id ranges must be monotonically increasing
* (both in one batch, and all batches related to the same task)
* - the size of all input id ranges except the last one must be exactly `inputBatchSize`
* - the size of the last input id range must not be equal to `inputBatchSize` (but may be empty)
* @param outputBatchSize The output stream will contain batches with this size, except the last one
* @param inputBatchSize Batch size of the input id ranges
*/
def mergeIdStreams[TASK](
tasks: Iterable[TASK],
outputBatchSize: Int,
inputBatchSize: Int,
): () => ((TASK, Iterable[Long])) => Vector[Vector[Long]] = () => {
val outputQueue = new BatchedDistinctOutputQueue(outputBatchSize)
val taskQueue = new MergingTaskQueue[TASK](outputQueue.push)
val taskTracker = new TaskTracker[TASK](tasks, inputBatchSize)

{ case (task, ids) =>
@tailrec def go(next: (Option[(Iterable[Long], TASK)], Boolean)): Unit = {
next._1.foreach(taskQueue.push)
if (next._2) taskQueue.runUntilATaskEmpty match {
case Some(task) => go(taskTracker.finished(task))
case None => outputQueue.flushPartialBatch()
}
else ()
}
go(taskTracker.add(task, ids))
outputQueue.flushOutput
}
}

/** Helper class to encapsulate stateful output batching, and deduplication.
*/
class BatchedDistinctOutputQueue(batchSize: Int) {
private var last: Long = -1
private var buff: Array[Long] = Array.ofDim(batchSize)
private var buffIndex: Int = 0
private var output: Vector[Vector[Long]] = Vector.empty

/** Add one Long entry to the output.
*/
def push(l: Long): Unit = {
if (last != l) {
buff.update(buffIndex, l)
buffIndex += 1
last = l
}
if (buffIndex == batchSize) {
output = output :+ buff.toVector
buff = Array.ofDim(batchSize)
buffIndex = 0
}
}

/** @return all the currently available buffered output
*/
def flushOutput: Vector[Vector[Long]] = {
val result = output
output = Vector.empty
result
}

/** Calling this function adds the current partially filled batch to the output returned by flushOutput.
* This need to be called in order to retrieve all results if processing finishes
* (normally only evenly sized batches emitted, and partial results will be buffered).
*/
def flushPartialBatch(): Unit =
if (buffIndex != 0) {
output = output :+ buff.view.take(buffIndex).toVector
buff = Array.ofDim(batchSize)
buffIndex = 0
}
}

/** Helper class to encapsulate stateful merging of multiple ordered streams.
*/
class MergingTaskQueue[TASK](output: Long => Unit) {
private val iteratorQueue: mutable.PriorityQueue[(Long, Iterator[Long], TASK)] =
new mutable.PriorityQueue()(
Ordering.by[(Long, Iterator[Long], TASK), Long](_._1).reverse
)

/** Adding a new task to the queue
* @param task the id sequence received from upstream and the task identifier.
*/
def push(task: (Iterable[Long], TASK)): Unit = {
val iterator = task._1.iterator
if (iterator.hasNext)
iteratorQueue.enqueue((iterator.next(), iterator, task._2))
}

/** Consume all task's iterators until the first one completes.
* This populates the merged sequence to the output callback function.
*
* @return Some TASK in case an iterator finished for one, or None in case the whole processing is finished.
*/
def runUntilATaskEmpty: Option[TASK] = {
@tailrec def go(): Option[TASK] = if (iteratorQueue.isEmpty) None
else {
val (elem, iterator, task) = iteratorQueue.dequeue()
output(elem)
if (iterator.hasNext) {
iteratorQueue.enqueue((iterator.next(), iterator, task))
go()
} else {
Some(task)
}
}
go()
}
}

/** Helper class to encapsulate stateful tracking of task streams.
*/
class TaskTracker[TASK](allTasks: Iterable[TASK], inputBatchSize: Int) {
private val idle: mutable.Set[TASK] = mutable.Set.empty
private val queuedRanges: mutable.Map[TASK, Vector[Iterable[Long]]] = mutable.Map.empty

idle ++= allTasks

/** Add one entry from upstream
* @param task the TASK identifier
* @param ids the ordered sequence of ids
* @return An optional entry to be added to the MergingTaskQueue, and a flag if further merging needed
*/
def add(task: TASK, ids: Iterable[Long]): (Option[(Iterable[Long], TASK)], Boolean) = {
val toEnqueue =
if (idle(task)) queueEntry(task, ids)
else {
queuedRanges += (task -> queuedRanges.getOrElse(task, Vector.empty).:+(ids))
None
}
if (
ids.nonEmpty && ids.size < inputBatchSize
) // add one more empty signalling the end of one task-stream
queuedRanges += (task -> queuedRanges.getOrElse(task, Vector.empty).:+(Vector.empty))
idle -= task
(toEnqueue, idle.isEmpty)
}

/** If merging finished with a TASK running out of elements, this method will populate a suitable continuation,
* if applicable.
* @return An optional entry to be added to the MergingTaskQueue, and a flag if further merging needed
*/
def finished(task: TASK): (Option[(Iterable[Long], TASK)], Boolean) =
queuedRanges.get(task) match {
case Some(idsQueue) =>
val newIdsQueue = idsQueue.drop(1)
if (newIdsQueue.isEmpty) queuedRanges.remove(task)
else queuedRanges += (task -> newIdsQueue)
(queueEntry(task, idsQueue.head), true)
case None =>
idle.add(task)
(None, false)
}

private def queueEntry(task: TASK, ids: Iterable[Long]): Option[(Iterable[Long], TASK)] =
if (ids.isEmpty) None
else {
Some((ids, task))
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,34 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {
succeed
}
}

behavior of "mergeIdStreams"

it should "merge, deduplicate and batch a stream of 3" in {
val mutableLogic = FilterTableACSReader.mergeIdStreams(
tasks = List("a", "b", "c"),
outputBatchSize = 3,
inputBatchSize = 2,
)()
mutableLogic("a" -> List(1, 3)) shouldBe Nil // a [1 3] b [] c []
mutableLogic("a" -> List(5, 7)) shouldBe Nil // a [1 3 5 7] b [] c []
mutableLogic("a" -> List(9, 11)) shouldBe Nil // a [1 3 5 7 9 11] b [] c []
mutableLogic("b" -> List(2, 4)) shouldBe Nil // a [1 3 5 7 9 11] b [2 4] c []
mutableLogic("b" -> List(6, 8)) shouldBe Nil // a [1 3 5 7 9 11] b [2 4 6 8] c []
mutableLogic("b" -> List(10, 12)) shouldBe Nil // a [1 3 5 7 9 11] b [2 4 6 8 10 12] c []
mutableLogic("c" -> List(10, 14)) shouldBe List( // a [] b [12] c [14] stashed 10 11
List(1, 2, 3),
List(4, 5, 6),
List(7, 8, 9),
) // stashed: 10, 11
mutableLogic("a" -> List(12, 13)) shouldBe List(
List(10, 11, 12)
) // a [13] b [] c [14] stashed 10 11 12
mutableLogic("b" -> List(13)) shouldBe Nil // a [] c [14] stashed: 13
mutableLogic("c" -> List(15, 16)) shouldBe Nil // a [] c [14 15 16] stashed: 13
mutableLogic("a" -> Nil) shouldBe List(List(13, 14, 15)) // c [] stashed: 16
mutableLogic("c" -> List(16, 17)) shouldBe Nil // c [] stashed: 16 17
mutableLogic("c" -> List(18, 19)) shouldBe List(List(16, 17, 18)) // c [] stashed: 19
mutableLogic("c" -> List(20)) shouldBe List(List(19, 20))
}
}

0 comments on commit a6f745f

Please sign in to comment.