SQL diagnostics (digital-asset#2662)
* Use proper names for the DB connection pools.

* Add some trace logging for DB operations.

This does not capture the timing of individual statements, but
rather the time for "units of work" (see the sketch below).
For example, "lookup contract" does not just load a single row
from the contracts table; it also runs two additional queries to
look up witnesses and divulgences.

This is fine because it is trace-level logging that helps us
debug problems and is not meant to be interpreted by users at
this stage.
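
To make "unit of work" concrete, here is a hedged sketch (not the repository's code; the table and column names are assumptions): three statements issued on one connection that would be timed and logged as a single "lookup contract" operation.

import java.sql.Connection

// Illustrative only: one "lookup contract" unit of work spanning three queries.
// Table/column names are made up for the sketch, not taken from the real schema.
def lookupContractUnit(conn: Connection, coid: String): Option[(String, List[String], List[String])] = {
  def queryStrings(sql: String): List[String] = {
    val stmt = conn.prepareStatement(sql)
    try {
      stmt.setString(1, coid)
      val rs = stmt.executeQuery()
      // Advance the cursor with takeWhile and read column 1 of each row.
      Iterator.continually(rs).takeWhile(_.next()).map(_.getString(1)).toList
    } finally stmt.close()
  }
  val contract    = queryStrings("select contract from contracts where id = ?").headOption
  val witnesses   = queryStrings("select witness from contract_witnesses where contract_id = ?")
  val divulgences = queryStrings("select party from contract_divulgences where contract_id = ?")
  contract.map(c => (c, witnesses, divulgences))
}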

* Make description a by-name parameter.

This avoids building the string when trace logging is disabled
(i.e. most of the time).
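
A minimal, self-contained sketch of that mechanism (not the project's SqlExecutor; all names here are illustrative): because the parameter is declared as `=> String`, the interpolation at the call site is evaluated only if the body actually forces it.

object ByNameDemo {
  val traceEnabled: Boolean = false // flip to true to see the description built

  // `description` is by-name: the argument expression runs only where it is used.
  def runQuery[A](description: => String)(block: => A): A = {
    val result = block
    if (traceEnabled) println(s"""DB Operation "$description" finished""")
    result
  }

  def main(args: Array[String]): Unit = {
    val transactionId = "tx-42"
    // With traceEnabled = false, this interpolated string is never constructed.
    val n = runQuery(s"lookup transaction [$transactionId]")(1 + 1)
    println(n) // 2
  }
}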
gerolf-da authored Aug 30, 2019
1 parent 93f3950 commit ecb506e
Showing 8 changed files with 90 additions and 72 deletions.
2 changes: 1 addition & 1 deletion 3rdparty/dependencies.digest
@@ -1 +1 @@
-af0693baa318c2e9c2ff8eec0755d1ecd080c77e dependencies.yaml
+bbc77ba8f91b74f9e668199d82e6a6eed370ab89 dependencies.yaml
2 changes: 1 addition & 1 deletion 3rdparty/workspace.bzl
@@ -458,7 +458,7 @@ def list_dependencies():
{"artifact": "org.ow2.asm:asm:5.0.4", "lang": "java", "sha1": "0da08b8cce7bbf903602a25a3a163ae252435795", "sha256": "896618ed8ae62702521a78bc7be42b7c491a08e6920a15f89a3ecdec31e9a220", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/ow2/asm/asm/5.0.4/asm-5.0.4.jar", "source": {"sha1": "112ff54474f1f04ccf1384c92e39fdc566f0bb5e", "sha256": "7ba89bc14669d86c1c0dc6abaeb74a87715089f3b904cc2016969e8737d70707", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/ow2/asm/asm/5.0.4/asm-5.0.4-sources.jar"} , "name": "org_ow2_asm_asm", "actual": "@org_ow2_asm_asm//jar", "bind": "jar/org/ow2/asm/asm"},
{"artifact": "org.parboiled:parboiled_2.12:2.1.4", "lang": "scala", "sha1": "f8b74a59f4cab415e2900144c67648b370559e73", "sha256": "557836e4250a968be75d780c8392eeed2b164ea90dbe858530473aa14ee6d010", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/parboiled/parboiled_2.12/2.1.4/parboiled_2.12-2.1.4.jar", "source": {"sha1": "6defc3d4f5e036a6b9f046dbc5c278ed16805a71", "sha256": "dd1450c6bcd6b41cde8a4dbba802dfe27082a5dd17444b7425a48f228b5c38fa", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/parboiled/parboiled_2.12/2.1.4/parboiled_2.12-2.1.4-sources.jar"} , "name": "org_parboiled_parboiled_2_12", "actual": "@org_parboiled_parboiled_2_12//jar:file", "bind": "jar/org/parboiled/parboiled_2_12"},
{"artifact": "org.pcollections:pcollections:2.1.3", "lang": "java", "sha1": "bf8693946c08b76166afce4652270868d5cebfe2", "sha256": "f94a44a953006ea13f0ea989881ae17fa1c0378ed855d3ba9fef44b71f71a8d1", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/pcollections/pcollections/2.1.3/pcollections-2.1.3.jar", "source": {"sha1": "584a28a2146ea90f0440f0aed0b75d268c0cf971", "sha256": "326b66bf9a18727c59fdd77306cd2f22dfcf678049998b4f823dd29c632ccc0a", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/pcollections/pcollections/2.1.3/pcollections-2.1.3-sources.jar"} , "name": "org_pcollections_pcollections", "actual": "@org_pcollections_pcollections//jar", "bind": "jar/org/pcollections/pcollections"},
{"artifact": "org.postgresql:postgresql:9.4.1212", "lang": "java", "sha1": "38931d70811d9bfcecf9c06f7222973c038a12de", "sha256": "0cb2a158be31acf218e0a4e18c8f3dbac68a3bc34806468773fc44094b0c365d", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar", "source": {"sha1": "d007dc0d6b3470582929bae1346c0659a4d448eb", "sha256": "01c576ea0a727112f54d5784c0b54ce55190a931e5b62e96e14c6371252f7fb4", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212-sources.jar"} , "name": "org_postgresql_postgresql", "actual": "@org_postgresql_postgresql//jar", "bind": "jar/org/postgresql/postgresql"},
{"artifact": "org.postgresql:postgresql:42.2.6", "lang": "java", "sha1": "58586d617d57746853fc0b8f37c83a8384fcd0ca", "sha256": "07daadb33e87638703bf41f3307fc0dbdb386e54af5d5d6481511a36f50ca004", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/postgresql/postgresql/42.2.6/postgresql-42.2.6.jar", "source": {"sha1": "af3ec7e06e0dd122cadf96a069f054a31c99c574", "sha256": "0d7f02c19f88d42045111ea8352428b1074723f1f756e72989850351bdfeabea", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/postgresql/postgresql/42.2.6/postgresql-42.2.6-sources.jar"} , "name": "org_postgresql_postgresql", "actual": "@org_postgresql_postgresql//jar", "bind": "jar/org/postgresql/postgresql"},
{"artifact": "org.reactivestreams:reactive-streams-examples:1.0.2", "lang": "java", "sha1": "4859cb03b45cba468f374185a55f9770946b7cc6", "sha256": "b0285302f2b372f6423977a21344549b2aa0e4c8c053f66c9bdc478d632779b3", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/reactivestreams/reactive-streams-examples/1.0.2/reactive-streams-examples-1.0.2.jar", "source": {"sha1": "bc43f0594ab861d0aacb1ffe0d4d01ffa1544718", "sha256": "33b5499acdd9be206d6657c18026a02a77c3150967d8ef04e396ee9132c0764a", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/reactivestreams/reactive-streams-examples/1.0.2/reactive-streams-examples-1.0.2-sources.jar"} , "name": "org_reactivestreams_reactive_streams_examples", "actual": "@org_reactivestreams_reactive_streams_examples//jar", "bind": "jar/org/reactivestreams/reactive_streams_examples"},
{"artifact": "org.reactivestreams:reactive-streams-tck:1.0.2", "lang": "java", "sha1": "2746d2a19b32bfa9c2cdab53a78dc5bc1293eee3", "sha256": "67fa341c934ad75587f2274af1f4d156935ec7da44784d12d6f7cb6155aa338e", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/reactivestreams/reactive-streams-tck/1.0.2/reactive-streams-tck-1.0.2.jar", "source": {"sha1": "9cb5366da0be84338902b9fdd7fa34de1ca5918e", "sha256": "091743da053ddffbf632a77e9e583441b8ef8c0e4814ceedb7e6b35d6bc181a5", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/org/reactivestreams/reactive-streams-tck/1.0.2/reactive-streams-tck-1.0.2-sources.jar"} , "name": "org_reactivestreams_reactive_streams_tck", "actual": "@org_reactivestreams_reactive_streams_tck//jar", "bind": "jar/org/reactivestreams/reactive_streams_tck"},
# duplicates in org.reactivestreams:reactive-streams fixed to 1.0.2
2 changes: 1 addition & 1 deletion dependencies.yaml
@@ -287,7 +287,7 @@ dependencies:
org.postgresql:
postgresql:
lang: java
version: "9.4.1212"
version: "42.2.6"

io.spray:
spray-json:
HikariJdbcConnectionProvider.scala
@@ -35,13 +35,18 @@ class HikariJdbcConnectionProvider(
private val logger = LoggerFactory.getLogger(getClass)
// these connections should never timeout as we have exactly the same number of threads using them as many connections we have
private val shortLivedDataSource =
-createDataSource(noOfShortLivedConnections, noOfShortLivedConnections, 250.millis)
+createDataSource(
+  "Short-Lived-Connections",
+  noOfShortLivedConnections,
+  noOfShortLivedConnections,
+  250.millis)

// this a dynamic pool as it's used for serving ACS snapshot requests, which we don't expect to get a lot
private val streamingDataSource =
-createDataSource(1, noOfStreamingConnections, 60.seconds)
+createDataSource("Streaming-Connections", 1, noOfStreamingConnections, 60.seconds)

private def createDataSource(
+poolName: String,
minimumIdle: Int,
maxPoolSize: Int,
connectionTimeout: FiniteDuration) = {
@@ -55,14 +60,15 @@
config.setMaximumPoolSize(maxPoolSize)
config.setMinimumIdle(minimumIdle)
config.setConnectionTimeout(connectionTimeout.toMillis)
+config.setPoolName(poolName)

//note that Hikari uses auto-commit by default.
//in `runSql` below, the `.close()` will automatically trigger a commit.
new HikariDataSource(config)
}

private val temporaryMigrationDataSource =
-createDataSource(1, 2, 250.millis) // Flyway needs 2 connections
+createDataSource("Temp-Flyway-Migration", 1, 2, 250.millis) // Flyway needs 2 connections
try {
FlywayMigrations(temporaryMigrationDataSource, dbType).migrate()
} finally {
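A hedged sketch of the helper introduced above (the JDBC URL is a placeholder, and the config is reduced to the settings this commit touches). Naming the pool makes it identifiable in HikariCP's own log lines and JMX/metrics output, e.g. "Short-Lived-Connections - Starting..." instead of the default "HikariPool-1 - Starting...".

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import scala.concurrent.duration._

// Sketch only: a named Hikari pool with explicit sizing and timeout.
def createDataSource(
    poolName: String,
    minimumIdle: Int,
    maxPoolSize: Int,
    connectionTimeout: FiniteDuration): HikariDataSource = {
  val config = new HikariConfig
  config.setJdbcUrl("jdbc:postgresql://localhost/sandbox") // placeholder URL
  config.setMinimumIdle(minimumIdle)
  config.setMaximumPoolSize(maxPoolSize)
  config.setConnectionTimeout(connectionTimeout.toMillis)
  config.setPoolName(poolName)
  new HikariDataSource(config)
}

// e.g. createDataSource("Streaming-Connections", 1, 4, 60.seconds)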
JdbcLedgerDao.scala
@@ -60,23 +60,23 @@ private class JdbcLedgerDao(

override def lookupLedgerId(): Future[Option[LedgerId]] =
dbDispatcher
-.executeSql { implicit conn =>
+.executeSql("get ledger id") { implicit conn =>
SQL_SELECT_LEDGER_ID
.as(ledgerString("ledger_id").map(id => LedgerId(id.toString)).singleOpt)
}

private val SQL_SELECT_LEDGER_END = SQL("select ledger_end from parameters")

override def lookupLedgerEnd(): Future[Long] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql("get ledger end") { implicit conn =>
SQL_SELECT_LEDGER_END
.as(long("ledger_end").single)
}

private val SQL_SELECT_EXTERNAL_LEDGER_END = SQL("select external_ledger_end from parameters")

override def lookupExternalLedgerEnd(): Future[Option[LedgerString]] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql("get external ledger end") { implicit conn =>
SQL_SELECT_EXTERNAL_LEDGER_END
.as(ledgerString("external_ledger_end").?.single)
}
@@ -85,7 +85,7 @@
"insert into parameters(ledger_id, ledger_end) VALUES({LedgerId}, {LedgerEnd})")

override def initializeLedger(ledgerId: LedgerId, ledgerEnd: LedgerOffset): Future[Unit] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql("initialize ledger parameters") { implicit conn =>
val _ = SQL_INITIALIZE
.on("LedgerId" -> ledgerId.unwrap)
.on("LedgerEnd" -> ledgerEnd)
@@ -151,7 +151,7 @@ private class JdbcLedgerDao(
.map(AbsoluteContractId)

override def lookupKey(key: Node.GlobalKey): Future[Option[AbsoluteContractId]] =
-dbDispatcher.executeSql(implicit conn => selectContractKey(key))
+dbDispatcher.executeSql("lookup contract by key")(implicit conn => selectContractKey(key))

private def storeContract(offset: Long, contract: Contract)(
implicit connection: Connection): Unit = storeContracts(offset, List(contract))
@@ -558,7 +558,7 @@ private class JdbcLedgerDao(
}

dbDispatcher
-.executeSql { implicit conn =>
+.executeSql(s"store ledger entry [${ledgerEntry.getClass.getSimpleName}]") { implicit conn =>
val resp = insertEntry(ledgerEntry)
updateLedgerEnd(newLedgerEnd, externalOffset)
resp
@@ -578,23 +578,25 @@
}.toMap

dbDispatcher
-.executeSql { implicit conn =>
-  // First, store all ledger entries without updating the ACS
-  // We can't use the storeLedgerEntry(), as that one does update the ACS
-  ledgerEntries.foreach {
-    case (i, le) =>
-      le match {
-        case tx: LedgerEntry.Transaction => storeTransaction(i, tx)
-        case rj: LedgerEntry.Rejection => storeRejection(i, rj)
-        case cp: LedgerEntry.Checkpoint => storeCheckpoint(i, cp)
-      }
-  }
+.executeSql(
+  s"store initial state from scenario [${activeContracts.size} contracts, ${ledgerEntries.size} ledger entries]") {
+  implicit conn =>
+    // First, store all ledger entries without updating the ACS
+    // We can't use the storeLedgerEntry(), as that one does update the ACS
+    ledgerEntries.foreach {
+      case (i, le) =>
+        le match {
+          case tx: LedgerEntry.Transaction => storeTransaction(i, tx)
+          case rj: LedgerEntry.Rejection => storeRejection(i, rj)
+          case cp: LedgerEntry.Checkpoint => storeCheckpoint(i, cp)
+        }
+    }

-// Then, write the given ACS. We trust the caller to supply an ACS that is
-// consistent with the given list of ledger entries.
-activeContracts.foreach(c => storeContract(transactionIdMap(c.transactionId), c))
+    // Then, write the given ACS. We trust the caller to supply an ACS that is
+    // consistent with the given list of ledger entries.
+    activeContracts.foreach(c => storeContract(transactionIdMap(c.transactionId), c))

-updateLedgerEnd(newLedgerEnd, None)
+    updateLedgerEnd(newLedgerEnd, None)
+}
}

@@ -734,7 +736,7 @@

override def lookupLedgerEntry(offset: Long): Future[Option[LedgerEntry]] = {
dbDispatcher
-.executeSql { implicit conn =>
+.executeSql(s"lookup ledger entry at offset $offset") { implicit conn =>
SQL_SELECT_ENTRY
.on("ledger_offset" -> offset)
.as(EntryParser.singleOpt)
@@ -745,7 +747,7 @@

override def lookupTransaction(
transactionId: TransactionId): Future[Option[(LedgerOffset, LedgerEntry.Transaction)]] = {
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql(s"lookup transaction [$transactionId]") { implicit conn =>
SQL_SELECT_TRANSACTION
.on("transaction_id" -> (transactionId: String))
.as(EntryParser.singleOpt)
@@ -790,7 +792,7 @@
.map(mapContractDetails)

override def lookupActiveContract(contractId: AbsoluteContractId): Future[Option[Contract]] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql(s"lookup active contract [${contractId.coid}]") { implicit conn =>
lookupActiveContractSync(contractId)
}

@@ -897,11 +899,12 @@
PageSize,
(startI, endE) => {
Source
-.fromFuture(dbDispatcher.executeSql { implicit conn =>
-  SQL_GET_LEDGER_ENTRIES
-    .on("startInclusive" -> startI, "endExclusive" -> endE)
-    .as(EntryParser.*)
-    .map(toLedgerEntry)
+.fromFuture(dbDispatcher.executeSql(s"load ledger entries [$startI, $endE[") {
+  implicit conn =>
+    SQL_GET_LEDGER_ENTRIES
+      .on("startInclusive" -> startI, "endExclusive" -> endE)
+      .as(EntryParser.*)
+      .map(toLedgerEntry)
})
.flatMapConcat(Source(_))
}
@@ -921,7 +924,7 @@
.mapAsync(dbDispatcher.noOfShortLivedConnections) { contractResult =>
// it's ok to not have query isolation as witnesses cannot change once we saved them
dbDispatcher
-.executeSql { implicit conn =>
+.executeSql(s"load contract details ${contractResult._1}") { implicit conn =>
mapContractDetails(contractResult)
}
}
@@ -951,7 +954,7 @@
)

override def getParties: Future[List[PartyDetails]] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql("load parties") { implicit conn =>
SQL_SELECT_PARTIES
.as(PartyDataParser.*)
// TODO: isLocal should be based on equality of participantId reported in an
@@ -968,7 +971,7 @@
override def storeParty(
party: Party,
displayName: Option[String]): Future[PersistenceResponse] = {
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql(s"store party [$party]") { implicit conn =>
Try {
SQL_INSERT_PARTY
.on(
@@ -1012,7 +1015,7 @@
)

override def listLfPackages: Future[Map[PackageId, PackageDetails]] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql("load packages") { implicit conn =>
SQL_SELECT_PACKAGES
.as(PackageDataParser.*)
.map(
@@ -1025,7 +1028,7 @@
}

override def getLfArchive(packageId: PackageId): Future[Option[Archive]] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql(s"load archive [$packageId]") { implicit conn =>
SQL_SELECT_PACKAGE
.on(
"package_id" -> packageId
@@ -1044,26 +1047,28 @@
requirements.fold(
Future.failed,
_ =>
-dbDispatcher.executeSql { implicit conn =>
-  val params = packages
-    .map(
-      p =>
-        Seq[NamedParameter](
-          "package_id" -> p._1.getHash,
-          "upload_id" -> uploadId,
-          "source_description" -> p._2.sourceDescription,
-          "size" -> p._2.size,
-          "known_since" -> p._2.knownSince,
-          "package" -> p._1.toByteArray
+dbDispatcher.executeSql(
+  s"store packages [${packages.map(_._1.getHash).mkString(", ")}] with uploadId [$uploadId]") {
+  implicit conn =>
+    val params = packages
+      .map(
+        p =>
+          Seq[NamedParameter](
+            "package_id" -> p._1.getHash,
+            "upload_id" -> uploadId,
+            "source_description" -> p._2.sourceDescription,
+            "size" -> p._2.size,
+            "known_since" -> p._2.knownSince,
+            "package" -> p._1.toByteArray
+        )
+      )
-)
-val updated = executeBatchSql(dbType.SQL_INSERT_PACKAGE, params).map(math.max(0, _)).sum
-val duplicates = packages.length - updated
-
-Map(
-  PersistenceResponse.Ok -> updated,
-  PersistenceResponse.Duplicate -> duplicates
-).filter(_._2 > 0)
+    val updated = executeBatchSql(dbType.SQL_INSERT_PACKAGE, params).map(math.max(0, _)).sum
+    val duplicates = packages.length - updated
+
+    Map(
+      PersistenceResponse.Ok -> updated,
+      PersistenceResponse.Duplicate -> duplicates
+    ).filter(_._2 > 0)
+}
)
}
Expand All @@ -1080,7 +1085,7 @@ private class JdbcLedgerDao(
""".stripMargin)

override def reset(): Future[Unit] =
-dbDispatcher.executeSql { implicit conn =>
+dbDispatcher.executeSql("truncate all tables") { implicit conn =>
val _ = SQL_TRUNCATE_ALL_TABLES.execute()
()
}
DbDispatcher.scala
@@ -25,7 +25,7 @@ trait DbDispatcher extends AutoCloseable {
* The isolation level by default is the one defined in the JDBC driver, it can be however overridden per query on
* the Connection. See further details at: https://docs.oracle.com/cd/E19830-01/819-4721/beamv/index.html
*/
-def executeSql[T](sql: Connection => T): Future[T]
+def executeSql[T](description: => String)(sql: Connection => T): Future[T]

/**
* Creates a lazy Source, which takes care of:
@@ -68,8 +68,8 @@ private class DbDispatcherImpl(
logger.error(s"got an uncaught exception on thread: ${thread.getName}", t))
.build()))

-override def executeSql[T](sql: Connection => T): Future[T] =
-  sqlExecutor.runQuery(() => connectionProvider.runSQL(conn => sql(conn)))
+override def executeSql[T](description: => String)(sql: Connection => T): Future[T] =
+  sqlExecutor.runQuery(description, () => connectionProvider.runSQL(conn => sql(conn)))

override def runStreamingSql[T](
sql: Connection => Source[T, Future[Done]]): Source[T, NotUsed] = {
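For reference, a stripped-down sketch of the new shape (illustrative, reduced to essentials): the by-name description avoids string construction when tracing is off, and the second parameter list preserves the executeSql("...") { implicit conn => ... } call style seen throughout JdbcLedgerDao.

import java.sql.Connection
import scala.concurrent.Future

// Sketch only: the trait reduced to the executeSql signature discussed above.
trait DbDispatcherSketch {
  // by-name `description` + curried `sql` block
  def executeSql[T](description: => String)(sql: Connection => T): Future[T]
}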
SqlExecutor.scala
@@ -3,7 +3,7 @@

package com.digitalasset.platform.sandbox.stores.ledger.sql.util

-import java.util.concurrent.Executors
+import java.util.concurrent.{Executors, TimeUnit}

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.slf4j.LoggerFactory
@@ -28,11 +28,22 @@ class SqlExecutor(noOfThread: Int) extends AutoCloseable {
.build()
)

-def runQuery[A](block: () => A): Future[A] = {
+def runQuery[A](description: => String, block: () => A): Future[A] = {
val promise = Promise[A]
+val startWait = System.nanoTime()
executor.execute(() => {
try {
-promise.success(block())
+val elapsedWait = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startWait)
+val start = System.nanoTime()
+val res = block()
+val elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
+
+if (logger.isTraceEnabled) {
+  logger.trace(
+    s"""DB Operation "$description": wait time ${elapsedWait}ms, execution time ${elapsed}ms""")
+}
+
+promise.success(res)
} catch {
case NonFatal(e) =>
promise.failure(e)
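As a self-contained illustration of why the two measurements are tracked separately (an assumed setup, not the project's code): on a single-threaded executor, the second task's reported wait time is roughly the first task's execution time.

import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object WaitVsExecDemo {
  private val executor = Executors.newFixedThreadPool(1)

  def runQuery[A](description: => String, block: () => A): Future[A] = {
    val promise = Promise[A]()
    val startWait = System.nanoTime()
    executor.execute(() => {
      // Time spent queued behind other work on this executor:
      val elapsedWait = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startWait)
      val start = System.nanoTime()
      val res = block()
      // Time spent actually running the block:
      val elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
      println(s"""DB Operation "$description": wait time ${elapsedWait}ms, execution time ${elapsed}ms""")
      promise.success(res)
    })
    promise.future
  }

  def main(args: Array[String]): Unit = {
    runQuery("first", () => { Thread.sleep(100); 1 })
    val second = runQuery("second", () => 2) // queued ~100ms behind "first"
    Await.result(second, 1.second)
    executor.shutdown()
  }
}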
