Skip to content

Commit

Permalink
ledger-on-sql: PostgreSQL support. (digital-asset#4204)
Browse files Browse the repository at this point in the history
* ledger-on-sql: Don't bother cleaning up after integration tests.

Turns out Bazel cleans up before each test run, so we only have one
set of outputs at a time. This is far more useful for debugging anyway.

* ledger-on-sql: Pull out a test-lib to simplify the conformance tests.

* ledger-on-sql: Turn `Database` from a sealed trait into a case class.

* ledger-on-sql: Support for PostgreSQL!

CHANGELOG_BEGIN
CHANGELOG_END

* ledger-on-sql: Run the conformance tests against PostgreSQL.

* ledger-on-sql: Run the LotsOfParties conformance tests.

* ledger-on-sql: Use PostgreSQL's `RETURNING` keyword to save a query.

* ledger-on-sql: Ensure the reader connection pool is read-only.

* ledger-on-sql: Avoid cloning ByteStrings as byte arrays where possible.

Instead, use `ByteString#newInput()`, which returns an `InputStream`.
This is supported by H2 and PostgreSQL, but unfortunately not SQLite.

* ledger-on-sql: Run integration tests in parallel.

* Update the PostgreSQL and SQLite JDBC drivers.
  • Loading branch information
SamirTalwar authored Jan 24, 2020
1 parent 020f694 commit 0be3e72
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 131 deletions.
4 changes: 2 additions & 2 deletions bazel-java-deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def install_java_deps():
"org.mongodb:mongodb-driver-async:3.6.4",
"org.mongodb:mongodb-driver-core:3.6.4",
"org.pcollections:pcollections:2.1.3",
"org.postgresql:postgresql:42.2.6",
"org.postgresql:postgresql:42.2.9",
"org.reactivestreams:reactive-streams:1.0.2",
"org.reactivestreams:reactive-streams-tck:1.0.2",
"org.sangria-graphql:sangria_2.12:1.4.2",
Expand All @@ -142,7 +142,7 @@ def install_java_deps():
"org.tpolecat:doobie-postgres_2.12:0.6.0",
"org.typelevel:paiges-core_2.12:0.2.1",
"org.wartremover:wartremover_2.12:2.2.0",
"org.xerial:sqlite-jdbc:3.25.2",
"org.xerial:sqlite-jdbc:3.30.1",
"uk.co.datumedge:hamcrest-json:0.2",
],
fetch_sources = True,
Expand Down
64 changes: 42 additions & 22 deletions ledger/ledger-on-sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
"da_scala_library",
"da_scala_test",
"da_scala_test_suite",
)
load("//ledger/ledger-api-test-tool:conformance.bzl", "conformance_test")

supported_databases = [
{
"name": "h2-memory",
"runtime_deps": [
"@maven//:com_h2database_h2",
],
"conformance_test_tags": [
"manual",
],
Expand All @@ -22,12 +25,12 @@ supported_databases = [
"--concurrent-test-runs=2",
"--timeout-scale-factor=4",
],
"runtime_deps": [
"@maven//:com_h2database_h2",
],
},
{
"name": "h2-file",
"runtime_deps": [
"@maven//:com_h2database_h2",
],
"conformance_test_tags": [
"manual",
],
Expand All @@ -39,28 +42,32 @@ supported_databases = [
"--concurrent-test-runs=2",
"--timeout-scale-factor=4",
],
},
{
"name": "postgresql",
"runtime_deps": [
"@maven//:com_h2database_h2",
"@maven//:org_postgresql_postgresql",
],
"conformance_test_server_main": "com.daml.ledger.on.sql.MainWithEphemeralPostgresql",
},
{
"name": "sqlite-memory",
"conformance_test_server_args": [
"--jdbc-url=jdbc:sqlite::memory:",
],
"runtime_deps": [
"@maven//:org_xerial_sqlite_jdbc",
],
"conformance_test_server_args": [
"--jdbc-url=jdbc:sqlite::memory:",
],
},
{
"name": "sqlite-file",
"runtime_deps": [
"@maven//:org_xerial_sqlite_jdbc",
],
"conformance_test_server_main": "com.daml.ledger.on.sql.MainWithEphemeralDirectory",
"conformance_test_server_args": [
"--jdbc-url=jdbc:sqlite:%DIR/test.sqlite",
],
"runtime_deps": [
"@maven//:org_xerial_sqlite_jdbc",
],
},
]

Expand Down Expand Up @@ -97,7 +104,27 @@ da_scala_library(
],
)

da_scala_test(
da_scala_library(
name = "ledger-on-sql-test-lib",
srcs = glob(["src/test/lib/scala/**/*.scala"]),
visibility = [
"//visibility:public",
],
deps = [
":ledger-on-sql",
"//daml-lf/data",
"//ledger/ledger-api-health",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//libs-scala/postgresql-testing",
"//libs-scala/resources",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
],
)

da_scala_test_suite(
name = "ledger-on-sql-tests",
size = "small",
srcs = glob(["src/test/suite/**/*.scala"]),
Expand All @@ -107,6 +134,7 @@ da_scala_test(
resources = glob(["src/test/resources/*"]),
runtime_deps = [
"@maven//:com_h2database_h2",
"@maven//:org_postgresql_postgresql",
"@maven//:org_xerial_sqlite_jdbc",
],
deps = [
Expand All @@ -120,6 +148,7 @@ da_scala_test(
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//libs-scala/contextualized-logging",
"//libs-scala/postgresql-testing",
"//libs-scala/resources",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
Expand All @@ -132,20 +161,12 @@ da_scala_test(
(
da_scala_binary(
name = "conformance-test-{}-bin".format(db["name"]),
srcs = glob(["src/test/lib/**/*.scala"]),
main_class = db.get("conformance_test_server_main", "com.daml.ledger.on.sql.Main"),
visibility = ["//visibility:public"],
runtime_deps = db.get("runtime_deps", []),
deps = [
":ledger-on-sql",
"//daml-lf/data",
"//ledger/ledger-api-health",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//libs-scala/resources",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
":ledger-on-sql-test-lib",
],
),
conformance_test(
Expand All @@ -157,7 +178,6 @@ da_scala_test(
test_tool_args = db.get("conformance_test_tool_args", []) + [
"--all-tests",
"--exclude=ConfigManagementServiceIT",
"--exclude=LotsOfPartiesIT",
"--exclude=TimeIT",
"--exclude=TransactionScaleIT",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@
package com.daml.ledger.on.sql

import com.daml.ledger.on.sql.queries.Queries.InvalidDatabaseException
import com.daml.ledger.on.sql.queries.{H2Queries, Queries, SqliteQueries}
import com.daml.ledger.on.sql.queries.{H2Queries, PostgresqlQueries, Queries, SqliteQueries}
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.resources.ResourceOwner
import com.zaxxer.hikari.HikariDataSource
import javax.sql.DataSource

sealed trait Database {
val queries: Queries

val readerConnectionPool: DataSource

val writerConnectionPool: DataSource
}
case class Database(
queries: Queries,
readerConnectionPool: DataSource,
writerConnectionPool: DataSource,
)

object Database {
private val logger = ContextualizedLogger.get(classOf[Database])
Expand All @@ -31,54 +29,45 @@ object Database {

def owner(jdbcUrl: String)(implicit logCtx: LoggingContext): ResourceOwner[Database] =
(jdbcUrl match {
case url if url.startsWith("jdbc:h2:") => MultipleReaderSingleWriterDatabase.owner(jdbcUrl)
case url if url.startsWith("jdbc:sqlite:") => SingleConnectionDatabase.owner(jdbcUrl)
case url if url.startsWith("jdbc:h2:") =>
MultipleReaderSingleWriterDatabase.owner(jdbcUrl, new H2Queries)
case url if url.startsWith("jdbc:postgresql:") =>
MultipleReaderSingleWriterDatabase.owner(jdbcUrl, new PostgresqlQueries)
case url if url.startsWith("jdbc:sqlite:") =>
SingleConnectionDatabase.owner(jdbcUrl, new SqliteQueries)
case _ => throw new InvalidDatabaseException(jdbcUrl)
}).map { database =>
logger.info(s"Connected to the ledger over JDBC: $jdbcUrl")
database
}

final class MultipleReaderSingleWriterDatabase(
override val readerConnectionPool: DataSource,
override val writerConnectionPool: DataSource,
) extends Database {
override val queries: Queries = new H2Queries
}

object MultipleReaderSingleWriterDatabase {
def owner(jdbcUrl: String): ResourceOwner[MultipleReaderSingleWriterDatabase] =
def owner(jdbcUrl: String, queries: Queries): ResourceOwner[Database] =
for {
readerConnectionPool <- ResourceOwner.forCloseable(() =>
newHikariDataSource(jdbcUrl, maximumPoolSize = None))
newHikariDataSource(jdbcUrl, readOnly = true))
writerConnectionPool <- ResourceOwner.forCloseable(() =>
newHikariDataSource(jdbcUrl, maximumPoolSize = Some(MaximumWriterConnectionPoolSize)))
} yield new MultipleReaderSingleWriterDatabase(readerConnectionPool, writerConnectionPool)
}

final class SingleConnectionDatabase(connectionPool: DataSource) extends Database {
override val queries: Queries = new SqliteQueries

override val readerConnectionPool: DataSource = connectionPool

override val writerConnectionPool: DataSource = connectionPool
} yield new Database(queries, readerConnectionPool, writerConnectionPool)
}

object SingleConnectionDatabase {
def owner(jdbcUrl: String): ResourceOwner[SingleConnectionDatabase] =
def owner(jdbcUrl: String, queries: Queries): ResourceOwner[Database] =
for {
connectionPool <- ResourceOwner.forCloseable(() =>
newHikariDataSource(jdbcUrl, maximumPoolSize = Some(MaximumWriterConnectionPoolSize)))
} yield new SingleConnectionDatabase(connectionPool)
} yield new Database(queries, connectionPool, connectionPool)
}

private def newHikariDataSource(
jdbcUrl: String,
maximumPoolSize: Option[Int],
maximumPoolSize: Option[Int] = None,
readOnly: Boolean = false,
): HikariDataSource = {
val pool = new HikariDataSource()
pool.setAutoCommit(false)
pool.setJdbcUrl(jdbcUrl)
pool.setReadOnly(readOnly)
maximumPoolSize.foreach { maximumPoolSize =>
pool.setMaximumPoolSize(maximumPoolSize)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class SqlLedgerReaderWriter(
)
verifyStateUpdatesAgainstPreDeclaredOutputs(stateUpdates, entryId, submission)
queries.updateState(stateUpdates)
appendLog(entryId, Envelope.enclose(logEntry))
val latestSequenceNo = queries.insertIntoLog(entryId, Envelope.enclose(logEntry))
latestSequenceNo + 1
}
dispatcher.signalNewHead(newHead)
SubmissionResult.Acknowledged
Expand Down Expand Up @@ -130,14 +131,6 @@ class SqlLedgerReaderWriter(
.setEntryId(ByteString.copyFromUtf8(UUID.randomUUID().toString))
.build()

private def appendLog(
entry: DamlLogEntryId,
envelope: ByteString,
)(implicit connection: Connection): Index = {
queries.insertIntoLog(entry, envelope)
queries.lastLogInsertId() + 1
}

private def readState(
stateInputKeys: Set[DamlStateKey],
)(implicit connection: Connection): Map[DamlStateKey, Option[DamlStateValue]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ import com.google.protobuf.ByteString
import scala.collection.immutable

trait CommonQueries extends Queries {
override def createStateTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS state (key VARBINARY(16384) PRIMARY KEY NOT NULL, value BLOB NOT NULL)"
.execute()
()
}

override def selectFromLog(
start: Index,
end: Index,
Expand All @@ -45,15 +39,6 @@ trait CommonQueries extends Queries {
}.*
)

override def insertIntoLog(
entry: DamlLogEntryId,
envelope: ByteString,
)(implicit connection: Connection): Unit = {
SQL"INSERT INTO log (entry_id, envelope) VALUES (${entry.getEntryId.toByteArray}, ${envelope.toByteArray})"
.executeInsert()
()
}

override def selectStateByKeys(
keys: Iterable[DamlStateKey],
)(implicit connection: Connection): immutable.Seq[(DamlStateKey, Option[DamlStateValue])] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import java.sql.Connection
import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.queries.Queries.Index
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString

class H2Queries extends Queries with CommonQueries {
override def createLogTable()(implicit connection: Connection): Unit = {
Expand All @@ -16,9 +18,21 @@ class H2Queries extends Queries with CommonQueries {
()
}

override def lastLogInsertId()(implicit connection: Connection): Index =
override def createStateTable()(implicit connection: Connection): Unit = {
SQL"CREATE TABLE IF NOT EXISTS state (key VARBINARY(16384) PRIMARY KEY NOT NULL, value BLOB NOT NULL)"
.execute()
()
}

override def insertIntoLog(
entry: DamlLogEntryId,
envelope: ByteString,
)(implicit connection: Connection): Index = {
SQL"INSERT INTO log (entry_id, envelope) VALUES (${entry.getEntryId.newInput()}, ${envelope.newInput()})"
.executeInsert()
SQL"CALL IDENTITY()"
.as(long("IDENTITY()").single)
}

override protected val updateStateQuery: String =
"MERGE INTO state VALUES ({key}, {value})"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.on.sql.queries

import java.sql.Connection

import anorm.SqlParser._
import anorm._
import com.daml.ledger.on.sql.queries.Queries.Index
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.google.protobuf.ByteString

// `Queries` implementation for PostgreSQL.
//
// Differences from the sibling H2/SQLite implementations, per this commit:
//   - binary columns use PostgreSQL's BYTEA type, and the log's sequence number
//     uses SERIAL so it is assigned by the database on insert;
//   - `insertIntoLog` uses PostgreSQL's `RETURNING` clause so the generated
//     sequence number comes back from the INSERT itself, saving a second query;
//   - binary payloads are streamed via `ByteString#newInput()` (an InputStream)
//     instead of being copied into byte arrays.
class PostgresqlQueries extends Queries with CommonQueries {
  // Creates the append-only log table if it does not already exist.
  // `sequence_no` is the ledger offset (`Index`); SERIAL auto-assigns it.
  override def createLogTable()(implicit connection: Connection): Unit = {
    SQL"CREATE TABLE IF NOT EXISTS log (sequence_no SERIAL PRIMARY KEY, entry_id BYTEA NOT NULL, envelope BYTEA NOT NULL)"
      .execute()
    () // discard execute()'s Boolean result; this method returns Unit
  }

  // Creates the key/value state table if it does not already exist.
  override def createStateTable()(implicit connection: Connection): Unit = {
    SQL"CREATE TABLE IF NOT EXISTS state (key BYTEA PRIMARY KEY NOT NULL, value BYTEA NOT NULL)"
      .execute()
    () // discard execute()'s Boolean result; this method returns Unit
  }

  // Appends one entry to the log and returns its database-assigned sequence
  // number. `RETURNING sequence_no` folds the insert and the key retrieval into
  // a single round trip; `newInput()` streams the ByteStrings without copying.
  override def insertIntoLog(
      entry: DamlLogEntryId,
      envelope: ByteString,
  )(implicit connection: Connection): Index = {
    SQL"INSERT INTO log (entry_id, envelope) VALUES (${entry.getEntryId.newInput()}, ${envelope.newInput()}) RETURNING sequence_no"
      .as(long("sequence_no").single)
  }

  // Upsert statement used by CommonQueries' state update: insert the pair, or
  // overwrite the value when the key already exists (PostgreSQL ON CONFLICT).
  override protected val updateStateQuery: String =
    "INSERT INTO state VALUES ({key}, {value}) ON CONFLICT(key) DO UPDATE SET value = {value}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ trait Queries {
def insertIntoLog(
entry: DamlKvutils.DamlLogEntryId,
envelope: ByteString,
)(implicit connection: Connection): Unit

def lastLogInsertId()(implicit connection: Connection): Index
)(implicit connection: Connection): Index

def selectStateByKeys(
keys: Iterable[DamlKvutils.DamlStateKey],
Expand Down
Loading

0 comments on commit 0be3e72

Please sign in to comment.