From e5a6d701827aa33f10a5ce79487970f1bf677c50 Mon Sep 17 00:00:00 2001 From: tudor-da Date: Fri, 3 Sep 2021 10:51:00 +0200 Subject: [PATCH] Added buffer size metrics for getTransactions/getTransactionTrees (#10744) CHANGELOG_BEGIN CHANGELOG_END --- .../main/scala/com/daml/metrics/Metrics.scala | 15 +++++++++----- .../events/BufferedTransactionsReader.scala | 4 ++-- .../events/TransactionsReader.scala | 20 +++++++++++++++---- .../BufferedTransactionsReaderSpec.scala | 2 +- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index cfd6edf13168..deb750159bd3 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -370,6 +370,11 @@ final class Metrics(val registry: MetricRegistry) { val transactionLogUpdatesBufferSize: Counter = registry.counter(Prefix :+ "transaction_log_updates_buffer_size") + val transactionTreesBufferSize: Counter = + registry.counter(Prefix :+ "transaction_trees_buffer_size") + val flatTransactionsBufferSize: Counter = + registry.counter(Prefix :+ "flat_transactions_buffer_size") + val contractStateEventsBufferSize: Counter = registry.counter(Prefix :+ "contract_state_events_buffer_size") @@ -686,12 +691,12 @@ final class Metrics(val registry: MetricRegistry) { val toFlatTransactions: Timer = registry.timer(Prefix :+ "to_flat_transactions") val toTransactionTrees: Timer = registry.timer(Prefix :+ "to_transaction_trees") - } - val transactionTreesBufferSize: Counter = - registry.counter(Prefix :+ "transaction_trees_buffer_size") - val flatTransactionsBufferSize: Counter = - registry.counter(Prefix :+ "flat_transactions_buffer_size") + val transactionTreesBufferSize: Counter = + registry.counter(Prefix :+ "transaction_trees_buffer_size") + val flatTransactionsBufferSize: Counter = + registry.counter(Prefix :+ "flat_transactions_buffer_size") + } val getContractStateEventsChunkSize: Histogram = registry.histogram(Prefix :+ "get_contract_state_events_chunk_fetch_size") diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/BufferedTransactionsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/BufferedTransactionsReader.scala index 4edab139ec37..2a9dc151ef1a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/BufferedTransactionsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/BufferedTransactionsReader.scala @@ -68,7 +68,7 @@ private[events] class BufferedTransactionsReader( resolvedFromBufferCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsBuffered, totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsTotal, - bufferSizeCounter = metrics.daml.services.index.flatTransactionsBufferSize, + bufferSizeCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsBufferSize, outputStreamBufferSize = outputStreamBufferSize, ) } @@ -93,7 +93,7 @@ private[events] class BufferedTransactionsReader( totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal, bufferSizeCounter = // TODO in-memory fan-out: Specialize the metric per consumer - metrics.daml.services.index.transactionTreesBufferSize, + metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize, outputStreamBufferSize = outputStreamBufferSize, ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala index 63de8ce82484..b28cb19c3ffa 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala @@ -126,12 +126,18 @@ private[appendonlydao] final class TransactionsReader( }) .mapMaterializedValue(_ => NotUsed) - groupContiguous(events)(by = _.transactionId) + val flatTransactionsStream = groupContiguous(events)(by = _.transactionId) .mapConcat { events => val response = EventsTable.Entry.toGetTransactionsResponse(events) response.map(r => offsetFor(r) -> r) } - .buffer(outputStreamBufferSize, OverflowStrategy.backpressure) + + InstrumentedSource + .bufferedSource( + flatTransactionsStream, + metrics.daml.index.flatTransactionsBufferSize, + outputStreamBufferSize, + ) .wireTap(_ match { case (_, response) => response.transactions.foreach(txn => @@ -221,12 +227,18 @@ private[appendonlydao] final class TransactionsReader( }) .mapMaterializedValue(_ => NotUsed) - groupContiguous(events)(by = _.transactionId) + val transactionTreesStream = groupContiguous(events)(by = _.transactionId) .mapConcat { events => val response = EventsTable.Entry.toGetTransactionTreesResponse(events) response.map(r => offsetFor(r) -> r) } - .buffer(outputStreamBufferSize, OverflowStrategy.backpressure) + + InstrumentedSource + .bufferedSource( + transactionTreesStream, + metrics.daml.index.transactionTreesBufferSize, + outputStreamBufferSize, + ) .wireTap(_ match { case (_, response) => response.transactions.foreach(txn => diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/BufferedTransactionsReaderSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/BufferedTransactionsReaderSpec.scala index 0c1b233bbcf1..75ad185ac19a 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/BufferedTransactionsReaderSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/BufferedTransactionsReaderSpec.scala @@ -101,7 +101,7 @@ class BufferedTransactionsReaderSpec resolvedFromBufferCounter = metrics.daml.services.index.streamsBuffer.transactionTreesBuffered, totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal, - bufferSizeCounter = metrics.daml.services.index.transactionTreesBufferSize, + bufferSizeCounter = metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize, outputStreamBufferSize = 128, ) .runWith(Sink.seq)