EventsBuffer implementation for in-memory fan-out for Ledger API serving
CHANGELOG_BEGIN CHANGELOG_END
Showing 3 changed files with 330 additions and 0 deletions.
166 changes: 166 additions & 0 deletions
ledger/participant-integration-api/src/main/scala/platform/store/cache/EventsBuffer.scala
@@ -0,0 +1,166 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.cache

import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.cache.EventsBuffer.{
  RequestOffBufferBounds,
  UnorderedException,
  SearchableByVector,
}

import scala.annotation.tailrec
import scala.collection.Searching.{Found, InsertionPoint, SearchResult}
import scala.math.Ordering
import scala.math.Ordering.Implicits.infixOrderingOps

/** An ordered-by-offset ring buffer wrapper.
  *
  * The buffer allows appending only elements with strictly increasing offsets, with the exception
  * of the range end marker, which can have an offset equal to the last appended element.
  *
  * @param maxBufferSize The maximum buffer size.
  * @param metrics The DAML metrics.
  * @param bufferQualifier The qualifier used for metrics tag specialization.
  * @param isRangeEndMarker Identifies if an element [[E]] should be treated
  *                         as a range end marker.
  * @tparam O The offset type.
  * @tparam E The entry buffer type.
  */
private[cache] final class EventsBuffer[O: Ordering, E](
    maxBufferSize: Int,
    metrics: Metrics,
    bufferQualifier: String,
    isRangeEndMarker: E => Boolean,
) {
  @volatile private var bufferRef = Vector.empty[(O, E)]
  @volatile private var bufferRangeEnd: Option[O] = None

  private val pushTimer = metrics.daml.services.index.buffer.push(bufferQualifier)
  private val sliceTimer = metrics.daml.services.index.buffer.slice(bufferQualifier)
  private val pruneTimer = metrics.daml.services.index.buffer.prune(bufferQualifier)

  /** Appends a new event to the buffer.
    *
    * Starts evicting from the tail when `maxBufferSize` is reached.
    *
    * @param offset The event offset.
    *               Must be higher than the last appended entry's offset, with the exception
    *               of the range end marker, which can have an offset equal to the last appended element.
    * @param entry The buffer entry.
    */
  def push(offset: O, entry: E): Unit =
    Timed.value(
      pushTimer,
      synchronized {
        if (bufferRef.size == maxBufferSize)
          bufferRef = bufferRef.drop(1)

        bufferRangeEnd.foreach { lastOffset =>
          // Ensure vector grows with strictly monotonic offsets.
          // Elements with equal offsets to the last buffer offset are allowed
          // provided they are a range end marker
          if (lastOffset > offset || (lastOffset == offset && !isRangeEndMarker(entry))) {
            throw UnorderedException(lastOffset, offset)
          }
        }

        // Update the buffer upper limit
        bufferRangeEnd = Some(offset)

        if (!isRangeEndMarker(entry)) {
          // Only append entries
          bufferRef = bufferRef :+ offset -> entry
        }
      },
    )

  /** Returns a slice of events from the buffer.
    *
    * Throws an exception if requested with `endInclusive` higher than the highest offset in the buffer.
    *
    * @param startExclusive The start exclusive bound of the requested range.
    * @param endInclusive The end inclusive bound of the requested range.
    * @return The series of events as an ordered vector satisfying the input bounds.
    */
  def slice(startExclusive: O, endInclusive: O): Vector[(O, E)] =
    Timed.value(
      sliceTimer, {
        val buffer = bufferRef
        if (buffer.isEmpty) Vector.empty
        else {
          val bufferEndExclusiveIdx = buffer.searchBy(endInclusive, _._1) match {
            case InsertionPoint(idx) if idx == buffer.size =>
              throw RequestOffBufferBounds(buffer.last._1, endInclusive)
            case Found(foundIndex) => foundIndex + 1
            case InsertionPoint(insertionPoint) => insertionPoint
          }

          val bufferStartInclusiveIdx = buffer.searchBy(startExclusive, _._1) match {
            case InsertionPoint(insertionPoint) => insertionPoint
            case Found(foundIndex) => foundIndex + 1
          }

          buffer.slice(bufferStartInclusiveIdx, bufferEndExclusiveIdx)
        }
      },
    )

  /** Removes entries starting from the buffer tail up until `endInclusive`.
    *
    * @param endInclusive The last inclusive (highest) buffer offset to be pruned.
    */
  def prune(endInclusive: O): Unit =
    Timed.value(
      pruneTimer,
      synchronized {
        bufferRef.searchBy[O](endInclusive, _._1) match {
          case Found(foundIndex) =>
            bufferRef = bufferRef.drop(foundIndex + 1)
          case InsertionPoint(insertionPoint) =>
            bufferRef = bufferRef.drop(insertionPoint)
        }
      },
    )
}

private[cache] object EventsBuffer {
  private[cache] final case class UnorderedException[O](first: O, second: O)
      extends RuntimeException(
        s"Elements appended to the buffer should have strictly increasing offsets: $first vs $second"
      )

  private[cache] final case class RequestOffBufferBounds[O](bufferEnd: O, requestEnd: O)
      extends RuntimeException(
        s"Request endInclusive ($requestEnd) is higher than bufferEnd ($bufferEnd)"
      )

  /** Binary search implementation inspired from scala.collection.Searching
    * which allows specifying the search predicate.
    *
    * TODO: Use v.view.search from Scala 2.13+ when compatibility allows it.
    *
    * @param v The vector where to search
    * @tparam E The element type
    */
  private[cache] implicit class SearchableByVector[E](v: Vector[E]) {
    final def searchBy[O: Ordering](elem: O, by: E => O): SearchResult =
      binarySearch(elem, 0, v.length, by)

    @tailrec
    private def binarySearch[O](elem: O, from: Int, to: Int, by: E => O)(implicit
        ord: Ordering[O]
    ): SearchResult =
      if (to == from) InsertionPoint(from)
      else {
        val idx = from + (to - from - 1) / 2
        math.signum(ord.compare(elem, by(v(idx)))) match {
          case -1 => binarySearch(elem, from, idx, by)(ord)
          case 1 => binarySearch(elem, idx + 1, to, by)(ord)
          case _ => Found(idx)
        }
      }
  }
}
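For orientation, here is a minimal usage sketch of the class above. It is not part of the commit: the object name `EventsBufferExample`, the `"example"` metrics qualifier, and the concrete offsets are illustrative, and the snippet assumes it compiles inside `com.daml.platform.store.cache`, since the class is `private[cache]`. It uses `Int.MaxValue` as the range end marker, the same convention the spec below uses.

// Illustrative sketch only, not part of the commit. Assumes compilation inside
// the com.daml.platform.store.cache package, because EventsBuffer is private[cache].
package com.daml.platform.store.cache

import com.codahale.metrics.MetricRegistry
import com.daml.metrics.Metrics

object EventsBufferExample extends App {
  val buffer = new EventsBuffer[Int, Int](
    maxBufferSize = 3,
    metrics = new Metrics(new MetricRegistry),
    bufferQualifier = "example", // hypothetical metrics tag
    isRangeEndMarker = _ == Int.MaxValue, // same marker convention as the spec below
  )

  // Offsets must be strictly increasing.
  buffer.push(2, 4)
  buffer.push(5, 10)

  // A range end marker with an offset equal to the last entry is accepted,
  // but it only advances the buffer's upper bound and is not appended.
  buffer.push(5, Int.MaxValue)

  buffer.push(8, 16)

  // The buffer is now full (maxBufferSize = 3); this push evicts the oldest entry (2 -> 4).
  buffer.push(13, 26)
  println(buffer.slice(0, 13)) // Vector((5,10), (8,16), (13,26))

  // Slicing uses an (exclusive, inclusive] range.
  println(buffer.slice(5, 13)) // Vector((8,16), (13,26))

  // Pruning drops everything up to and including the given offset.
  buffer.prune(8)
  println(buffer.slice(0, 13)) // Vector((13,26))
}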
156 changes: 156 additions & 0 deletions
...ticipant-integration-api/src/test/suite/scala/platform/store/cache/EventsBufferSpec.scala
@@ -0,0 +1,156 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.cache

import com.codahale.metrics.MetricRegistry
import com.daml.metrics.Metrics
import com.daml.platform.store.cache.EventsBuffer.{RequestOffBufferBounds, UnorderedException}
import org.scalacheck.Gen
import org.scalatest.compatible.Assertion
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{Failed, Succeeded}
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

import scala.collection.immutable

class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks {
  private val BufferElements = Vector(2, 3, 5, 8, 13).map(idx => idx -> idx * 2)
  private val LastOffset = BufferElements.last._1

  "push" when {
    "max buffer size reached" should {
      "drop oldest" in withBuffer(3) { buffer =>
        buffer.slice(0, LastOffset) shouldBe BufferElements.drop(2)
        buffer.push(21, 42)
        buffer.slice(0, 21) shouldBe BufferElements.drop(3) :+ 21 -> 42
      }
    }

    "element with smaller offset added" should {
      "throw" in withBuffer(3) { buffer =>
        intercept[UnorderedException[Int]] {
          buffer.push(1, 2)
        }.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 1"
      }
    }

    "element with equal offset added" should {
      "throw" in withBuffer(3) { buffer =>
        intercept[UnorderedException[Int]] {
          buffer.push(13, 2)
        }.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 13"
      }
    }

    "range end with equal offset added" should {
      "accept it" in withBuffer(3) { buffer =>
        buffer.push(13, Int.MaxValue)
        buffer.slice(0, 13) shouldBe Vector(8 -> 16, 13 -> 26)
      }
    }

    "range end with greater offset added" should {
      "not allow new element with lower offset" in withBuffer(3) { buffer =>
        buffer.push(15, Int.MaxValue)
        buffer.slice(0, 13) shouldBe Vector(8 -> 16, 13 -> 26)
        intercept[UnorderedException[Int]] {
          buffer.push(14, 28)
        }.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 15 vs 14"
      }
    }
  }

  "getEvents" when {
    "called with valid range" should {
      val bufferElements = increasinglySparse(10).map(offset => offset -> offset * 2)

      "return the correct series" in withBuffer(10, bufferElements) { buffer =>
        val rangesGen = for {
          startExclusive <- Gen.choose(0, bufferElements.last._1)
          endInclusive <- Gen.choose(startExclusive, bufferElements.last._1)
        } yield (startExclusive, endInclusive)

        forAll(rangesGen, minSuccessful(100)) { case (startExclusive, endInclusive) =>
          buffer.slice(startExclusive, endInclusive) shouldBe bufferElements
            .dropWhile(_._1 <= startExclusive)
            .takeWhile(_._1 <= endInclusive)
        }
      }
    }

    "called with endInclusive exceeding buffer range" should {
      "fail with exception" in withBuffer() { buffer =>
        intercept[RequestOffBufferBounds[Int]] {
          buffer.slice(4, 15) shouldBe Vector(5 -> 10, 8 -> 16, 13 -> 26)
        }.getMessage shouldBe s"Request endInclusive (15) is higher than bufferEnd (13)"
      }
    }

    "called after push" should {
      "always see the most recent updates" in withBuffer(1000, Vector.empty) { buffer =>
        (0 until 1000).foreach(idx => buffer.push(idx, idx)) // fill buffer to max size

        (0 until 10000).foldLeft[Assertion](Succeeded) {
          case (failed: Failed, _) => failed
          case (_, idx) =>
            val expected = ((idx + 1) to (100 + idx)).map(idx => idx -> idx)
            buffer.push(1000 + idx, 1000 + idx)
            buffer.slice(idx, 100 + idx) should contain theSameElementsInOrderAs expected
        }
      }
    }
  }

  "prune" when {
    "element found" should {
      "prune inclusive" in withBuffer() { buffer =>
        buffer.prune(5)
        buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26)
      }
    }

    "element inserted" should {
      "prune inclusive" in withBuffer() { buffer =>
        buffer.prune(6)
        buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26)
      }
    }

    "element before series" should {
      "not prune" in withBuffer() { buffer =>
        buffer.prune(1)
        buffer.slice(0, LastOffset) shouldBe BufferElements
      }
    }

    "element after series" should {
      "prune all" in withBuffer() { buffer =>
        buffer.prune(15)
        buffer.slice(0, LastOffset) shouldBe Vector.empty
      }
    }
  }

  private def withBuffer(
      maxBufferSize: Int = 5,
      elems: immutable.Vector[(Int, Int)] = BufferElements,
  )(test: EventsBuffer[Int, Int] => Assertion): Assertion = {
    val buffer = new EventsBuffer[Int, Int](
      maxBufferSize,
      new Metrics(new MetricRegistry),
      "integers",
      _ == Int.MaxValue, // Signifies ledger end
    )
    elems.foreach { case (offset, event) => buffer.push(offset, event) }
    test(buffer)
  }

  private def increasinglySparse(size: Int) =
    (0 until size - 1)
      .foldLeft(1 -> Vector(1)) { case ((inc, vector), _) =>
        (inc + 1) -> (vector :+ (vector.last + inc))
      }
      ._2
}
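Both `slice` and `prune` rest on the `searchBy` helper from the `EventsBuffer` companion object, so a small sketch of its two result shapes may be useful. Again illustrative and not part of the commit: the object name `SearchByExample` is made up, and the snippet assumes the same `private[cache]` package placement as the spec.

// Illustrative sketch only, not part of the commit. Assumes compilation inside
// com.daml.platform.store.cache so that the private[cache] helper is visible.
package com.daml.platform.store.cache

import com.daml.platform.store.cache.EventsBuffer.SearchableByVector
import scala.collection.Searching.{Found, InsertionPoint}

object SearchByExample extends App {
  val entries = Vector(2 -> "a", 5 -> "b", 8 -> "c")

  // An exact hit on an existing offset yields Found; slice turns this into an
  // inclusive end bound by adding 1, and prune drops up to and including it.
  assert(entries.searchBy(5, _._1) == Found(1))

  // A missing offset yields the insertion point that would keep the vector ordered.
  assert(entries.searchBy(6, _._1) == InsertionPoint(2))

  // An offset beyond the last entry yields an insertion point equal to the
  // vector length, which slice reports as RequestOffBufferBounds.
  assert(entries.searchBy(9, _._1) == InsertionPoint(3))
}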