Skip to content

Commit

Permalink
Added buffer size metrics for getTransactions/getTransactionTrees (#1…
Browse files Browse the repository at this point in the history
…0744)

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da authored Sep 3, 2021
1 parent f76c868 commit e5a6d70
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
15 changes: 10 additions & 5 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ final class Metrics(val registry: MetricRegistry) {
val transactionLogUpdatesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_log_updates_buffer_size")

val transactionTreesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_trees_buffer_size")
val flatTransactionsBufferSize: Counter =
registry.counter(Prefix :+ "flat_transactions_buffer_size")

val contractStateEventsBufferSize: Counter =
registry.counter(Prefix :+ "contract_state_events_buffer_size")

Expand Down Expand Up @@ -686,12 +691,12 @@ final class Metrics(val registry: MetricRegistry) {

val toFlatTransactions: Timer = registry.timer(Prefix :+ "to_flat_transactions")
val toTransactionTrees: Timer = registry.timer(Prefix :+ "to_transaction_trees")
}

val transactionTreesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_trees_buffer_size")
val flatTransactionsBufferSize: Counter =
registry.counter(Prefix :+ "flat_transactions_buffer_size")
val transactionTreesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_trees_buffer_size")
val flatTransactionsBufferSize: Counter =
registry.counter(Prefix :+ "flat_transactions_buffer_size")
}

val getContractStateEventsChunkSize: Histogram =
registry.histogram(Prefix :+ "get_contract_state_events_chunk_fetch_size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[events] class BufferedTransactionsReader(
resolvedFromBufferCounter =
metrics.daml.services.index.streamsBuffer.flatTransactionsBuffered,
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsTotal,
bufferSizeCounter = metrics.daml.services.index.flatTransactionsBufferSize,
bufferSizeCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsBufferSize,
outputStreamBufferSize = outputStreamBufferSize,
)
}
Expand All @@ -93,7 +93,7 @@ private[events] class BufferedTransactionsReader(
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal,
bufferSizeCounter =
// TODO in-memory fan-out: Specialize the metric per consumer
metrics.daml.services.index.transactionTreesBufferSize,
metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize,
outputStreamBufferSize = outputStreamBufferSize,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,18 @@ private[appendonlydao] final class TransactionsReader(
})
.mapMaterializedValue(_ => NotUsed)

groupContiguous(events)(by = _.transactionId)
val flatTransactionsStream = groupContiguous(events)(by = _.transactionId)
.mapConcat { events =>
val response = EventsTable.Entry.toGetTransactionsResponse(events)
response.map(r => offsetFor(r) -> r)
}
.buffer(outputStreamBufferSize, OverflowStrategy.backpressure)

InstrumentedSource
.bufferedSource(
flatTransactionsStream,
metrics.daml.index.flatTransactionsBufferSize,
outputStreamBufferSize,
)
.wireTap(_ match {
case (_, response) =>
response.transactions.foreach(txn =>
Expand Down Expand Up @@ -221,12 +227,18 @@ private[appendonlydao] final class TransactionsReader(
})
.mapMaterializedValue(_ => NotUsed)

groupContiguous(events)(by = _.transactionId)
val transactionTreesStream = groupContiguous(events)(by = _.transactionId)
.mapConcat { events =>
val response = EventsTable.Entry.toGetTransactionTreesResponse(events)
response.map(r => offsetFor(r) -> r)
}
.buffer(outputStreamBufferSize, OverflowStrategy.backpressure)

InstrumentedSource
.bufferedSource(
transactionTreesStream,
metrics.daml.index.transactionTreesBufferSize,
outputStreamBufferSize,
)
.wireTap(_ match {
case (_, response) =>
response.transactions.foreach(txn =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class BufferedTransactionsReaderSpec
resolvedFromBufferCounter =
metrics.daml.services.index.streamsBuffer.transactionTreesBuffered,
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal,
bufferSizeCounter = metrics.daml.services.index.transactionTreesBufferSize,
bufferSizeCounter = metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize,
outputStreamBufferSize = 128,
)
.runWith(Sink.seq)
Expand Down

0 comments on commit e5a6d70

Please sign in to comment.