Properly adhere to DbType#supportsParallelWrites in SqlLedger (digital-asset#2902)

Since we still have issues with parallel writes when using H2,
we should make sure to use only a single DB connection and a single
executor thread in that case.

Because Sandbox did not do this before, tests like PackageManagement
appeared to be flaky due to racy inserts in H2.
gerolf-da authored Sep 16, 2019
1 parent 6ed6e0e commit ad1b885
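
The change boils down to one selection rule: size the writer-side connection pool (and with it the number of SQL executor threads) from the database type, falling back to 1 when the database cannot handle parallel writes. The following is a minimal, illustrative sketch of that rule only, with a toy DbType standing in for the real one in com.digitalasset.platform.sandbox.stores.ledger.sql.dao, whose definition and URL detection are not part of this diff:

object ConnectionSizing {
  // Toy stand-in for the real DbType; illustrative only.
  sealed trait DbType { def supportsParallelWrites: Boolean }
  case object Postgres extends DbType { val supportsParallelWrites = true }
  case object H2 extends DbType { val supportsParallelWrites = false }

  object DbType {
    // Assumed URL detection; the real jdbcType may classify URLs differently.
    def jdbcType(jdbcUrl: String): DbType =
      if (jdbcUrl.startsWith("jdbc:postgresql:")) Postgres else H2
  }

  val defaultNumberOfShortLivedConnections = 16

  // H2 cannot cope with parallel writes, so fall back to a single
  // connection, and therefore a single SQL executor thread.
  def numberOfShortLivedConnections(jdbcUrl: String): Int =
    if (DbType.jdbcType(jdbcUrl).supportsParallelWrites)
      defaultNumberOfShortLivedConnections
    else 1
}

Under this sketch, a jdbc:h2:mem: URL yields 1, which is what makes the Sandbox's ledger writes effectively sequential.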
Showing 2 changed files with 18 additions and 16 deletions.
File 1 of 2:
@@ -24,8 +24,8 @@ import com.digitalasset.platform.sandbox.metrics.MetricsManager
 import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
 import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
 import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlLedger.{
-  noOfShortLivedConnections,
-  noOfStreamingConnections
+  defaultNumberOfShortLivedConnections,
+  defaultNumberOfStreamingConnections
 }
 import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
   DbType,
@@ -106,8 +106,8 @@ class JdbcIndexerFactory[Status <: InitStatus] private () {
     val dbDispatcher =
       DbDispatcher(
         jdbcUrl,
-        if (dbType.supportsParallelWrites) noOfShortLivedConnections else 1,
-        noOfStreamingConnections)
+        if (dbType.supportsParallelWrites) defaultNumberOfShortLivedConnections else 1,
+        defaultNumberOfStreamingConnections)
     val ledgerDao = LedgerDao.metered(
       JdbcLedgerDao(
         dbDispatcher,
File 2 of 2:
@@ -61,8 +61,8 @@ object SqlStartMode {
 
 object SqlLedger {
 
-  val noOfShortLivedConnections = 16
-  val noOfStreamingConnections = 2
+  val defaultNumberOfShortLivedConnections = 16
+  val defaultNumberOfStreamingConnections = 2
 
   //jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret"
   def apply(
@@ -81,8 +81,10 @@
     new FlywayMigrations(jdbcUrl).migrate()
 
     val dbType = DbType.jdbcType(jdbcUrl)
+    val noOfShortLivedConnections =
+      if (dbType.supportsParallelWrites) defaultNumberOfShortLivedConnections else 1
     val dbDispatcher =
-      DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
+      DbDispatcher(jdbcUrl, noOfShortLivedConnections, defaultNumberOfStreamingConnections)
 
     val ledgerDao = LedgerDao.metered(
       JdbcLedgerDao(
@@ -103,7 +105,10 @@
         packages,
         initialLedgerEntries,
         queueDepth,
-        dbType.supportsParallelWrites)
+        // we use noOfShortLivedConnections for the maximum batch size, since it doesn't make sense
+        // to try to persist more ledger entries concurrently than we have SQL executor threads and SQL connections available.
+        noOfShortLivedConnections
+      )
     }
   }
 
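The inline comment added above caps the persistence batch size at the number of short-lived connections. A hedged sketch of the reasoning, with a hypothetical persistOne standing in for the per-entry DAO write (not part of this diff): with N connections, at most N writes can be in flight at once, so batching more than N entries per round only queues them behind the pool.

import scala.concurrent.{ExecutionContext, Future}

object BatchSizing {
  // Hypothetical per-entry write; the real DAO call is not shown in this diff.
  def persistOne(entry: String)(implicit ec: ExecutionContext): Future[Unit] =
    Future(println(s"persisted $entry"))

  // Persist a batch in waves of at most `connections` entries: more parallelism
  // than that cannot be used, because each in-flight write holds a connection.
  def persistBatch(entries: Seq[String], connections: Int)(
      implicit ec: ExecutionContext): Future[Unit] =
    entries.grouped(connections).foldLeft(Future.unit) { (prev, wave) =>
      // start the next wave only once the previous one has completed
      prev.flatMap(_ => Future.traverse(wave)(persistOne).map(_ => ()))
    }
}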
@@ -114,12 +119,10 @@ private class SqlLedger(
     timeProvider: TimeProvider,
     packages: InMemoryPackageStore,
     queueDepth: Int,
-    parallelLedgerAppend: Boolean)(implicit mat: Materializer)
+    maxBatchSize: Int)(implicit mat: Materializer)
     extends BaseLedger(ledgerId, headAtInitialization, ledgerDao)
     with Ledger {
 
-  import SqlLedger._
-
   private val logger = LoggerFactory.getLogger(getClass)
 
   // the reason for modelling persistence as a reactive pipeline is to avoid having race-conditions between the
@@ -166,9 +169,8 @@
     // that this is safe on the read end because the readers rely on the dispatchers to know the
     // ledger end, and not the database itself. This means that they will not start reading from the new
     // ledger end until we tell them so, which we do when _all_ the entries have been committed.
-    val maxBatchSize = if (parallelLedgerAppend) noOfShortLivedConnections * 2L else 1L
     mergedSources
-      .batch(maxBatchSize, e => Queue(e))((batch, e) => batch :+ e)
+      .batch(maxBatchSize.toLong, e => Queue(e))((batch, e) => batch :+ e)
       .mapAsync(1) { queue =>
         val startOffset = dispatcher.getHead()
         // we can only do this because there is no parallelism here!
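
The batch/mapAsync(1) pair above is what serializes persistence. Below is a runnable toy of the same pipeline shape, assuming the 2019-era Akka Streams API used by this file; println stands in for the real DAO call, and maxBatchSize = 1 mimics the H2 case:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.collection.immutable.Queue
import scala.concurrent.Future

object BatchDemo extends App {
  implicit val system: ActorSystem = ActorSystem("batch-demo")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  val maxBatchSize = 1 // what SqlLedger now uses when the DB cannot do parallel writes

  Source(1 to 10)
    // aggregate entries only while downstream is busy, never beyond maxBatchSize;
    // with maxBatchSize = 1 every batch holds a single entry
    .batch(maxBatchSize.toLong, e => Queue(e))((batch, e) => batch :+ e)
    // parallelism = 1: at most one persistence round in flight, so ledger-end
    // offsets can be handed out without racing
    .mapAsync(1)(batch => Future { println(s"persisting $batch"); batch.size })
    .runForeach(n => println(s"persisted $n entries"))
    .onComplete(_ => system.terminate())
}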
@@ -337,7 +339,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
    *        used if starting from a fresh database.
    * @param queueDepth the depth of the buffer for persisting entries. When gets full, the system will signal back-pressure
    *        upstream
-   * @param parallelLedgerAppend whether to append to the ledger in parallelized batches
+   * @param maxBatchSize maximum size of ledger entry batches to be persisted
    * @return a compliant Ledger implementation
    */
  def createSqlLedger(
@@ -348,7 +350,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
       packages: InMemoryPackageStore,
       initialLedgerEntries: ImmArray[LedgerEntryOrBump],
       queueDepth: Int,
-      parallelLedgerAppend: Boolean
+      maxBatchSize: Int
   )(implicit mat: Materializer): Future[SqlLedger] = {
     @SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes"))
     implicit val ec = DEC
@@ -374,7 +376,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
           timeProvider,
           packages,
           queueDepth,
-          parallelLedgerAppend)
+          maxBatchSize)
   }
 
   private def reset(): Future[Unit] =
