Skip to content

Commit

Permalink
Make ACS reader parameters configurable (digital-asset#11732)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
rautenrieth-da authored Nov 23, 2021
1 parent 329e609 commit bd2a685
Show file tree
Hide file tree
Showing 19 changed files with 186 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ case class ApiServerConfig(
configurationLoadTimeout: Duration,
eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
eventsProcessingParallelism: Int = IndexConfiguration.DefaultEventsProcessingParallelism,
acsIdPageSize: Int = IndexConfiguration.DefaultAcsIdPageSize,
acsIdFetchingParallelism: Int = IndexConfiguration.DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism: Int = IndexConfiguration.DefaultAcsContractFetchingParallelism,
portFile: Option[Path],
seeding: Seeding,
managementServiceTimeout: Duration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ final class StandaloneApiServer(
databaseConnectionTimeout = config.databaseConnectionTimeout,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ object IndexConfiguration {

val DefaultEventsPageSize: Int = 1000
val DefaultEventsProcessingParallelism: Int = 8

val DefaultAcsIdPageSize: Int = 20000
val DefaultAcsIdFetchingParallelism: Int = 2
val DefaultAcsContractFetchingParallelism: Int = 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ private[platform] object JdbcIndex {
databaseConnectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -48,6 +51,9 @@ private[platform] object JdbcIndex {
initialLedgerId = ledgerId,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ private[platform] object ReadOnlySqlLedger {
databaseConnectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand Down Expand Up @@ -190,6 +193,9 @@ private[platform] object ReadOnlySqlLedger {
dbDispatcher = dbDispatcher,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ object IndexMetadata {
dbDispatcher = dbDispatcher,
eventsPageSize = 1000,
eventsProcessingParallelism = 8,
acsIdPageSize = 20000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ private class JdbcLedgerDao(
servicesExecutionContext: ExecutionContext,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
performPostCommitValidation: Boolean,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand Down Expand Up @@ -662,9 +665,9 @@ private class JdbcLedgerDao(
queryNonPruned = queryNonPruned,
eventStorageBackend = readStorageBackend.eventStorageBackend,
pageSize = eventsPageSize,
idPageSize = eventsPageSize * 20,
idFetchingParallelism = 2,
acsFetchingparallelism = 2,
idPageSize = acsIdPageSize,
idFetchingParallelism = acsIdFetchingParallelism,
acsFetchingparallelism = acsContractFetchingParallelism,
metrics = metrics,
materializer = materializer,
),
Expand Down Expand Up @@ -773,6 +776,9 @@ private[platform] object JdbcLedgerDao {
dbDispatcher: DbDispatcher,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -790,6 +796,9 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
acsIdPageSize,
acsIdFetchingParallelism,
acsContractFetchingParallelism,
false,
metrics,
lfValueTranslationCache,
Expand All @@ -812,6 +821,9 @@ private[platform] object JdbcLedgerDao {
sequentialWriteDao: SequentialWriteDao,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -829,6 +841,9 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
acsIdPageSize,
acsIdFetchingParallelism,
acsContractFetchingParallelism,
false,
metrics,
lfValueTranslationCache,
Expand All @@ -851,6 +866,9 @@ private[platform] object JdbcLedgerDao {
sequentialWriteDao: SequentialWriteDao,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -869,6 +887,9 @@ private[platform] object JdbcLedgerDao {
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
acsIdPageSize,
acsIdFetchingParallelism,
acsContractFetchingParallelism,
true,
metrics,
lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
protected def daoOwner(
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
errorFactories: ErrorFactories,
)(implicit
loggingContext: LoggingContext
Expand Down Expand Up @@ -85,6 +88,9 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down Expand Up @@ -120,7 +126,14 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
_ <- Resource.fromFuture(
new FlywayMigrations(jdbcUrl).migrate()
)
dao <- daoOwner(100, 4, errorFactories).acquire()
dao <- daoOwner(
eventsPageSize = 100,
eventsProcessingParallelism = 4,
acsIdPageSize = 2000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
errorFactories,
).acquire()
_ <- Resource.fromFuture(dao.initialize(TestLedgerId, TestParticipantId))
initialLedgerEnd <- Resource.fromFuture(dao.lookupLedgerEnd())
_ = ledgerEndCache.set(initialLedgerEnd.lastOffset -> initialLedgerEnd.lastEventSeqId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
override protected def daoOwner(
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
errorFactories: ErrorFactories,
)(implicit
loggingContext: LoggingContext
Expand Down Expand Up @@ -76,6 +79,9 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,13 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid

// `pageSize = 2` and the offset gaps in the `commandWithOffsetGaps` above are to make sure
// that streaming works with event pages separated by offsets that don't have events in the store
ledgerDao <- createLedgerDao(pageSize = 2, eventsProcessingParallelism = 8)
ledgerDao <- createLedgerDao(
pageSize = 2,
eventsProcessingParallelism = 8,
acsIdPageSize = 2,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
)

response <- ledgerDao.transactionsReader
.getFlatTransactions(
Expand Down Expand Up @@ -633,11 +639,20 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
): Vector[Transaction] =
responses.foldLeft(Vector.empty[Transaction])((b, a) => b ++ a._2.transactions.toVector)

private def createLedgerDao(pageSize: Int, eventsProcessingParallelism: Int) =
private def createLedgerDao(
pageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
) =
LoggingContext.newLoggingContext { implicit loggingContext =>
daoOwner(
eventsPageSize = pageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
MockitoSugar.mock[ErrorFactories],
).acquire()(ResourceContext(executionContext))
}.asFuture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ final case class Config[Extra](
maxDeduplicationDuration: Option[Duration],
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
stateValueCache: caching.WeightedCache.Configuration,
lfValueTranslationEventCache: caching.SizedCache.Configuration,
lfValueTranslationContractCache: caching.SizedCache.Configuration,
Expand Down Expand Up @@ -76,6 +79,9 @@ object Config {
configurationLoadTimeout = Duration.ofSeconds(10),
eventsPageSize = IndexConfiguration.DefaultEventsPageSize,
eventsProcessingParallelism = IndexConfiguration.DefaultEventsProcessingParallelism,
acsIdPageSize = IndexConfiguration.DefaultAcsIdPageSize,
acsIdFetchingParallelism = IndexConfiguration.DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism = IndexConfiguration.DefaultAcsContractFetchingParallelism,
stateValueCache = caching.WeightedCache.Configuration.none,
lfValueTranslationEventCache = caching.SizedCache.Configuration.none,
lfValueTranslationContractCache = caching.SizedCache.Configuration.none,
Expand Down Expand Up @@ -466,6 +472,43 @@ object Config {
config.copy(eventsProcessingParallelism = eventsProcessingParallelism)
)

opt[Int]("acs-id-page-size")
.optional()
.text(
s"Number of contract ids fetched from the index for every round trip when serving ACS calls. Default is ${IndexConfiguration.DefaultAcsIdPageSize}."
)
.validate { acsIdPageSize =>
if (acsIdPageSize > 0) Right(())
else Left("acs-id-page-size should be strictly positive")
}
.action((acsIdPageSize, config) => config.copy(acsIdPageSize = acsIdPageSize))

opt[Int]("acs-id-fetching-parallelism")
.optional()
.text(
s"Number of contract id pages fetched in parallel when serving ACS calls. Default is ${IndexConfiguration.DefaultAcsIdFetchingParallelism}."
)
.validate { acsIdFetchingParallelism =>
if (acsIdFetchingParallelism > 0) Right(())
else Left("acs-id-fetching-parallelism should be strictly positive")
}
.action((acsIdFetchingParallelism, config) =>
config.copy(acsIdFetchingParallelism = acsIdFetchingParallelism)
)

opt[Int]("acs-contract-fetching-parallelism")
.optional()
.text(
s"Number of event pages fetched in parallel when serving ACS calls. Default is ${IndexConfiguration.DefaultAcsContractFetchingParallelism}."
)
.validate { acsContractFetchingParallelism =>
if (acsContractFetchingParallelism > 0) Right(())
else Left("acs-contract-fetching-parallelism should be strictly positive")
}
.action((acsContractFetchingParallelism, config) =>
config.copy(acsContractFetchingParallelism = acsContractFetchingParallelism)
)

opt[Long]("max-state-value-cache-size")
.optional()
.text(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ class RecoveringIndexerIntegrationSpec
dbDispatcher = dbDispatcher,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
acsIdPageSize = 20000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ final class SandboxServer(
templateStore = packageStore,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ private[sandbox] object SandboxIndexAndWriteService {
templateStore: InMemoryPackageStore,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand Down Expand Up @@ -88,6 +91,9 @@ private[sandbox] object SandboxIndexAndWriteService {
startMode = startMode,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private[sandbox] object SqlLedger {
startMode: SqlStartMode,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand Down Expand Up @@ -318,6 +321,9 @@ private[sandbox] object SqlLedger {
),
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ private[sandbox] object LedgerResource {
startMode = SqlStartMode.ResetAndStart,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
acsIdPageSize = 2000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = servicesExecutionContext,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,9 @@ final class SqlLedgerSpec
startMode = SqlStartMode.MigrateAndStart,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
acsIdPageSize = 2000,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
servicesExecutionContext = executionContext,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down
Loading

0 comments on commit bd2a685

Please sign in to comment.