Skip to content

Commit

Permalink
DPP-665 Make in-flight commands configurable (digital-asset#11456)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
rautenrieth-da authored Oct 28, 2021
1 parent bf00956 commit bb4f4c5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ object Cli {
config.copy(contractSetDescriptorFile = Some(descriptorFile))
}

opt[Int]("max-in-flight-commands")
.hidden() // TODO: uncomment when production-ready
.text("Maximum in-flight commands for command submissions.")
.optional()
.action { case (size, config) =>
config.copy(maxInFlightCommands = size)
}

opt[FiniteDuration]("log-interval")
.abbr("r")
.text("Stream metrics log interval.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ case class Config(
streams: List[Config.StreamConfig],
reportingPeriod: FiniteDuration,
contractSetDescriptorFile: Option[File],
maxInFlightCommands: Int,
metricsReporter: MetricsReporter,
)

Expand Down Expand Up @@ -93,6 +94,7 @@ object Config {
streams = List.empty[Config.StreamConfig],
reportingPeriod = 5.seconds,
contractSetDescriptorFile = None,
maxInFlightCommands = 100,
metricsReporter = MetricsReporter.Console,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ object LedgerApiBenchTool {
Future.successful(
logger.info("No contract set descriptor file provided. Skipping contracts generation.")
)
case Some(descriptorFile) => CommandSubmitter(apiServices).submit(descriptorFile)
case Some(descriptorFile) =>
CommandSubmitter(apiServices).submit(descriptorFile, config.maxInFlightCommands)
}

def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ case class CommandSubmitter(services: LedgerApiServices) {
private def darId(index: Int) = s"submission-dars-$index-$identifierSuffix"

def submit(
descriptorFile: File
descriptorFile: File,
maxInFlightCommands: Int,
)(implicit ec: ExecutionContext): Future[Unit] =
(for {
_ <- Future.successful(logger.info("Generating contracts..."))
Expand All @@ -41,7 +42,7 @@ case class CommandSubmitter(services: LedgerApiServices) {
signatory <- allocateParty(signatoryName)
observers <- allocateParties(descriptor.numberOfObservers, observerName)
_ <- uploadTestDars()
_ <- submitCommands(descriptor, signatory, observers)
_ <- submitCommands(descriptor, signatory, observers, maxInFlightCommands)
} yield logger.info("Commands submitted successfully."))
.recoverWith { case NonFatal(ex) =>
logger.error(s"Command submission failed. Details: ${ex.getLocalizedMessage}", ex)
Expand Down Expand Up @@ -108,25 +109,29 @@ case class CommandSubmitter(services: LedgerApiServices) {
descriptor: ContractSetDescriptor,
signatory: Primitive.Party,
observers: List[Primitive.Party],
maxInFlightCommands: Int,
)(implicit
ec: ExecutionContext
): Future[Unit] = {
implicit val resourceContext: ResourceContext = ResourceContext(ec)

val progressMeter = CommandSubmitter.ProgressMeter(descriptor.numberOfInstances)
// Output a log line roughly once per 10% progress, or once every 500 submissions (whichever comes first)
val progressLogInterval = math.min(descriptor.numberOfInstances / 10 + 1, 500)
val progressLoggingSink =
Sink.foreach[Int](index =>
if (index % 500 == 0) {
if (index % progressLogInterval == 0) {
logger.info(progressMeter.getProgress(index))
}
)
val generator = new CommandGenerator(RandomnessProvider.Default, descriptor, observers)

logger.info("Submitting commands...")
materializerOwner()
.use { implicit materializer =>
Source
.fromIterator(() => (1 to descriptor.numberOfInstances).iterator)
.mapAsync(100) { index =>
.mapAsync(maxInFlightCommands) { index =>
generator.next() match {
case Success(command) =>
submitAndWait(
Expand Down

0 comments on commit bb4f4c5

Please sign in to comment.