Skip to content

Commit

Permalink
ledger-api-bench-tool - command submission and stream reading complet…
Browse files Browse the repository at this point in the history
…e workflow [DPP-668] (#11544)

* Created a top level WorkflowDescriptor

* Stream descriptor in yaml

* Do not pass global config to Benchmark

* Fixed parsing template ids

* Moved descriptor to the top level

* Complete workflow

* Print descriptor

* Simplifiying

* Simplifiying

* Simplifiying

* Defining submission+benchmark workflow in a yaml file.

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool can run a workflow consisting of the command submission phase and stream benchmark phase.
CHANGELOG_END

* Log submission elapsed time

* Change submission progress logging interval to 10000

* Changed misleading error type

* unique_parties switch

* Allow to skip submission phase
  • Loading branch information
kamil-da authored Nov 10, 2021
1 parent f8f8807 commit 743ee46
Show file tree
Hide file tree
Showing 11 changed files with 394 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,44 @@ import com.daml.ledger.api.benchtool.metrics.{
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.resources.ResourceContext
import com.daml.metrics.MetricsReporter
import org.slf4j.LoggerFactory

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object Benchmark {
private val logger = LoggerFactory.getLogger(getClass)

def run(
config: Config,
streams: List[Config.StreamConfig],
reportingPeriod: FiniteDuration,
apiServices: LedgerApiServices,
metricsReporter: MetricsReporter,
)(implicit ec: ExecutionContext, resourceContext: ResourceContext): Future[Unit] = {
val resources = for {
system <- TypedActorSystemResourceOwner.owner()
registry <- new MetricRegistryOwner(
reporter = config.metricsReporter,
reportingInterval = config.reportingPeriod,
reporter = metricsReporter,
reportingInterval = reportingPeriod,
logger = logger,
)
} yield (system, registry)

resources.use { case (system, registry) =>
Future
.traverse(config.streams) {
.traverse(streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
logInterval = reportingPeriod,
metrics = MetricsSet.transactionMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet
.transactionExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
.transactionExposedMetrics(streamConfig.name, registry, reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
Expand All @@ -55,14 +59,14 @@ object Benchmark {
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
logInterval = reportingPeriod,
metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet.transactionTreesExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
reportingPeriod,
)
),
)(system, ec)
Expand All @@ -73,14 +77,14 @@ object Benchmark {
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
logInterval = reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet.activeContractsExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
reportingPeriod,
)
),
)(system, ec)
Expand All @@ -91,12 +95,12 @@ object Benchmark {
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
logInterval = reportingPeriod,
metrics = MetricsSet.completionsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet
.completionsExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
.completionsExposedMetrics(streamConfig.name, registry, reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.Config.StreamConfig
import com.daml.ledger.api.benchtool.submission.CommandSubmitter
import com.daml.ledger.test.model.Foo.{Foo1, Foo2, Foo3}
import com.daml.ledger.client.binding.Primitive

object DescriptorConverter {

def streamDescriptorToConfig(
descriptor: StreamDescriptor,
submissionSummary: Option[CommandSubmitter.SubmissionSummary],
): StreamConfig = {
import scalaz.syntax.tag._
def templateStringToId(template: String) = template match {
case "Foo1" => Foo1.id.unwrap
case "Foo2" => Foo2.id.unwrap
case "Foo3" => Foo3.id.unwrap
case invalid => throw new RuntimeException(s"Invalid template: $invalid")
}

def convertedParty(party: String): String = {
submissionSummary match {
case None => party
case Some(summary) => partyFromObservers(party, summary.observers)
}
}

def partyFromObservers(party: String, observers: List[Primitive.Party]): String =
observers
.map(_.unwrap)
.find(_.contains(party))
.getOrElse(throw new RuntimeException(s"Observer not found: $party"))

val filters = descriptor.filters.map { filter =>
convertedParty(filter.party) -> Some(filter.templates.map(templateStringToId))
}.toMap

descriptor.streamType match {
case StreamDescriptor.StreamType.ActiveContracts =>
Config.StreamConfig.ActiveContractsStreamConfig(
name = descriptor.name,
filters = filters,
)
case invalid =>
throw new RuntimeException(s"Invalid stream type: $invalid")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.submission.CommandSubmitter
import com.daml.ledger.api.benchtool.services._
import com.daml.ledger.api.benchtool.util.SimpleFileReader
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import io.grpc.Channel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import org.slf4j.{Logger, LoggerFactory}
import pprint.PPrinter

import java.io.File
import java.util.concurrent.{
ArrayBlockingQueue,
Executor,
Expand All @@ -20,6 +23,7 @@ import java.util.concurrent.{
}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object LedgerApiBenchTool {
def main(args: Array[String]): Unit = {
Expand All @@ -38,38 +42,66 @@ object LedgerApiBenchTool {
}

private def run(config: Config)(implicit ec: ExecutionContext): Future[Unit] = {
val printer = pprint.PPrinter(200, 1000)
logger.info(s"Starting benchmark with configuration:\n${printer(config).toString()}")
logger.info(s"Starting benchmark with configuration:\n${prettyPrint(config)}")

implicit val resourceContext: ResourceContext = ResourceContext(ec)

apiServicesOwner(config).use { apiServices =>
def testContractsGenerationStep(): Future[Unit] = config.contractSetDescriptorFile match {
case None =>
Future.successful(
logger.info("No contract set descriptor file provided. Skipping contracts generation.")
def benchmarkStep(streams: List[Config.StreamConfig]): Future[Unit] =
if (streams.isEmpty) {
Future.successful(logger.info(s"No streams defined. Skipping the benchmark step."))
} else {
Benchmark.run(
streams = streams,
reportingPeriod = config.reportingPeriod,
apiServices = apiServices,
metricsReporter = config.metricsReporter,
)
}

def submissionStep(
submissionDescriptor: Option[SubmissionDescriptor]
): Future[Option[CommandSubmitter.SubmissionSummary]] =
submissionDescriptor match {
case None =>
logger.info(s"No submission defined. Skipping.")
Future.successful(None)
case Some(descriptor) =>
CommandSubmitter(apiServices)
.submit(
descriptor = descriptor,
maxInFlightCommands = config.maxInFlightCommands,
submissionBatchSize = config.submissionBatchSize,
)
.map(Some(_))
}

config.contractSetDescriptorFile match {
case None =>
benchmarkStep(config.streams)
case Some(descriptorFile) =>
CommandSubmitter(apiServices).submit(
descriptorFile,
config.maxInFlightCommands,
config.submissionBatchSize,
)
}

def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) {
Future.successful(logger.info(s"No streams defined. Skipping the benchmark step."))
} else {
Benchmark.run(config, apiServices)
for {
descriptor <- Future.fromTry(parseDescriptor(descriptorFile))
_ = logger.info(prettyPrint(descriptor))
summary <- submissionStep(descriptor.submission)
streams = descriptor.streams.map(
DescriptorConverter.streamDescriptorToConfig(_, summary)
)
_ = logger.info(s"Converted stream configs: ${prettyPrint(streams)}")
_ <- benchmarkStep(streams)
} yield ()
}

for {
_ <- testContractsGenerationStep()
_ <- benchmarkStep()
} yield ()
}
}

private def parseDescriptor(descriptorFile: File): Try[WorkflowDescriptor] =
SimpleFileReader.readFile(descriptorFile)(WorkflowParser.parse).flatMap {
case Left(err: WorkflowParser.ParserError) =>
Failure(new RuntimeException(s"Workflow parsing error. Details: ${err.details}"))
case Right(descriptor) =>
Success(descriptor)
}

private def apiServicesOwner(
config: Config
)(implicit ec: ExecutionContext): ResourceOwner[LedgerApiServices] =
Expand Down Expand Up @@ -124,4 +156,6 @@ object LedgerApiBenchTool {
)

private val logger: Logger = LoggerFactory.getLogger(getClass)
private val printer: PPrinter = pprint.PPrinter(200, 1000)
private def prettyPrint(x: Any): String = printer(x).toString()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool

final case class WorkflowDescriptor(
submission: Option[SubmissionDescriptor],
streams: List[StreamDescriptor] = List.empty,
)

final case class SubmissionDescriptor(
numberOfInstances: Int,
numberOfObservers: Int,
uniqueParties: Boolean,
instanceDistribution: List[SubmissionDescriptor.ContractDescription],
)

object SubmissionDescriptor {
final case class ContractDescription(
template: String,
weight: Int,
payloadSizeBytes: Int,
archiveChance: Double,
)
}

final case class StreamDescriptor(
streamType: StreamDescriptor.StreamType,
name: String,
filters: List[StreamDescriptor.PartyFilter],
)

object StreamDescriptor {
sealed trait StreamType
object StreamType {
case object Transactions extends StreamType
case object TransactionTrees extends StreamType
case object ActiveContracts extends StreamType
}

final case class PartyFilter(party: String, templates: List[String])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool

import io.circe.Decoder
import io.circe.yaml.parser

import java.io.Reader

object WorkflowParser {
import Decoders._

def parse(reader: Reader): Either[ParserError, WorkflowDescriptor] =
parser
.parse(reader)
.flatMap(_.as[WorkflowDescriptor])
.left
.map(error => ParserError(error.getLocalizedMessage))

case class ParserError(details: String)

object Decoders {
implicit val contractDescriptionDecoder: Decoder[SubmissionDescriptor.ContractDescription] =
Decoder.forProduct4(
"template",
"weight",
"payload_size_bytes",
"archive_probability",
)(SubmissionDescriptor.ContractDescription.apply)

implicit val submissionDescriptorDecoder: Decoder[SubmissionDescriptor] =
Decoder.forProduct4(
"num_instances",
"num_observers",
"unique_parties",
"instance_distribution",
)(SubmissionDescriptor.apply)

implicit val streamTypeDecoder: Decoder[StreamDescriptor.StreamType] = Decoder[String].emap {
case "active-contracts" => Right(StreamDescriptor.StreamType.ActiveContracts)
case "transactions" => Right(StreamDescriptor.StreamType.Transactions)
case "transaction-trees" => Right(StreamDescriptor.StreamType.TransactionTrees)
case invalid => Left(s"Invalid stream type: $invalid")
}

implicit val filtersDecoder: Decoder[StreamDescriptor.PartyFilter] =
Decoder.forProduct2(
"party",
"templates",
)(StreamDescriptor.PartyFilter.apply)

implicit val streamDescriptorDecoder: Decoder[StreamDescriptor] =
Decoder.forProduct3(
"type",
"name",
"filters",
)(StreamDescriptor.apply)

implicit val workflowDescriptorDecoder: Decoder[WorkflowDescriptor] =
Decoder.forProduct2(
"submission",
"streams",
)(WorkflowDescriptor.apply)
}

}
Loading

0 comments on commit 743ee46

Please sign in to comment.