Skip to content

Commit

Permalink
EventsBuffer implementation for in-memory fan-out for Ledger API serving
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da committed May 25, 2021
1 parent 1b428be commit d2b6de2
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 0 deletions.
8 changes: 8 additions & 0 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@ final class Metrics(val registry: MetricRegistry) {
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val stopDeduplicateCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")

object buffer {
private val Prefix: MetricName = index.Prefix :+ "streams_buffer"

def push(qualifier: String): Timer = registry.timer(Prefix :+ qualifier :+ "push")
def slice(qualifier: String): Timer = registry.timer(Prefix :+ qualifier :+ "slice")
def prune(qualifier: String): Timer = registry.timer(Prefix :+ qualifier :+ "prune")
}
}

object read {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.cache

import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.cache.EventsBuffer.{
RequestOffBufferBounds,
UnorderedException,
SearchableByVector,
}

import scala.annotation.tailrec
import scala.collection.Searching.{Found, InsertionPoint, SearchResult}
import scala.math.Ordering
import scala.math.Ordering.Implicits.infixOrderingOps

/** An ordered-by-offset ring buffer wrapper.
*
* The buffer allows appending only elements with strictly increasing offsets, with the exception
* of the range end marker, which can have an offset equal to the last appended element.
*
* @param maxBufferSize The maximum buffer size.
* @param metrics The DAML metrics.
* @param bufferQualifier The qualifier used for metrics tag specialization.
* @param isRangeEndMarker Identifies if an element [[E]] should be treated
* as a range end marker.
* @tparam O The offset type.
* @tparam E The entry buffer type.
*/
private[cache] final class EventsBuffer[O: Ordering, E](
maxBufferSize: Int,
metrics: Metrics,
bufferQualifier: String,
isRangeEndMarker: E => Boolean,
) {
@volatile private var bufferRef = Vector.empty[(O, E)]
@volatile private var bufferRangeEnd: Option[O] = None

private val pushTimer = metrics.daml.services.index.buffer.push(bufferQualifier)
private val sliceTimer = metrics.daml.services.index.buffer.slice(bufferQualifier)
private val pruneTimer = metrics.daml.services.index.buffer.prune(bufferQualifier)

/** Appends a new event to the buffer.
*
* Starts evicting from the tail when `maxBufferSize` is reached.
*
* @param offset The event offset.
* Must be higher than the last appended entry's offset, with the exception
* of the range end marker, which can have an offset equal to the last appended element.
* @param entry The buffer entry.
*/
def push(offset: O, entry: E): Unit =
Timed.value(
pushTimer,
synchronized {
if (bufferRef.size == maxBufferSize)
bufferRef = bufferRef.drop(1)

bufferRangeEnd.foreach { lastOffset =>
// Ensure vector grows with strictly monotonic offsets.
// Elements with equal offsets to the last buffer offset are allowed
// provided they are a range end marker
if (lastOffset > offset || (lastOffset == offset && !isRangeEndMarker(entry))) {
throw UnorderedException(lastOffset, offset)
}
}

// Update the buffer upper limit
bufferRangeEnd = Some(offset)

if (!isRangeEndMarker(entry)) {
// Only append entries
bufferRef = bufferRef :+ offset -> entry
}
},
)

/** Returns a slice of events from the buffer.
*
* Throws an exception if requested with `endInclusive` higher than the highest offset in the buffer.
*
* @param startExclusive The start exclusive bound of the requested range.
* @param endInclusive The end inclusive bound of the requested range.
* @return The series of events as an ordered vector satisfying the input bounds.
*/
def slice(startExclusive: O, endInclusive: O): Vector[(O, E)] =
Timed.value(
sliceTimer, {
val buffer = bufferRef
if (buffer.isEmpty) Vector.empty
else {

val bufferEndExclusiveIdx = buffer.searchBy(endInclusive, _._1) match {
case InsertionPoint(idx) if idx == buffer.size =>
throw RequestOffBufferBounds(buffer.last._1, endInclusive)
case Found(foundIndex) => foundIndex + 1
case InsertionPoint(insertionPoint) => insertionPoint
}

val bufferStartInclusiveIdx = buffer.searchBy(startExclusive, _._1) match {
case InsertionPoint(insertionPoint) => insertionPoint
case Found(foundIndex) => foundIndex + 1
}

buffer.slice(bufferStartInclusiveIdx, bufferEndExclusiveIdx)
}
},
)

/** Removes entries starting from the buffer tail up until `endInclusive`.
*
* @param endInclusive The last inclusive (highest) buffer offset to be pruned.
*/
def prune(endInclusive: O): Unit =
Timed.value(
pruneTimer,
synchronized {
bufferRef.searchBy[O](endInclusive, _._1) match {
case Found(foundIndex) =>
bufferRef = bufferRef.drop(foundIndex + 1)
case InsertionPoint(insertionPoint) =>
bufferRef = bufferRef.drop(insertionPoint)
}
},
)
}

private[cache] object EventsBuffer {
private[cache] final case class UnorderedException[O](first: O, second: O)
extends RuntimeException(
s"Elements appended to the buffer should have strictly increasing offsets: $first vs $second"
)

private[cache] final case class RequestOffBufferBounds[O](bufferEnd: O, requestEnd: O)
extends RuntimeException(
s"Request endInclusive ($requestEnd) is higher than bufferEnd ($bufferEnd)"
)

/** Binary search implementation inspired from scala.collection.Searching
* which allows specifying the search predicate.
*
* TODO: Use v.view.search from Scala 2.13+ when compatibility allows it.
*
* @param v The vector where to search
* @tparam E The element type
*/
private[cache] implicit class SearchableByVector[E](v: Vector[E]) {
final def searchBy[O: Ordering](elem: O, by: E => O): SearchResult =
binarySearch(elem, 0, v.length, by)

@tailrec
private def binarySearch[O](elem: O, from: Int, to: Int, by: E => O)(implicit
ord: Ordering[O]
): SearchResult =
if (to == from) InsertionPoint(from)
else {
val idx = from + (to - from - 1) / 2
math.signum(ord.compare(elem, by(v(idx)))) match {
case -1 => binarySearch(elem, from, idx, by)(ord)
case 1 => binarySearch(elem, idx + 1, to, by)(ord)
case _ => Found(idx)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.cache

import com.codahale.metrics.MetricRegistry
import com.daml.metrics.Metrics
import com.daml.platform.store.cache.EventsBuffer.{RequestOffBufferBounds, UnorderedException}
import org.scalacheck.Gen
import org.scalatest.compatible.Assertion
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{Failed, Succeeded}
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

import scala.collection.immutable

class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks {
private val BufferElements = Vector(2, 3, 5, 8, 13).map(idx => idx -> idx * 2)
private val LastOffset = BufferElements.last._1

"push" when {
"max buffer size reached" should {
"drop oldest" in withBuffer(3) { buffer =>
buffer.slice(0, LastOffset) shouldBe BufferElements.drop(2)
buffer.push(21, 42)
buffer.slice(0, 21) shouldBe BufferElements.drop(3) :+ 21 -> 42
}
}

"element with smaller offset added" should {
"throw" in withBuffer(3) { buffer =>
intercept[UnorderedException[Int]] {
buffer.push(1, 2)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 1"
}
}

"element with equal offset added" should {
"throw" in withBuffer(3) { buffer =>
intercept[UnorderedException[Int]] {
buffer.push(13, 2)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 13"
}
}

"range end with equal offset added" should {
"accept it" in withBuffer(3) { buffer =>
buffer.push(13, Int.MaxValue)
buffer.slice(0, 13) shouldBe Vector(8 -> 16, 13 -> 26)
}
}

"range end with greater offset added" should {
"not allow new element with lower offset" in withBuffer(3) { buffer =>
buffer.push(15, Int.MaxValue)
buffer.slice(0, 13) shouldBe Vector(8 -> 16, 13 -> 26)
intercept[UnorderedException[Int]] {
buffer.push(14, 28)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 15 vs 14"
}
}
}

"getEvents" when {
"called with valid range" should {
val bufferElements = increasinglySparse(10).map(offset => offset -> offset * 2)

"return the correct series" in withBuffer(10, bufferElements) { buffer =>
val rangesGen = for {
startExclusive <- Gen.choose(0, bufferElements.last._1)
endInclusive <- Gen.choose(startExclusive, bufferElements.last._1)
} yield (startExclusive, endInclusive)

forAll(rangesGen, minSuccessful(100)) { case (startExclusive, endInclusive) =>
buffer.slice(startExclusive, endInclusive) shouldBe bufferElements
.dropWhile(_._1 <= startExclusive)
.takeWhile(_._1 <= endInclusive)
}
}
}

"called with endInclusive exceeding buffer range" should {
"fail with exception" in withBuffer() { buffer =>
intercept[RequestOffBufferBounds[Int]] {
buffer.slice(4, 15) shouldBe Vector(5 -> 10, 8 -> 16, 13 -> 26)
}.getMessage shouldBe s"Request endInclusive (15) is higher than bufferEnd (13)"
}
}

"called after push" should {
"always see the most recent updates" in withBuffer(1000, Vector.empty) { buffer =>
(0 until 1000).foreach(idx => buffer.push(idx, idx)) // fill buffer to max size

(0 until 10000).foldLeft[Assertion](Succeeded) {
case (failed: Failed, _) => failed
case (_, idx) =>
val expected = ((idx + 1) to (100 + idx)).map(idx => idx -> idx)
buffer.push(1000 + idx, 1000 + idx)
buffer.slice(idx, 100 + idx) should contain theSameElementsInOrderAs expected
}
}
}
}

"prune" when {
"element found" should {
"prune inclusive" in withBuffer() { buffer =>
buffer.prune(5)
buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26)
}
}

"element inserted" should {
"prune inclusive" in withBuffer() { buffer =>
buffer.prune(6)
buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26)
}
}

"element before series" should {
"not prune" in withBuffer() { buffer =>
buffer.prune(1)
buffer.slice(0, LastOffset) shouldBe BufferElements
}
}

"element after series" should {
"prune all" in withBuffer() { buffer =>
buffer.prune(15)
buffer.slice(0, LastOffset) shouldBe Vector.empty
}
}
}

private def withBuffer(
maxBufferSize: Int = 5,
elems: immutable.Vector[(Int, Int)] = BufferElements,
)(test: EventsBuffer[Int, Int] => Assertion): Assertion = {
val buffer = new EventsBuffer[Int, Int](
maxBufferSize,
new Metrics(new MetricRegistry),
"integers",
_ == Int.MaxValue, // Signifies ledger end
)
elems.foreach { case (offset, event) => buffer.push(offset, event) }
test(buffer)
}

private def increasinglySparse(size: Int) =
(0 until size - 1)
.foldLeft(1 -> Vector(1)) { case ((inc, vector), _) =>
(inc + 1) -> (vector :+ (vector.last + inc))
}
._2
}

0 comments on commit d2b6de2

Please sign in to comment.