Skip to content

Commit

Permalink
Make the InstrumentedSource.queue use the BoundedSourceQueue
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN

- [Integration Kit] InstrumentedSource.queue.offer no longer returns a Future

CHANGELOG_END
  • Loading branch information
hubert-da committed Nov 22, 2021
1 parent ab520fb commit 052c914
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,37 @@

package com.daml.metrics

import akka.Done
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.Source
import akka.stream.{BoundedSourceQueue, Materializer, OverflowStrategy, QueueOfferResult}
import com.codahale.metrics.{Counter, Timer}
import com.daml.dec.DirectExecutionContext

import scala.concurrent.Future

object InstrumentedSource {

final class QueueWithComplete[T](
delegate: SourceQueueWithComplete[(Timer.Context, T)],
final class InstrumentedBoundedSourceQueue[T](
delegate: BoundedSourceQueue[(Timer.Context, T)],
bufferSize: Int,
capacityCounter: Counter,
lengthCounter: Counter,
delayTimer: Timer,
) extends SourceQueueWithComplete[T] {
) extends BoundedSourceQueue[T] {

override def complete(): Unit = delegate.complete()
override def complete(): Unit = {
delegate.complete()
capacityCounter.dec(bufferSize.toLong)
}

override def fail(ex: Throwable): Unit = delegate.fail(ex)

override def watchCompletion(): Future[Done] = delegate.watchCompletion()

override def offer(elem: T): Future[QueueOfferResult] = {
override def offer(elem: T): QueueOfferResult = {
val result = delegate.offer(
delayTimer.time() -> elem
)
// Use the `DirectExecutionContext` to ensure that the
// counter is updated as closely as possible to the
// update of the queue, so to offer the most consistent
// reading possible via the counter
result.foreach {
result match {
case QueueOfferResult.Enqueued =>
lengthCounter.inc()

case _ => // do nothing
}(DirectExecutionContext)
}
result
}
}
Expand All @@ -63,26 +58,24 @@ object InstrumentedSource {
*/
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy,
capacityCounter: Counter,
lengthCounter: Counter,
delayTimer: Timer,
)(implicit
materializer: Materializer
): Source[T, QueueWithComplete[T]] = {
val (queue, source) =
Source.queue[(Timer.Context, T)](bufferSize, overflowStrategy).preMaterialize()
): Source[T, BoundedSourceQueue[T]] = {
val (boundedQueue, source) =
Source.queue[(Timer.Context, T)](bufferSize).preMaterialize()

val instrumentedQueue =
new QueueWithComplete[T](queue, lengthCounter, delayTimer)
// Using `map` and not `wireTap` because the latter is skipped on backpressure.

new InstrumentedBoundedSourceQueue[T](
boundedQueue,
bufferSize,
capacityCounter,
lengthCounter,
delayTimer,
)
capacityCounter.inc(bufferSize.toLong)
instrumentedQueue
.watchCompletion()
.andThen { case _ =>
capacityCounter.dec(bufferSize.toLong)
}(DirectExecutionContext)

source.mapMaterializedValue(_ => instrumentedQueue).map { case (timingContext, item) =>
timingContext.stop()
Expand Down
Loading

0 comments on commit 052c914

Please sign in to comment.