From 224ab3621c8c3745aa5af78f655b4676794d7a5f Mon Sep 17 00:00:00 2001 From: Miklos <57664299+miklos-da@users.noreply.github.com> Date: Wed, 29 Jul 2020 10:48:30 +0200 Subject: [PATCH] Ledger writer supporting pre-execution and normal flow (#6904) * Added ledger writer that chooses between instances based on estimate interpretation cost. CHANGELOG_BEGIN CHANGELOG_END * Code tidying. * Delegate to pre-executing writer in case thershold is set to 0. * Added ability to change metrics. * Added metrics. * Code tidying. * Update ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooser.scala Co-authored-by: Samir Talwar Co-authored-by: Samir Talwar --- .../main/scala/com/daml/metrics/Metrics.scala | 7 ++ ...retationCostBasedLedgerWriterChooser.scala | 74 ++++++++++++++ ...tionCostBasedLedgerWriterChooserSpec.scala | 98 +++++++++++++++++++ 3 files changed, 179 insertions(+) create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooser.scala create mode 100644 ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooserSpec.scala diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index 86568ff6ae67..365e29ba3ae5 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -200,6 +200,13 @@ final class Metrics(val registry: MetricRegistry) { private val Prefix: MetricName = kvutils.Prefix :+ "writer" val commit: Timer = registry.timer(Prefix :+ "commit") + + val preExecutedCount: Counter = registry.counter(Prefix :+ "pre_executed_count") + val preExecutedInterpretationCosts: Histogram = + registry.histogram(Prefix :+ "pre_executed_interpretation_costs") + val committedCount: Counter = registry.counter(Prefix :+ "committed_count") + val committedInterpretationCosts: Histogram = + registry.histogram(Prefix :+ "committed_interpretation_costs") } object conflictdetection { diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooser.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooser.scala new file mode 100644 index 000000000000..912ade9b100a --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooser.scala @@ -0,0 +1,74 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.api + +import com.daml.ledger.api.health.HealthStatus +import com.daml.ledger.participant.state.kvutils.Bytes +import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} +import com.daml.metrics.Metrics + +import scala.concurrent.Future + +/** + * Sends commits to [[cheapTransactionsDelegate]] in case estimated interpretation cost is below + * [[estimatedInterpretationCostThreshold]] otherwise to [[expensiveTransactionsDelegate]]. + * Submissions that don't have an estimated interpretation cost will be forwarded to + * [[cheapTransactionsDelegate]]. + * + * @param estimatedInterpretationCostThreshold all transactions that have a greater or equal estimated interpretation + * cost will be forwarded to [[expensiveTransactionsDelegate]] + */ +final class InterpretationCostBasedLedgerWriterChooser( + estimatedInterpretationCostThreshold: Long, + cheapTransactionsDelegate: LedgerWriter, + expensiveTransactionsDelegate: LedgerWriter, + incrementCheapCounter: () => Unit = () => (), + incrementExpensiveCounter: () => Unit = () => (), + addInterpretationCostBelowThreshold: Long => Unit = _ => (), + addInterpretationCostAboveThreshold: Long => Unit = _ => ()) + extends LedgerWriter { + assert(cheapTransactionsDelegate.participantId == expensiveTransactionsDelegate.participantId) + + override def participantId: ParticipantId = cheapTransactionsDelegate.participantId + + override def commit( + correlationId: String, + envelope: Bytes, + metadata: CommitMetadata, + ): Future[SubmissionResult] = { + val estimatedInterpretationCost = metadata.estimatedInterpretationCost.getOrElse(0L) + if (estimatedInterpretationCost >= estimatedInterpretationCostThreshold) { + incrementExpensiveCounter() + addInterpretationCostAboveThreshold(estimatedInterpretationCost) + expensiveTransactionsDelegate.commit(correlationId, envelope, metadata) + } else { + incrementCheapCounter() + addInterpretationCostBelowThreshold(estimatedInterpretationCost) + cheapTransactionsDelegate.commit(correlationId, envelope, metadata) + } + } + + override def currentHealth(): HealthStatus = + cheapTransactionsDelegate.currentHealth() and expensiveTransactionsDelegate + .currentHealth() +} + +object InterpretationCostBasedLedgerWriterChooser { + def apply( + estimatedInterpretationCostThreshold: Long, + cheapTransactionsDelegate: LedgerWriter, + expensiveTransactionsDelegate: LedgerWriter, + damlMetrics: Metrics): InterpretationCostBasedLedgerWriterChooser = { + val metrics = damlMetrics.daml.kvutils.writer + new InterpretationCostBasedLedgerWriterChooser( + estimatedInterpretationCostThreshold, + cheapTransactionsDelegate, + expensiveTransactionsDelegate, + incrementCheapCounter = metrics.committedCount.inc, + incrementExpensiveCounter = metrics.preExecutedCount.inc, + addInterpretationCostBelowThreshold = metrics.committedInterpretationCosts.update, + addInterpretationCostAboveThreshold = metrics.preExecutedInterpretationCosts.update + ) + } +} diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooserSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooserSpec.scala new file mode 100644 index 000000000000..0a5d1116cfa6 --- /dev/null +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooserSpec.scala @@ -0,0 +1,98 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.api + +import com.daml.ledger.api.health.{Healthy, Unhealthy} +import com.daml.ledger.participant.state.kvutils.Bytes +import com.daml.ledger.participant.state.v1.SubmissionResult.Acknowledged +import com.google.protobuf.ByteString +import org.mockito.ArgumentMatchers._ +import org.mockito.Mockito._ +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{AsyncWordSpec, Matchers} + +import scala.concurrent.Future + +class InterpretationCostBasedLedgerWriterChooserSpec + extends AsyncWordSpec + with Matchers + with MockitoSugar { + "commit" should { + "delegate to cheap writer in case of no estimated interpretation cost" in { + val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = None) + val mockWriterCheap = mock[LedgerWriter] + when(mockWriterCheap.commit(any[String], any[Bytes], any[CommitMetadata])) + .thenReturn(Future.successful(Acknowledged)) + val instance = + new InterpretationCostBasedLedgerWriterChooser(1L, mockWriterCheap, mock[LedgerWriter]) + + instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ => + verify(mockWriterCheap, times(1)).commit(any[String], any[Bytes], any[CommitMetadata]) + } + succeed + } + + "delegate to cheap writer in case estimated interpretation cost is below threshold" in { + val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = Some(1)) + val mockWriterCheap = mock[LedgerWriter] + when(mockWriterCheap.commit(anyString(), any[Bytes], any[CommitMetadata])) + .thenReturn(Future.successful(Acknowledged)) + val instance = + new InterpretationCostBasedLedgerWriterChooser(2L, mockWriterCheap, mock[LedgerWriter]) + + instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ => + verify(mockWriterCheap, times(1)).commit(any[String], any[Bytes], any[CommitMetadata]) + } + succeed + } + + "delegate to expensive writer in case estimated interpretation cost reaches the threshold" in { + val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = Some(1)) + val mockWriterExpensive = mock[LedgerWriter] + when(mockWriterExpensive.commit(anyString(), any[Bytes], any[CommitMetadata])) + .thenReturn(Future.successful(Acknowledged)) + val instance = + new InterpretationCostBasedLedgerWriterChooser(1L, mock[LedgerWriter], mockWriterExpensive) + + instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ => + verify(mockWriterExpensive, times(1)).commit(any[String], any[Bytes], any[CommitMetadata]) + } + succeed + } + + "delegate to expensive writer in case threshold is 0" in { + val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = None) + val mockWriterExpensive = mock[LedgerWriter] + when(mockWriterExpensive.commit(anyString(), any[Bytes], any[CommitMetadata])) + .thenReturn(Future.successful(Acknowledged)) + val instance = + new InterpretationCostBasedLedgerWriterChooser(0L, mock[LedgerWriter], mockWriterExpensive) + + instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ => + verify(mockWriterExpensive, times(1)).commit(any[String], any[Bytes], any[CommitMetadata]) + } + succeed + } + } + + "currentHealth" should { + "query both writer's health" in { + val mockWriterCheap = mock[LedgerWriter] + val mockWriterExpensive = mock[LedgerWriter] + when(mockWriterCheap.currentHealth()).thenReturn(Unhealthy) + when(mockWriterExpensive.currentHealth()).thenReturn(Healthy) + val instance = + new InterpretationCostBasedLedgerWriterChooser(1L, mockWriterCheap, mockWriterExpensive) + + instance.currentHealth() shouldBe (Unhealthy and Healthy) + + verify(mockWriterCheap, times(1)).currentHealth() + verify(mockWriterExpensive, times(1)).currentHealth() + succeed + } + } + + private def aCorrelationId: String = "" + private def anEnvelope: Bytes = ByteString.EMPTY +}