Skip to content

Commit

Permalink
Explicitly run schema migration/validation (digital-asset#2832)
Browse files Browse the repository at this point in the history
The Ledger Indexer can now be started in 3 modes:

- validate schema and start
- migrate schema and start
- migrate schema only and exit

Contributes to digital-asset#2660.
  • Loading branch information
gerolf-da authored Sep 10, 2019
1 parent 0fdf854 commit eefd481
Show file tree
Hide file tree
Showing 19 changed files with 227 additions and 161 deletions.
10 changes: 7 additions & 3 deletions ledger/api-server-damlonx/reference-v2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ conformance_test(
server = ":reference-v2",
server_args = [
"--port 6865",
"--jdbc-url jdbc:h2:mem:daml_on_x;db_close_on_exit=false",
# db_close_delay=-1 is needed so that the in-memory database is not closed (and therefore lost)
# after the flyway migration
"--jdbc-url jdbc:h2:mem:daml_on_x;db_close_delay=-1;db_close_on_exit=false",
],
test_tool_args = [
"--all-tests",
Expand All @@ -104,8 +106,10 @@ conformance_test(
server = ":reference-v2",
server_args = [
"--port 6865",
"--jdbc-url jdbc:h2:mem:daml_on_x;db_close_on_exit=false",
"--extra-participant second-participant,6866,jdbc:h2:mem:daml_on_x2;db_close_on_exit=false",
# db_close_delay=-1 is needed so that the in-memory database is not closed (and therefore lost)
# after the flyway migration
"--jdbc-url jdbc:h2:mem:daml_on_x;db_close_delay=-1;db_close_on_exit=false",
"--extra-participant second-participant,6866,jdbc:h2:mem:daml_on_x2;db_close_delay=-1;db_close_on_exit=false",
],
test_tool_args = [
"--include=SemanticTests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import java.util.concurrent.atomic.AtomicBoolean

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.daml.ledger.api.server.damlonx.reference.v2.cli.Cli
import com.daml.ledger.participant.state.kvutils.InMemoryKVParticipantState
import com.daml.ledger.participant.state.v1.ParticipantId
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.platform.index.cli.Cli
import com.digitalasset.platform.index.{StandaloneIndexServer, StandaloneIndexerServer}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -54,20 +54,21 @@ object ReferenceServer extends App {
} yield ledger.uploadPackages(dar.all, None)
}

val indexerServer = StandaloneIndexerServer(readService, config.jdbcUrl)
val indexerServer = StandaloneIndexerServer(readService, config)
val indexServer = StandaloneIndexServer(config, readService, writeService).start()

val extraParticipants =
for {
(participantId, port, jdbcUrl) <- config.extraPartipants
} yield {
val extraIndexer = StandaloneIndexerServer(readService, jdbcUrl)
val participantConfig = config.copy(
port = port,
participantId = participantId,
jdbcUrl = jdbcUrl
)
val extraIndexer = StandaloneIndexerServer(readService, participantConfig)
val extraLedgerApiServer = StandaloneIndexServer(
config.copy(
port = port,
participantId = participantId,
jdbcUrl = jdbcUrl
),
participantConfig,
readService,
writeService
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.index.cli
package com.daml.ledger.api.server.damlonx.reference.v2.cli

import java.io.File

Expand Down
23 changes: 7 additions & 16 deletions ledger/sandbox/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ compileDependencies = [
"//3rdparty/jvm/com/typesafe/akka:akka_actor",
"//3rdparty/jvm/com/google/guava:guava",
"//3rdparty/jvm/org/postgresql:postgresql",
"//3rdparty/jvm/com/h2database:h2",
"//3rdparty/jvm/com/zaxxer:HikariCP",
"//3rdparty/jvm/org/flywaydb:flyway_core",
"//3rdparty/jvm/com/typesafe/play:anorm",
Expand Down Expand Up @@ -93,7 +94,6 @@ da_scala_binary(
visibility = ["//visibility:public"],
deps = [
":sandbox",
"//3rdparty/jvm/com/h2database:h2",
],
)

Expand Down Expand Up @@ -142,7 +142,6 @@ testDependencies = [
"//3rdparty/jvm/org/scalacheck:scalacheck",
"//3rdparty/jvm/org/awaitility:awaitility",
"//3rdparty/jvm/commons_io:commons_io",
"//3rdparty/jvm/com/h2database:h2",
"//bazel_tools/runfiles:scala_runfiles",
] + compileDependencies

Expand Down Expand Up @@ -243,18 +242,14 @@ conformance_test(

conformance_test(
name = "conformance-test-static-time-h2database",
extra_data = [
"@postgresql_dev_env//:all",
"@postgresql_dev_env//:createdb",
"@postgresql_dev_env//:initdb",
"@postgresql_dev_env//:pg_ctl",
],
server = ":sandbox-binary",
server_args = [
"--port 6865",
"--static-time",
"--eager-package-loading",
"--sql-backend-jdbcurl jdbc:h2:mem:static_time",
# db_close_delay=-1 is needed so that the in-memory database is not closed (and therefore lost)
# after the flyway migration
"--sql-backend-jdbcurl jdbc:h2:mem:static_time;db_close_delay=-1",
],
test_tool_args = [
"--all-tests",
Expand All @@ -264,18 +259,14 @@ conformance_test(

conformance_test(
name = "conformance-test-wall-clock-h2database",
extra_data = [
"@postgresql_dev_env//:all",
"@postgresql_dev_env//:createdb",
"@postgresql_dev_env//:initdb",
"@postgresql_dev_env//:pg_ctl",
],
server = ":sandbox-binary",
server_args = [
"--port 6865",
"--wall-clock-time",
"--eager-package-loading",
"--sql-backend-jdbcurl jdbc:h2:mem:wall_clock_time",
# db_close_delay=-1 is needed so that the in-memory database is not closed (and therefore lost)
# after the flyway migration
"--sql-backend-jdbcurl jdbc:h2:mem:wall_clock_time;db_close_delay=-1",
],
test_tool_args = [
"--all-tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlLedger.{
noOfStreamingConnections
}
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
DbType,
JdbcLedgerDao,
LedgerDao,
PersistenceEntry,
JdbcLedgerDao
PersistenceEntry
}
import com.digitalasset.platform.sandbox.stores.ledger.sql.migration.FlywayMigrations
import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
ContractSerializer,
KeyHasher,
Expand All @@ -45,14 +47,32 @@ import scalaz.syntax.tag._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object JdbcIndexer {
sealed trait InitStatus
final abstract class Initialized extends InitStatus
final abstract class Uninitialized extends InitStatus

object JdbcIndexerFactory {
def apply(): JdbcIndexerFactory[Uninitialized] = new JdbcIndexerFactory[Uninitialized]()
}

class JdbcIndexerFactory[Status <: InitStatus] private () {
private val logger = LoggerFactory.getLogger(classOf[JdbcIndexer])
private[index] val asyncTolerance = 30.seconds

def create(
actorSystem: ActorSystem,
readService: ReadService,
jdbcUrl: String): Future[JdbcIndexer] = {
def validateSchema(jdbcUrl: String)(
implicit x: Status =:= Uninitialized): JdbcIndexerFactory[Initialized] = {
FlywayMigrations(jdbcUrl).validate()
this.asInstanceOf[JdbcIndexerFactory[Initialized]]
}

def migrateSchema(jdbcUrl: String)(
implicit x: Status =:= Uninitialized): JdbcIndexerFactory[Initialized] = {
FlywayMigrations(jdbcUrl).migrate()
this.asInstanceOf[JdbcIndexerFactory[Initialized]]
}

def create(actorSystem: ActorSystem, readService: ReadService, jdbcUrl: String)(
implicit x: Status =:= Initialized): Future[JdbcIndexer] = {
val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
val metricsManager = MetricsManager(false)

Expand Down Expand Up @@ -82,11 +102,10 @@ object JdbcIndexer {
}

private def initializeDao(jdbcUrl: String, mm: MetricsManager) = {
val dbType = JdbcLedgerDao.jdbcType(jdbcUrl)
val dbType = DbType.jdbcType(jdbcUrl)
val dbDispatcher =
DbDispatcher(
jdbcUrl,
dbType,
if (dbType.supportsParallelWrites) noOfShortLivedConnections else 1,
noOfStreamingConnections)
val ledgerDao = LedgerDao.metered(
Expand Down Expand Up @@ -130,7 +149,7 @@ object JdbcIndexer {
* @param beginAfterExternalOffset The last offset received from the read service.
* This offset has inclusive semantics,
*/
class JdbcIndexer private (
class JdbcIndexer private[index] (
initialInternalOffset: Long,
beginAfterExternalOffset: Option[LedgerString],
ledgerDao: LedgerDao)(implicit mat: Materializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object StandaloneIndexServer {
readService: ReadService,
writeService: WriteService): StandaloneIndexServer =
new StandaloneIndexServer(
"sandbox",
"index",
config,
readService,
writeService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.digitalasset.platform.index
import akka.actor.ActorSystem
import com.daml.ledger.participant.state.v1.ReadService
import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
import com.digitalasset.platform.index.config.{Config, StartupMode}

import scala.concurrent.duration._

Expand All @@ -14,16 +15,32 @@ import scala.concurrent.duration._
object StandaloneIndexerServer {
private[this] val actorSystem = ActorSystem("StandaloneIndexerServer")

def apply(readService: ReadService, jdbcUrl: String): AutoCloseable = {
def apply(readService: ReadService, config: Config): AutoCloseable = {

val indexerFactory = JdbcIndexerFactory()
val indexer =
RecoveringIndexer(actorSystem.scheduler, 10.seconds, JdbcIndexer.asyncTolerance)

indexer.start(
() =>
JdbcIndexer
.create(actorSystem, readService, jdbcUrl)
.flatMap(_.subscribe(readService))(DEC))
RecoveringIndexer(actorSystem.scheduler, 10.seconds, indexerFactory.asyncTolerance)

config.startupMode match {
case StartupMode.MigrateOnly =>
indexerFactory.migrateSchema(config.jdbcUrl)

case StartupMode.MigrateAndStart =>
indexer.start { () =>
indexerFactory
.migrateSchema(config.jdbcUrl)
.create(actorSystem, readService, config.jdbcUrl)
.flatMap(_.subscribe(readService))(DEC)
}

case StartupMode.ValidateAndStart =>
indexer.start { () =>
indexerFactory
.validateSchema(config.jdbcUrl)
.create(actorSystem, readService, config.jdbcUrl)
.flatMap(_.subscribe(readService))(DEC)
}
}

indexer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ package com.digitalasset.platform.index.config

import java.io.File

import com.daml.ledger.participant.state.v1.ParticipantId
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.ledger.api.tls.TlsConfiguration

sealed trait StartupMode
object StartupMode {
case object ValidateAndStart extends StartupMode
case object MigrateAndStart extends StartupMode
case object MigrateOnly extends StartupMode
}

final case class Config(
port: Int,
portFile: Option[File],
Expand All @@ -17,8 +25,9 @@ final case class Config(
timeProvider: TimeProvider, // enables use of non-wall-clock time in tests
jdbcUrl: String,
tlsConfig: Option[TlsConfiguration],
participantId: LedgerString,
extraPartipants: Vector[(LedgerString, Int, String)]
participantId: ParticipantId,
extraPartipants: Vector[(ParticipantId, Int, String)],
startupMode: StartupMode
)

object Config {
Expand All @@ -33,6 +42,7 @@ object Config {
"",
None,
LedgerString.assertFromString("standalone-participant"),
Vector.empty
Vector.empty,
StartupMode.MigrateAndStart
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import com.digitalasset.platform.sandbox.banner.Banner
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.services.SandboxResetService
import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.digitalasset.platform.sandbox.stores.ledger._
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
import com.digitalasset.platform.sandbox.stores.{
InMemoryActiveContracts,
InMemoryPackageStore,
SandboxIndexAndWriteService
}
import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.digitalasset.platform.sandbox.stores.ledger._
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
import com.digitalasset.platform.server.services.testing.TimeServiceBackend
import com.digitalasset.platform.services.time.TimeProviderType
import org.slf4j.LoggerFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.stores.ledger.ReadOnlyLedger
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.{
DbType,
JdbcLedgerDao,
LedgerDao,
LedgerReadDao
Expand Down Expand Up @@ -39,9 +40,9 @@ object ReadOnlySqlLedger {
mm: MetricsManager): Future[ReadOnlyLedger] = {
implicit val ec: ExecutionContext = DEC

val dbType = JdbcLedgerDao.jdbcType(jdbcUrl)
val dbType = DbType.jdbcType(jdbcUrl)
val dbDispatcher =
DbDispatcher(jdbcUrl, dbType, noOfShortLivedConnections, noOfStreamingConnections)
DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
val ledgerReadDao = LedgerDao.meteredRead(
JdbcLedgerDao(
dbDispatcher,
Expand Down Expand Up @@ -90,9 +91,6 @@ private class ReadOnlySqlLedgerFactory(ledgerDao: LedgerReadDao) {
/** *
* Creates a DB backed Ledger implementation.
*
* @param initialLedgerId a random ledger id is generated if none given, if set it's used to initialize the ledger.
* In case the ledger had already been initialized, the given ledger id must not be set or must
* be equal to the one in the database.
* @return a compliant read-only Ledger implementation
*/
def createReadOnlySqlLedger(initialLedgerId: Option[LedgerId])(
Expand Down Expand Up @@ -147,6 +145,7 @@ private class ReadOnlySqlLedgerFactory(ledgerDao: LedgerReadDao) {
logger.info(s"Found existing ledger with id: ${foundLedgerId.unwrap}")
Future.successful(foundLedgerId)
}

}

private object ReadOnlySqlLedgerFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode.{
ContinueIfExists
}
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao._
import com.digitalasset.platform.sandbox.stores.ledger.sql.migration.FlywayMigrations
import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
ContractSerializer,
KeyHasher,
Expand Down Expand Up @@ -77,9 +78,12 @@ object SqlLedger {
mm: MetricsManager): Future[Ledger] = {
implicit val ec: ExecutionContext = DEC

val dbType = JdbcLedgerDao.jdbcType(jdbcUrl)
new FlywayMigrations(jdbcUrl).migrate()

val dbType = DbType.jdbcType(jdbcUrl)
val dbDispatcher =
DbDispatcher(jdbcUrl, dbType, noOfShortLivedConnections, noOfStreamingConnections)
DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)

val ledgerDao = LedgerDao.metered(
JdbcLedgerDao(
dbDispatcher,
Expand Down
Loading

0 comments on commit eefd481

Please sign in to comment.