Skip to content

Commit

Permalink
EventsBuffer implementation for in-memory fan-out for Ledger API serving (#9775)
Browse files Browse the repository at this point in the history

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da authored May 28, 2021
1 parent 0c12259 commit 63bc0d1
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 0 deletions.
8 changes: 8 additions & 0 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@ final class Metrics(val registry: MetricRegistry) {
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val stopDeduplicateCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")

/** Timers for the in-memory streams buffer, specialized per buffer qualifier. */
object streamsBuffer {
  private val Prefix: MetricName = index.Prefix :+ "streams_buffer"

  // Builds a timer named <prefix>.<qualifier>.<operation>.
  private def operationTimer(qualifier: String, operation: String): Timer =
    registry.timer(Prefix :+ qualifier :+ operation)

  def push(qualifier: String): Timer = operationTimer(qualifier, "push")
  def slice(qualifier: String): Timer = operationTimer(qualifier, "slice")
  def prune(qualifier: String): Timer = operationTimer(qualifier, "prune")
}
}

object read {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.cache

import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.cache.EventsBuffer.{
BufferStateRef,
RequestOffBufferBounds,
SearchableByVector,
UnorderedException,
}

import scala.annotation.tailrec
import scala.collection.Searching.{Found, InsertionPoint, SearchResult}
import scala.math.Ordering
import scala.math.Ordering.Implicits.infixOrderingOps

/** An ordered-by-offset ring buffer.
  *
  * The buffer allows appending only elements with strictly increasing offsets.
  *
  * Writers (`push`/`prune`) are serialized via `synchronized`; readers (`slice`)
  * operate lock-free on an immutable snapshot published through a `@volatile` reference.
  *
  * @param maxBufferSize The maximum buffer size.
  * @param metrics The DAML metrics.
  * @param bufferQualifier The qualifier used for metrics tag specialization.
  * @param isRangeEndMarker Identifies if an element [[E]] should be treated
  *                         as a range end marker, in which case the element would be treated
  *                         as a buffer range end updater and not appended to the actual buffer.
  * @tparam O The offset type.
  * @tparam E The entry buffer type.
  */
private[cache] final class EventsBuffer[O: Ordering, E](
    maxBufferSize: Int,
    metrics: Metrics,
    bufferQualifier: String,
    isRangeEndMarker: E => Boolean,
) {
  // Immutable (entries, rangeEnd) snapshot, replaced wholesale on each write.
  @volatile private var _bufferStateRef = BufferStateRef[O, E]()

  private val pushTimer = metrics.daml.services.index.streamsBuffer.push(bufferQualifier)
  private val sliceTimer = metrics.daml.services.index.streamsBuffer.slice(bufferQualifier)
  private val pruneTimer = metrics.daml.services.index.streamsBuffer.prune(bufferQualifier)

  /** Appends a new event to the buffer.
    *
    * Starts evicting from the tail when `maxBufferSize` is reached.
    *
    * @param offset The event offset.
    *               Must be higher than the last appended entry's offset, with the exception
    *               of the range end marker, which can have an offset equal to the last appended element.
    * @param entry The buffer entry.
    * @throws UnorderedException if `offset` does not advance the buffer range end.
    */
  def push(offset: O, entry: E): Unit =
    Timed.value(
      pushTimer,
      synchronized {
        _bufferStateRef.rangeEnd.foreach { lastOffset =>
          // Ensure the vector grows with strictly monotonic offsets.
          // Only specially-designated range end markers are allowed
          // to have offsets equal to the buffer range end.
          if (lastOffset > offset || (lastOffset == offset && !isRangeEndMarker(entry))) {
            throw UnorderedException(lastOffset, offset)
          }
        }

        var auxBufferVector = _bufferStateRef.vector

        // The range end markers are not appended to the buffer
        if (!isRangeEndMarker(entry)) {
          if (auxBufferVector.size == maxBufferSize) {
            auxBufferVector = auxBufferVector.drop(1)
          }
          auxBufferVector = auxBufferVector :+ offset -> entry
        }

        // Publish the new buffer state in a single volatile write.
        _bufferStateRef = BufferStateRef(auxBufferVector, Some(offset))
      },
    )

  /** Returns a slice of events from the buffer.
    *
    * Throws an exception if requested with `endInclusive` higher than the highest offset in the buffer.
    *
    * @param startExclusive The start exclusive bound of the requested range.
    * @param endInclusive The end inclusive bound of the requested range.
    * @return The series of events as an ordered vector satisfying the input bounds.
    * @throws RequestOffBufferBounds if `endInclusive` exceeds the buffer range end.
    */
  def slice(startExclusive: O, endInclusive: O): Vector[(O, E)] =
    Timed.value(
      sliceTimer, {
        val bufferSnapshot = _bufferStateRef
        bufferSnapshot.rangeEnd.foreach { bufferEnd =>
          // Report the actual range end rather than `vector.last._1`: the latter
          // throws NoSuchElementException when only range end markers have been
          // pushed (empty vector with a non-empty rangeEnd) and, in the presence
          // of a higher marker, under-reports the true buffer end.
          if (bufferEnd < endInclusive) {
            throw RequestOffBufferBounds(bufferEnd, endInclusive)
          }
        }
        if (bufferSnapshot.vector.isEmpty) {
          Vector.empty
        } else {
          // The end bound is inclusive: an exact match is part of the slice.
          val bufferEndExclusiveIdx = bufferSnapshot.vector.searchBy(endInclusive, _._1) match {
            case Found(foundIndex) => foundIndex + 1
            case InsertionPoint(insertionPoint) => insertionPoint
          }

          // The start bound is exclusive: an exact match is skipped.
          val bufferStartInclusiveIdx = bufferSnapshot.vector.searchBy(startExclusive, _._1) match {
            case InsertionPoint(insertionPoint) => insertionPoint
            case Found(foundIndex) => foundIndex + 1
          }

          bufferSnapshot.vector.slice(bufferStartInclusiveIdx, bufferEndExclusiveIdx)
        }
      },
    )

  /** Removes entries starting from the buffer tail up until `endInclusive`.
    *
    * @param endInclusive The last inclusive (highest) buffer offset to be pruned.
    */
  def prune(endInclusive: O): Unit =
    Timed.value(
      pruneTimer,
      synchronized {
        _bufferStateRef.vector.searchBy[O](endInclusive, _._1) match {
          case Found(foundIndex) =>
            // Inclusive bound: the exactly-matching entry is pruned as well.
            _bufferStateRef =
              _bufferStateRef.copy(vector = _bufferStateRef.vector.drop(foundIndex + 1))
          case InsertionPoint(insertionPoint) =>
            _bufferStateRef =
              _bufferStateRef.copy(vector = _bufferStateRef.vector.drop(insertionPoint))
        }
      },
    )
}

private[cache] object EventsBuffer {
  /** Immutable snapshot of the buffer contents together with its range end. */
  private final case class BufferStateRef[O, E](
      vector: Vector[(O, E)] = Vector.empty,
      rangeEnd: Option[O] = Option.empty,
  )

  /** Raised when an appended element would break the strict offset ordering. */
  private[cache] final case class UnorderedException[O](first: O, second: O)
      extends RuntimeException(
        s"Elements appended to the buffer should have strictly increasing offsets: $first vs $second"
      )

  /** Raised when a slice is requested past the buffer's range end. */
  private[cache] final case class RequestOffBufferBounds[O](bufferEnd: O, requestEnd: O)
      extends RuntimeException(
        s"Request endInclusive ($requestEnd) is higher than bufferEnd ($bufferEnd)"
      )

  /** Binary search implementation inspired from scala.collection.Searching
    * which allows specifying the search predicate.
    *
    * @param v The vector where to search
    * @tparam E The element type
    */
  private[cache] implicit class SearchableByVector[E](v: Vector[E]) {
    // TODO: Remove this specialized implementation and use v.view.map(by).search(elem) from Scala 2.13+ when compatibility allows it.
    final def searchBy[O: Ordering](elem: O, by: E => O): SearchResult =
      binarySearch(elem, 0, v.length, by)

    // Classic half-interval search over v(from until to), comparing on by(_).
    @tailrec
    private def binarySearch[O](elem: O, from: Int, to: Int, by: E => O)(implicit
        ord: Ordering[O]
    ): SearchResult =
      if (from == to) InsertionPoint(from)
      else {
        val mid = from + (to - from - 1) / 2
        val cmp = ord.compare(elem, by(v(mid)))
        if (cmp < 0) binarySearch(elem, from, mid, by)(ord)
        else if (cmp > 0) binarySearch(elem, mid + 1, to, by)(ord)
        else Found(mid)
      }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.cache

import java.util.concurrent.Executors

import com.codahale.metrics.MetricRegistry
import com.daml.metrics.Metrics
import com.daml.platform.store.cache.EventsBuffer.{RequestOffBufferBounds, UnorderedException}
import org.scalatest.Succeeded
import org.scalatest.compatible.Assertion
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

import scala.collection.Searching.{Found, InsertionPoint}
import scala.collection.immutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

// Behavior tests for EventsBuffer: push ordering invariants, slice bounds,
// pruning, cross-thread visibility of updates, and the binary-search helper.
class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks {
// Five entries with strictly increasing offsets (2, 3, 5, 8, 13),
// each mapped to twice its offset.
private val BufferElements = Vector(2, 3, 5, 8, 13).map(idx => idx -> idx * 2)
private val LastOffset = BufferElements.last._1

"push" when {
"max buffer size reached" should {
// With maxBufferSize = 3, only the newest three of the five fixture entries survive.
"drop oldest" in withBuffer(3) { buffer =>
buffer.slice(0, LastOffset) shouldBe BufferElements.drop(2)
buffer.push(21, 42)
buffer.slice(0, 21) shouldBe BufferElements.drop(3) :+ 21 -> 42
}
}

"element with smaller offset added" should {
// Offsets must be strictly increasing; 1 < 13 (current range end) is rejected.
"throw" in withBuffer(3) { buffer =>
intercept[UnorderedException[Int]] {
buffer.push(1, 2)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 1"
}
}

"element with equal offset added" should {
// An equal offset is only allowed for range end markers; 2 is a plain entry.
"throw" in withBuffer(3) { buffer =>
intercept[UnorderedException[Int]] {
buffer.push(13, 2)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 13"
}
}

"range end with equal offset added" should {
// Int.MaxValue is the marker (see withBuffer); it updates rangeEnd without being buffered.
"accept it" in withBuffer(3) { buffer =>
buffer.push(13, Int.MaxValue)
buffer.slice(0, 13) shouldBe BufferElements.drop(2)
}
}

"range end with greater offset added" should {
// Advancing rangeEnd to 15 via a marker forbids subsequent entries below 15.
"not allow new element with lower offset" in withBuffer(3) { buffer =>
buffer.push(15, Int.MaxValue)
buffer.slice(0, 13) shouldBe BufferElements.drop(2)
intercept[UnorderedException[Int]] {
buffer.push(14, 28)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 15 vs 14"
}
}
}

"getEvents" when {
"called with inclusive range" should {
"return the full buffer contents" in withBuffer() { buffer =>
buffer.slice(0, 13) shouldBe BufferElements
}
}

"called with range end before buffer range" should {
// endInclusive between entries (12) and exactly on an entry (8) both cut after it.
"not include elements past the requested end inclusive" in withBuffer() { buffer =>
buffer.slice(0, 12) shouldBe BufferElements.dropRight(1)
buffer.slice(0, 8) shouldBe BufferElements.dropRight(1)
}
}

// startExclusive between entries (4) and exactly on an entry (5) both skip up to it.
"called with range start exclusive past the buffer start range" in withBuffer() { buffer =>
buffer.slice(4, 13) shouldBe BufferElements.drop(2)
buffer.slice(5, 13) shouldBe BufferElements.drop(3)
}

"called with endInclusive exceeding buffer range" should {
// Requesting past rangeEnd (13) must fail rather than return a partial slice.
"fail with exception" in withBuffer() { buffer =>
intercept[RequestOffBufferBounds[Int]] {
buffer.slice(4, 15) shouldBe Vector(5 -> 10, 8 -> 16, 13 -> 26)
}.getMessage shouldBe s"Request endInclusive (15) is higher than bufferEnd (13)"
}
}

"called after push from a different thread" should {
// Verifies that a slice performed on one thread observes a push completed on
// another thread (exercises the @volatile publication inside EventsBuffer).
"always see the most recent updates" in withBuffer(1000, Vector.empty) { buffer =>
(0 until 1000).foreach(idx => buffer.push(idx, idx)) // fill buffer to max size

val pushExecutor, sliceExecutor =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
// NOTE(review): these executors are never shut down; acceptable while the
// JVM exits right after the suite, but consider shutdown() if this grows.

(0 until 1000).foreach { idx =>
// After pushing 1000 + idx, the size-1000 buffer holds (idx + 1) .. (1000 + idx);
// the slice (900 + idx, 1000 + idx] must therefore be exactly this window.
val expected = ((idx + 901) to (1000 + idx)).map(idx => idx -> idx)

// Drives the flatMap/map that the for-comprehension below desugars to.
implicit val ec: ExecutionContextExecutorService = pushExecutor

Await.result(
// Simulate different thread accesses for push/slice
awaitable = {
for {
_ <- Future(buffer.push(1000 + idx, 1000 + idx))(pushExecutor)
_ <- Future(buffer.slice(900 + idx, 1000 + idx))(sliceExecutor)
.map(_ should contain theSameElementsInOrderAs expected)(sliceExecutor)
} yield Succeeded
},
atMost = 1.seconds,
)
}
Succeeded
}
}
}

"prune" when {
"element found" should {
// Pruning at an existing offset removes it too (inclusive bound).
"prune inclusive" in withBuffer() { buffer =>
buffer.prune(5)
buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26)
}
}

"element not present" should {
// Pruning between offsets (6) removes everything at or below it.
"prune inclusive" in withBuffer() { buffer =>
buffer.prune(6)
buffer.slice(0, LastOffset) shouldBe Vector(8 -> 16, 13 -> 26)
}
}

"element before series" should {
"not prune" in withBuffer() { buffer =>
buffer.prune(1)
buffer.slice(0, LastOffset) shouldBe BufferElements
}
}

"element after series" should {
"prune all" in withBuffer() { buffer =>
buffer.prune(15)
buffer.slice(0, LastOffset) shouldBe Vector.empty
}
}

"one element in buffer" should {
"prune all" in withBuffer(1, Vector(1 -> 2)) { buffer =>
buffer.prune(1)
buffer.slice(0, 1) shouldBe Vector.empty
}
}
}

"binarySearch" should {
import EventsBuffer.SearchableByVector
val series = Vector(9, 10, 13).map(el => el -> el.toString)

// Insertion points bracket the single element; an exact hit is Found(0).
"work on singleton series" in {
Vector(7).searchBy(5, identity) shouldBe InsertionPoint(0)
Vector(7).searchBy(7, identity) shouldBe Found(0)
Vector(7).searchBy(8, identity) shouldBe InsertionPoint(1)
}

"work on non-empty series" in {
series.searchBy(8, _._1) shouldBe InsertionPoint(0)
series.searchBy(10, _._1) shouldBe Found(1)
series.searchBy(12, _._1) shouldBe InsertionPoint(2)
series.searchBy(13, _._1) shouldBe Found(2)
series.searchBy(14, _._1) shouldBe InsertionPoint(3)
}

"work on empty series" in {
Vector.empty[Int].searchBy(1337, identity) shouldBe InsertionPoint(0)
}
}

// Builds an EventsBuffer[Int, Int] with Int.MaxValue as the range-end marker,
// pushes `elems` in order, then runs `test` against the populated buffer.
private def withBuffer(
maxBufferSize: Int = 5,
elems: immutable.Vector[(Int, Int)] = BufferElements,
)(test: EventsBuffer[Int, Int] => Assertion): Assertion = {
val buffer = new EventsBuffer[Int, Int](
maxBufferSize,
new Metrics(new MetricRegistry),
"integers",
_ == Int.MaxValue, // Signifies ledger end
)
elems.foreach { case (offset, event) => buffer.push(offset, event) }
test(buffer)
}
}

0 comments on commit 63bc0d1

Please sign in to comment.