Skip to content

Commit

Permalink
DPP-664 Create contracts in batches (#11515)
Browse files Browse the repository at this point in the history
* Create contracts in batches through a factory contract

changelog_begin
changelog_end

* Use batching on protobuf level

* Revert needless changes

* Address review comments
  • Loading branch information
rautenrieth-da authored Nov 4, 2021
1 parent 9ef2eaf commit 8d48cf6
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ object Cli {
config.copy(maxInFlightCommands = size)
}

opt[Int]("submission-batch-size")
.hidden() // TODO: uncomment when production-ready
.text("Number of contracts created per command submission.")
.optional()
.action { case (size, config) =>
config.copy(submissionBatchSize = size)
}

opt[FiniteDuration]("log-interval")
.abbr("r")
.text("Stream metrics log interval.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ case class Config(
reportingPeriod: FiniteDuration,
contractSetDescriptorFile: Option[File],
maxInFlightCommands: Int,
submissionBatchSize: Int,
metricsReporter: MetricsReporter,
)

Expand Down Expand Up @@ -91,6 +92,7 @@ object Config {
reportingPeriod = 5.seconds,
contractSetDescriptorFile = None,
maxInFlightCommands = 100,
submissionBatchSize = 100,
metricsReporter = MetricsReporter.Console,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ object LedgerApiBenchTool {
logger.info("No contract set descriptor file provided. Skipping contracts generation.")
)
case Some(descriptorFile) =>
CommandSubmitter(apiServices).submit(descriptorFile, config.maxInFlightCommands)
CommandSubmitter(apiServices).submit(
descriptorFile,
config.maxInFlightCommands,
config.submissionBatchSize,
)
}

def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ final class CommandGenerator(
.toMap
private val observersWithIndices: List[(Primitive.Party, Int)] = observers.zipWithIndex

def next(): Try[Primitive.Party => Command] =
def next(): Try[Command] =
(for {
(description, observers) <- Try((pickDescription(), pickObservers()))
payload <- Try(randomPayload(description.payloadSizeBytes))
Expand All @@ -35,7 +35,7 @@ final class CommandGenerator(
payload = payload,
archive = archive,
)
} yield command).recoverWith { case NonFatal(ex) =>
} yield command(signatory)).recoverWith { case NonFatal(ex) =>
Failure(
CommandGenerator.CommandGeneratorError(
msg = s"Command generation failed. Details: ${ex.getLocalizedMessage}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.daml.ledger.api.benchtool.submission

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.api.benchtool.infrastructure.TestDars
import com.daml.ledger.api.benchtool.services.LedgerApiServices
Expand All @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory
import scalaz.syntax.tag._

import java.io.File
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
Expand All @@ -34,6 +35,7 @@ case class CommandSubmitter(services: LedgerApiServices) {
def submit(
descriptorFile: File,
maxInFlightCommands: Int,
submissionBatchSize: Int,
)(implicit ec: ExecutionContext): Future[Unit] =
(for {
_ <- Future.successful(logger.info("Generating contracts..."))
Expand All @@ -42,7 +44,13 @@ case class CommandSubmitter(services: LedgerApiServices) {
signatory <- allocateParty(signatoryName)
observers <- allocateParties(descriptor.numberOfObservers, observerName)
_ <- uploadTestDars()
_ <- submitCommands(descriptor, signatory, observers, maxInFlightCommands)
_ <- submitCommands(
descriptor,
signatory,
observers,
maxInFlightCommands,
submissionBatchSize,
)
} yield logger.info("Commands submitted successfully."))
.recoverWith { case NonFatal(ex) =>
logger.error(s"Command submission failed. Details: ${ex.getLocalizedMessage}", ex)
Expand Down Expand Up @@ -91,65 +99,90 @@ case class CommandSubmitter(services: LedgerApiServices) {
}
} yield ()

private def submitAndWait(id: String, party: Primitive.Party, command: Command)(implicit
private def submitAndWait(id: String, party: Primitive.Party, commands: List[Command])(implicit
ec: ExecutionContext
): Future[Unit] = {
val commands = new Commands(
val result = new Commands(
ledgerId = services.ledgerId,
applicationId = applicationId,
commandId = id,
party = party.unwrap,
commands = List(command),
commands = commands,
workflowId = workflowId,
)
services.commandService.submitAndWait(commands).map(_ => ())
services.commandService.submitAndWait(result).map(_ => ())
}

private def submitCommands(
descriptor: ContractSetDescriptor,
signatory: Primitive.Party,
observers: List[Primitive.Party],
maxInFlightCommands: Int,
submissionBatchSize: Int,
)(implicit
ec: ExecutionContext
): Future[Unit] = {
implicit val resourceContext: ResourceContext = ResourceContext(ec)

val numBatches: Int = descriptor.numberOfInstances / submissionBatchSize
val progressMeter = CommandSubmitter.ProgressMeter(descriptor.numberOfInstances)
// Output a log line roughly once per 10% progress, or once every 500 submissions (whichever comes first)
val progressLogInterval = math.min(descriptor.numberOfInstances / 10 + 1, 500)
val progressLoggingSink =
val progressLoggingSink = {
var lastInterval = 0
Sink.foreach[Int](index =>
if (index % progressLogInterval == 0) {
if (index / progressLogInterval != lastInterval) {
lastInterval = index / progressLogInterval
logger.info(progressMeter.getProgress(index))
}
)
val generator =
new CommandGenerator(RandomnessProvider.Default, descriptor, signatory, observers)

logger.info("Submitting commands...")
}

val generator = new CommandGenerator(
randomnessProvider = RandomnessProvider.Default,
signatory = signatory,
descriptor = descriptor,
observers = observers,
)

logger.info(
s"Submitting commands ($numBatches commands, $submissionBatchSize contracts per command)..."
)
materializerOwner()
.use { implicit materializer =>
Source
.fromIterator(() => (1 to descriptor.numberOfInstances).iterator)
.mapAsync(maxInFlightCommands) { index =>
generator.next() match {
case Success(command) =>
submitAndWait(
id = commandId(index),
party = signatory,
command = command(signatory),
).map(_ => index)
case Failure(ex) =>
Future.failed {
logger.error(s"Command generation failed. Details: ${ex.getLocalizedMessage}", ex)
CommandSubmitter.CommandSubmitterError(ex.getLocalizedMessage)
for {
_ <- Source
.fromIterator(() => (1 to descriptor.numberOfInstances).iterator)
.wireTap(i => if (i == 1) progressMeter.start())
.mapAsync(8)(index =>
Future.fromTry(
generator.next().map(cmd => index -> cmd)
)
)
.groupedWithin(submissionBatchSize, 1.minute)
.map(cmds => cmds.head._1 -> cmds.map(_._2).toList)
.buffer(maxInFlightCommands, OverflowStrategy.backpressure)
.mapAsync(maxInFlightCommands) { case (index, commands) =>
submitAndWait(
id = commandId(index),
party = signatory,
commands = commands,
)
.map(_ => index + commands.length - 1)
.recoverWith { case ex =>
Future.failed {
logger.error(
s"Command submission failed. Details: ${ex.getLocalizedMessage}",
ex,
)
CommandSubmitter.CommandSubmitterError(ex.getLocalizedMessage)
}
}
}
}
.runWith(progressLoggingSink)
.runWith(progressLoggingSink)
} yield ()
}
.map(_ => ())
}

private def materializerOwner(): ResourceOwner[Materializer] = {
Expand All @@ -163,9 +196,15 @@ case class CommandSubmitter(services: LedgerApiServices) {
object CommandSubmitter {
case class CommandSubmitterError(msg: String) extends RuntimeException(msg)

class ProgressMeter(totalItems: Int, startTimeMillis: Long) {
class ProgressMeter(totalItems: Int) {
var startTimeMillis: Long = System.currentTimeMillis()

def start(): Unit = {
startTimeMillis = System.currentTimeMillis()
}

def getProgress(index: Int): String =
f"Progress: $index/${totalItems} (${percentage(index)}%%). Remaining time: ${remainingSeconds(index)}%1.1f s"
f"Progress: $index/${totalItems} (${percentage(index)}%1.1f%%). Remaining time: ${remainingSeconds(index)}%1.1f s"

private def percentage(index: Int): Double = (index.toDouble / totalItems) * 100

Expand All @@ -183,8 +222,7 @@ object CommandSubmitter {

object ProgressMeter {
def apply(totalItems: Int) = new ProgressMeter(
totalItems = totalItems,
startTimeMillis = System.currentTimeMillis(),
totalItems = totalItems
)
}
}

0 comments on commit 8d48cf6

Please sign in to comment.