-
Notifications
You must be signed in to change notification settings - Fork 205
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
EventsBuffer implementation for in-memory fan-out for Ledger API serving
CHANGELOG_BEGIN CHANGELOG_END
- Loading branch information
Showing
3 changed files
with
380 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
174 changes: 174 additions & 0 deletions
174
ledger/participant-integration-api/src/main/scala/platform/store/cache/EventsBuffer.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.platform.store.cache | ||
|
||
import com.daml.metrics.{Metrics, Timed} | ||
import com.daml.platform.store.cache.EventsBuffer.{ | ||
BufferStateRef, | ||
RequestOffBufferBounds, | ||
SearchableByVector, | ||
UnorderedException, | ||
} | ||
|
||
import scala.annotation.tailrec | ||
import scala.collection.Searching.{Found, InsertionPoint, SearchResult} | ||
import scala.math.Ordering | ||
import scala.math.Ordering.Implicits.infixOrderingOps | ||
|
||
/** An ordered-by-offset ring buffer. | ||
* | ||
* The buffer allows appending only elements with strictly increasing offsets. | ||
* | ||
* @param maxBufferSize The maximum buffer size. | ||
* @param metrics The DAML metrics. | ||
* @param bufferQualifier The qualifier used for metrics tag specialization. | ||
* @param isRangeEndMarker Identifies if an element [[E]] should be treated | ||
* as a range end marker, in which case the element would be treated | ||
* as a buffer range end updater and not appended to the actual buffer. | ||
* @tparam O The offset type. | ||
* @tparam E The entry buffer type. | ||
*/ | ||
private[cache] final class EventsBuffer[O: Ordering, E]( | ||
maxBufferSize: Int, | ||
metrics: Metrics, | ||
bufferQualifier: String, | ||
isRangeEndMarker: E => Boolean, | ||
) { | ||
@volatile private var _bufferStateRef = BufferStateRef[O, E]() | ||
|
||
private val pushTimer = metrics.daml.services.index.streamsBuffer.push(bufferQualifier) | ||
private val sliceTimer = metrics.daml.services.index.streamsBuffer.slice(bufferQualifier) | ||
private val pruneTimer = metrics.daml.services.index.streamsBuffer.prune(bufferQualifier) | ||
|
||
/** Appends a new event to the buffer. | ||
* | ||
* Starts evicting from the tail when `maxBufferSize` is reached. | ||
* | ||
* @param offset The event offset. | ||
* Must be higher than the last appended entry's offset, with the exception | ||
* of the range end marker, which can have an offset equal to the last appended element. | ||
* @param entry The buffer entry. | ||
*/ | ||
def push(offset: O, entry: E): Unit = | ||
Timed.value( | ||
pushTimer, | ||
synchronized { | ||
_bufferStateRef.rangeEnd.foreach { lastOffset => | ||
// Ensure vector grows with strictly monotonic offsets. | ||
// Only specially-designated range end markers are allowed | ||
// to have offsets equal to the buffer range end. | ||
if (lastOffset > offset || (lastOffset == offset && !isRangeEndMarker(entry))) { | ||
throw UnorderedException(lastOffset, offset) | ||
} | ||
} | ||
|
||
var auxBufferVector = _bufferStateRef.vector | ||
|
||
// The range end markers are not appended to the buffer | ||
if (!isRangeEndMarker(entry)) { | ||
if (auxBufferVector.size == maxBufferSize) { | ||
auxBufferVector = auxBufferVector.drop(1) | ||
} | ||
auxBufferVector = auxBufferVector :+ offset -> entry | ||
} | ||
|
||
// Update the buffer reference | ||
_bufferStateRef = BufferStateRef(auxBufferVector, Some(offset)) | ||
}, | ||
) | ||
|
||
/** Returns a slice of events from the buffer. | ||
* | ||
* Throws an exception if requested with `endInclusive` higher than the highest offset in the buffer. | ||
* | ||
* @param startExclusive The start exclusive bound of the requested range. | ||
* @param endInclusive The end inclusive bound of the requested range. | ||
* @return The series of events as an ordered vector satisfying the input bounds. | ||
*/ | ||
def slice(startExclusive: O, endInclusive: O): Vector[(O, E)] = | ||
Timed.value( | ||
sliceTimer, { | ||
val bufferSnapshot = _bufferStateRef | ||
if (bufferSnapshot.rangeEnd.exists(_ < endInclusive)) { | ||
throw RequestOffBufferBounds(bufferSnapshot.vector.last._1, endInclusive) | ||
} else if (bufferSnapshot.vector.isEmpty) { | ||
Vector.empty | ||
} else { | ||
val bufferEndExclusiveIdx = bufferSnapshot.vector.searchBy(endInclusive, _._1) match { | ||
case Found(foundIndex) => foundIndex + 1 | ||
case InsertionPoint(insertionPoint) => insertionPoint | ||
} | ||
|
||
val bufferStartInclusiveIdx = bufferSnapshot.vector.searchBy(startExclusive, _._1) match { | ||
case InsertionPoint(insertionPoint) => insertionPoint | ||
case Found(foundIndex) => foundIndex + 1 | ||
} | ||
|
||
bufferSnapshot.vector.slice(bufferStartInclusiveIdx, bufferEndExclusiveIdx) | ||
} | ||
}, | ||
) | ||
|
||
/** Removes entries starting from the buffer tail up until `endInclusive`. | ||
* | ||
* @param endInclusive The last inclusive (highest) buffer offset to be pruned. | ||
*/ | ||
def prune(endInclusive: O): Unit = | ||
Timed.value( | ||
pruneTimer, | ||
synchronized { | ||
_bufferStateRef.vector.searchBy[O](endInclusive, _._1) match { | ||
case Found(foundIndex) => | ||
_bufferStateRef = | ||
_bufferStateRef.copy(vector = _bufferStateRef.vector.drop(foundIndex + 1)) | ||
case InsertionPoint(insertionPoint) => | ||
_bufferStateRef = | ||
_bufferStateRef.copy(vector = _bufferStateRef.vector.drop(insertionPoint)) | ||
} | ||
}, | ||
) | ||
} | ||
|
||
private[cache] object EventsBuffer { | ||
private final case class BufferStateRef[O, E]( | ||
vector: Vector[(O, E)] = Vector.empty, | ||
rangeEnd: Option[O] = Option.empty, | ||
) | ||
|
||
private[cache] final case class UnorderedException[O](first: O, second: O) | ||
extends RuntimeException( | ||
s"Elements appended to the buffer should have strictly increasing offsets: $first vs $second" | ||
) | ||
|
||
private[cache] final case class RequestOffBufferBounds[O](bufferEnd: O, requestEnd: O) | ||
extends RuntimeException( | ||
s"Request endInclusive ($requestEnd) is higher than bufferEnd ($bufferEnd)" | ||
) | ||
|
||
/** Binary search implementation inspired from scala.collection.Searching | ||
* which allows specifying the search predicate. | ||
* | ||
* @param v The vector where to search | ||
* @tparam E The element type | ||
*/ | ||
private[cache] implicit class SearchableByVector[E](v: Vector[E]) { | ||
// TODO: Remove this specialized implementation and use v.view.map(by).search(elem) from Scala 2.13+ when compatibility allows it. | ||
final def searchBy[O: Ordering](elem: O, by: E => O): SearchResult = | ||
binarySearch(elem, 0, v.length, by) | ||
|
||
@tailrec | ||
private def binarySearch[O](elem: O, from: Int, to: Int, by: E => O)(implicit | ||
ord: Ordering[O] | ||
): SearchResult = | ||
if (to == from) InsertionPoint(from) | ||
else { | ||
val idx = from + (to - from - 1) / 2 | ||
math.signum(ord.compare(elem, by(v(idx)))) match { | ||
case -1 => binarySearch(elem, from, idx, by)(ord) | ||
case 1 => binarySearch(elem, idx + 1, to, by)(ord) | ||
case _ => Found(idx) | ||
} | ||
} | ||
} | ||
} |
198 changes: 198 additions & 0 deletions
198
...ticipant-integration-api/src/test/suite/scala/platform/store/cache/EventsBufferSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.platform.store.cache | ||
|
||
import java.util.concurrent.Executors | ||
|
||
import com.codahale.metrics.MetricRegistry | ||
import com.daml.metrics.Metrics | ||
import com.daml.platform.store.cache.EventsBuffer.{RequestOffBufferBounds, UnorderedException} | ||
import org.scalatest.Succeeded | ||
import org.scalatest.compatible.Assertion | ||
import org.scalatest.matchers.should.Matchers | ||
import org.scalatest.wordspec.AnyWordSpec | ||
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks | ||
|
||
import scala.collection.Searching.{Found, InsertionPoint} | ||
import scala.collection.immutable | ||
import scala.concurrent.duration.DurationInt | ||
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} | ||
|
||
class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks { | ||
private val BufferElements = Vector(2, 3, 5, 8, 13).map(idx => idx -> idx * 2) | ||
private val LastOffset = BufferElements.last._1 | ||
|
||
"push" when { | ||
"max buffer size reached" should { | ||
"drop oldest" in withBuffer(3) { buffer => | ||
buffer.slice(0, LastOffset) shouldBe BufferElements.drop(2) | ||
buffer.push(21, 42) | ||
buffer.slice(0, 21) shouldBe BufferElements.drop(3) :+ 21 -> 42 | ||
} | ||
} | ||
|
||
"element with smaller offset added" should { | ||
"throw" in withBuffer(3) { buffer => | ||
intercept[UnorderedException[Int]] { | ||
buffer.push(1, 2) | ||
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 1" | ||
} | ||
} | ||
|
||
"element with equal offset added" should { | ||
"throw" in withBuffer(3) { buffer => | ||
intercept[UnorderedException[Int]] { | ||
buffer.push(13, 2) | ||
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 13" | ||
} | ||
} | ||
|
||
"range end with equal offset added" should { | ||
"accept it" in withBuffer(3) { buffer => | ||
buffer.push(13, Int.MaxValue) | ||
buffer.slice(0, 13) shouldBe BufferElements.drop(2) | ||
} | ||
} | ||
|
||
"range end with greater offset added" should { | ||
"not allow new element with lower offset" in withBuffer(3) { buffer => | ||
buffer.push(15, Int.MaxValue) | ||
buffer.slice(0, 13) shouldBe BufferElements.drop(2) | ||
intercept[UnorderedException[Int]] { | ||
buffer.push(14, 28) | ||
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 15 vs 14" | ||
} | ||
} | ||
} | ||
|
||
"getEvents" when { | ||
"called with inclusive range" should { | ||
"return the full buffer contents" in withBuffer() { buffer => | ||
buffer.slice(0, 13) shouldBe BufferElements | ||
} | ||
} | ||
|
||
"called with range end before buffer range" should { | ||
"not include elements past the requested end inclusive" in withBuffer() { buffer => | ||
buffer.slice(0, 12) shouldBe BufferElements.dropRight(1) | ||
buffer.slice(0, 8) shouldBe BufferElements.dropRight(1) | ||
} | ||
} | ||
|
||
"called with range start exclusive past the buffer start range" in withBuffer() { buffer => | ||
buffer.slice(4, 13) shouldBe BufferElements.drop(2) | ||
buffer.slice(5, 13) shouldBe BufferElements.drop(3) | ||
} | ||
|
||
"called with endInclusive exceeding buffer range" should { | ||
"fail with exception" in withBuffer() { buffer => | ||
intercept[RequestOffBufferBounds[Int]] { | ||
buffer.slice(4, 15) shouldBe Vector(5 -> 10, 8 -> 16, 13 -> 26) | ||
}.getMessage shouldBe s"Request endInclusive (15) is higher than bufferEnd (13)" | ||
} | ||
} | ||
|
||
"called after push from a different thread" should { | ||
"always see the most recent updates" in withBuffer(1000, Vector.empty) { buffer => | ||
(0 until 1000).foreach(idx => buffer.push(idx, idx)) // fill buffer to max size | ||
|
||
val pushExecutor, sliceExecutor = | ||
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) | ||
|
||
(0 until 1000).foreach { idx => | ||
val expected = ((idx + 901) to (1000 + idx)).map(idx => idx -> idx) | ||
|
||
implicit val ec: ExecutionContextExecutorService = pushExecutor | ||
|
||
Await.result( | ||
// Simulate different thread accesses for push/slice | ||
awaitable = { | ||
for { | ||
_ <- Future(buffer.push(1000 + idx, 1000 + idx))(pushExecutor) | ||
_ <- Future(buffer.slice(900 + idx, 1000 + idx))(sliceExecutor) | ||
.map(_ should contain theSameElementsInOrderAs expected)(sliceExecutor) | ||
} yield Succeeded | ||
}, | ||
atMost = 1.seconds, | ||
) | ||
} | ||
Succeeded | ||
} | ||
} | ||
} | ||
|
||
"prune" when { | ||
"element found" should { | ||
"prune inclusive" in withBuffer() { buffer => | ||
buffer.prune(5) | ||
buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26) | ||
} | ||
} | ||
|
||
"element not present" should { | ||
"prune inclusive" in withBuffer() { buffer => | ||
buffer.prune(6) | ||
buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26) | ||
} | ||
} | ||
|
||
"element before series" should { | ||
"not prune" in withBuffer() { buffer => | ||
buffer.prune(1) | ||
buffer.slice(0, LastOffset) shouldBe BufferElements | ||
} | ||
} | ||
|
||
"element after series" should { | ||
"prune all" in withBuffer() { buffer => | ||
buffer.prune(15) | ||
buffer.slice(0, LastOffset) shouldBe Vector.empty | ||
} | ||
} | ||
|
||
"one element in buffer" should { | ||
"prune all" in withBuffer(1, Vector(1 -> 2)) { buffer => | ||
buffer.prune(1) | ||
buffer.slice(0, 1) shouldBe Vector.empty | ||
} | ||
} | ||
} | ||
|
||
"binarySearch" should { | ||
import EventsBuffer.SearchableByVector | ||
val series = Vector(9, 10, 13).map(el => el -> el.toString) | ||
|
||
"work on singleton series" in { | ||
Vector(7).searchBy(5, identity) shouldBe InsertionPoint(0) | ||
Vector(7).searchBy(7, identity) shouldBe Found(0) | ||
Vector(7).searchBy(8, identity) shouldBe InsertionPoint(1) | ||
} | ||
|
||
"work on non-empty series" in { | ||
series.searchBy(8, _._1) shouldBe InsertionPoint(0) | ||
series.searchBy(10, _._1) shouldBe Found(1) | ||
series.searchBy(12, _._1) shouldBe InsertionPoint(2) | ||
series.searchBy(13, _._1) shouldBe Found(2) | ||
series.searchBy(14, _._1) shouldBe InsertionPoint(3) | ||
} | ||
|
||
"work on empty series" in { | ||
Vector.empty[Int].searchBy(1337, identity) shouldBe InsertionPoint(0) | ||
} | ||
} | ||
|
||
private def withBuffer( | ||
maxBufferSize: Int = 5, | ||
elems: immutable.Vector[(Int, Int)] = BufferElements, | ||
)(test: EventsBuffer[Int, Int] => Assertion): Assertion = { | ||
val buffer = new EventsBuffer[Int, Int]( | ||
maxBufferSize, | ||
new Metrics(new MetricRegistry), | ||
"integers", | ||
_ == Int.MaxValue, // Signifies ledger end | ||
) | ||
elems.foreach { case (offset, event) => buffer.push(offset, event) } | ||
test(buffer) | ||
} | ||
} |