Skip to content

Commit

Permalink
Ledger writer supporting pre-execution and normal flow (digital-asset…
Browse files Browse the repository at this point in the history
…#6904)

* Added ledger writer that chooses between instances based on estimate interpretation cost.
CHANGELOG_BEGIN
CHANGELOG_END

* Code tidying.

* Delegate to pre-executing writer in case thershold is set to 0.

* Added ability to change metrics.

* Added metrics.

* Code tidying.

* Update ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/InterpretationCostBasedLedgerWriterChooser.scala

Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>

Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>
  • Loading branch information
miklos-da and SamirTalwar authored Jul 29, 2020
1 parent aec5cac commit 224ab36
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
7 changes: 7 additions & 0 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ final class Metrics(val registry: MetricRegistry) {
private val Prefix: MetricName = kvutils.Prefix :+ "writer"

val commit: Timer = registry.timer(Prefix :+ "commit")

val preExecutedCount: Counter = registry.counter(Prefix :+ "pre_executed_count")
val preExecutedInterpretationCosts: Histogram =
registry.histogram(Prefix :+ "pre_executed_interpretation_costs")
val committedCount: Counter = registry.counter(Prefix :+ "committed_count")
val committedInterpretationCosts: Histogram =
registry.histogram(Prefix :+ "committed_interpretation_costs")
}

object conflictdetection {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.api

import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.metrics.Metrics

import scala.concurrent.Future

/**
* Sends commits to [[cheapTransactionsDelegate]] in case estimated interpretation cost is below
* [[estimatedInterpretationCostThreshold]] otherwise to [[expensiveTransactionsDelegate]].
* Submissions that don't have an estimated interpretation cost will be forwarded to
* [[cheapTransactionsDelegate]].
*
* @param estimatedInterpretationCostThreshold all transactions that have a greater or equal estimated interpretation
* cost will be forwarded to [[expensiveTransactionsDelegate]]
*/
final class InterpretationCostBasedLedgerWriterChooser(
estimatedInterpretationCostThreshold: Long,
cheapTransactionsDelegate: LedgerWriter,
expensiveTransactionsDelegate: LedgerWriter,
incrementCheapCounter: () => Unit = () => (),
incrementExpensiveCounter: () => Unit = () => (),
addInterpretationCostBelowThreshold: Long => Unit = _ => (),
addInterpretationCostAboveThreshold: Long => Unit = _ => ())
extends LedgerWriter {
assert(cheapTransactionsDelegate.participantId == expensiveTransactionsDelegate.participantId)

override def participantId: ParticipantId = cheapTransactionsDelegate.participantId

override def commit(
correlationId: String,
envelope: Bytes,
metadata: CommitMetadata,
): Future[SubmissionResult] = {
val estimatedInterpretationCost = metadata.estimatedInterpretationCost.getOrElse(0L)
if (estimatedInterpretationCost >= estimatedInterpretationCostThreshold) {
incrementExpensiveCounter()
addInterpretationCostAboveThreshold(estimatedInterpretationCost)
expensiveTransactionsDelegate.commit(correlationId, envelope, metadata)
} else {
incrementCheapCounter()
addInterpretationCostBelowThreshold(estimatedInterpretationCost)
cheapTransactionsDelegate.commit(correlationId, envelope, metadata)
}
}

override def currentHealth(): HealthStatus =
cheapTransactionsDelegate.currentHealth() and expensiveTransactionsDelegate
.currentHealth()
}

object InterpretationCostBasedLedgerWriterChooser {
def apply(
estimatedInterpretationCostThreshold: Long,
cheapTransactionsDelegate: LedgerWriter,
expensiveTransactionsDelegate: LedgerWriter,
damlMetrics: Metrics): InterpretationCostBasedLedgerWriterChooser = {
val metrics = damlMetrics.daml.kvutils.writer
new InterpretationCostBasedLedgerWriterChooser(
estimatedInterpretationCostThreshold,
cheapTransactionsDelegate,
expensiveTransactionsDelegate,
incrementCheapCounter = metrics.committedCount.inc,
incrementExpensiveCounter = metrics.preExecutedCount.inc,
addInterpretationCostBelowThreshold = metrics.committedInterpretationCosts.update,
addInterpretationCostAboveThreshold = metrics.preExecutedInterpretationCosts.update
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.api

import com.daml.ledger.api.health.{Healthy, Unhealthy}
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.v1.SubmissionResult.Acknowledged
import com.google.protobuf.ByteString
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{AsyncWordSpec, Matchers}

import scala.concurrent.Future

class InterpretationCostBasedLedgerWriterChooserSpec
extends AsyncWordSpec
with Matchers
with MockitoSugar {
"commit" should {
"delegate to cheap writer in case of no estimated interpretation cost" in {
val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = None)
val mockWriterCheap = mock[LedgerWriter]
when(mockWriterCheap.commit(any[String], any[Bytes], any[CommitMetadata]))
.thenReturn(Future.successful(Acknowledged))
val instance =
new InterpretationCostBasedLedgerWriterChooser(1L, mockWriterCheap, mock[LedgerWriter])

instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ =>
verify(mockWriterCheap, times(1)).commit(any[String], any[Bytes], any[CommitMetadata])
}
succeed
}

"delegate to cheap writer in case estimated interpretation cost is below threshold" in {
val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = Some(1))
val mockWriterCheap = mock[LedgerWriter]
when(mockWriterCheap.commit(anyString(), any[Bytes], any[CommitMetadata]))
.thenReturn(Future.successful(Acknowledged))
val instance =
new InterpretationCostBasedLedgerWriterChooser(2L, mockWriterCheap, mock[LedgerWriter])

instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ =>
verify(mockWriterCheap, times(1)).commit(any[String], any[Bytes], any[CommitMetadata])
}
succeed
}

"delegate to expensive writer in case estimated interpretation cost reaches the threshold" in {
val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = Some(1))
val mockWriterExpensive = mock[LedgerWriter]
when(mockWriterExpensive.commit(anyString(), any[Bytes], any[CommitMetadata]))
.thenReturn(Future.successful(Acknowledged))
val instance =
new InterpretationCostBasedLedgerWriterChooser(1L, mock[LedgerWriter], mockWriterExpensive)

instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ =>
verify(mockWriterExpensive, times(1)).commit(any[String], any[Bytes], any[CommitMetadata])
}
succeed
}

"delegate to expensive writer in case threshold is 0" in {
val commitMetadata = SimpleCommitMetadata(estimatedInterpretationCost = None)
val mockWriterExpensive = mock[LedgerWriter]
when(mockWriterExpensive.commit(anyString(), any[Bytes], any[CommitMetadata]))
.thenReturn(Future.successful(Acknowledged))
val instance =
new InterpretationCostBasedLedgerWriterChooser(0L, mock[LedgerWriter], mockWriterExpensive)

instance.commit(aCorrelationId, anEnvelope, commitMetadata).map { _ =>
verify(mockWriterExpensive, times(1)).commit(any[String], any[Bytes], any[CommitMetadata])
}
succeed
}
}

"currentHealth" should {
"query both writer's health" in {
val mockWriterCheap = mock[LedgerWriter]
val mockWriterExpensive = mock[LedgerWriter]
when(mockWriterCheap.currentHealth()).thenReturn(Unhealthy)
when(mockWriterExpensive.currentHealth()).thenReturn(Healthy)
val instance =
new InterpretationCostBasedLedgerWriterChooser(1L, mockWriterCheap, mockWriterExpensive)

instance.currentHealth() shouldBe (Unhealthy and Healthy)

verify(mockWriterCheap, times(1)).currentHealth()
verify(mockWriterExpensive, times(1)).currentHealth()
succeed
}
}

private def aCorrelationId: String = ""
private def anEnvelope: Bytes = ByteString.EMPTY
}

0 comments on commit 224ab36

Please sign in to comment.