Skip to content

Commit

Permalink
Indexer ValidateAndWaitOnly startup mode for canton participant HA (#…
Browse files Browse the repository at this point in the history
…10290)

* Indexer ValidateAndWaitOnly startup mode for canton participant HA

changelog_begin
changelog_end

* Review feedback
  • Loading branch information
oliverse-da authored Jul 26, 2021
1 parent ad13a86 commit 2094e24
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ object IndexerStartupMode {

case object MigrateOnly extends IndexerStartupMode

case object ValidateAndWaitOnly extends IndexerStartupMode

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ object JdbcIndexer {
resourceContext: ResourceContext
): Future[ResourceOwner[Indexer]] = initialized(resetSchema = true)

def validateAndWaitOnly()(implicit
resourceContext: ResourceContext
): Future[Unit] =
flywayMigrations
.validateAndWaitOnly(config.enableAppendOnlySchema)

private[this] def initializedMutatingSchema(
resetSchema: Boolean
)(implicit resourceContext: ResourceContext): Future[ResourceOwner[Indexer]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ final class StandaloneIndexerServer(
logger.debug("Waiting for the indexer to initialize the database.")
healthReporter
}
case IndexerStartupMode.ValidateAndWaitOnly =>
Resource
.fromFuture(indexerFactory.validateAndWaitOnly())
.map[ReportsHealth] { _ =>
logger.debug("Waiting for the indexer to validate the schema migrations.")
() => HealthStatus.healthy
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.flywaydb.core.Flyway
import org.flywaydb.core.api.MigrationVersion
import org.flywaydb.core.api.configuration.FluentConfiguration

import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

Expand Down Expand Up @@ -68,6 +69,49 @@ private[platform] class FlywayMigrations(jdbcUrl: String)(implicit loggingContex
}
}

def validateAndWaitOnly(
// TODO append-only: remove after removing support for the current (mutating) schema
enableAppendOnlySchema: Boolean = false
)(implicit resourceContext: ResourceContext): Future[Unit] =
dataSource.use { ds =>
val flyway = configurationBase(dbType, enableAppendOnlySchema)
.dataSource(ds)
.ignoreFutureMigrations(false)
.load()
logger.info("Running Flyway validation...")

@tailrec
def flywayMigrationDone(
retries: Int,
pendingMigrationsSoFar: Option[Int],
): Unit = {
val pendingMigrations = flyway.info().pending().length
if (pendingMigrations == 0) {
()
} else if (retries <= 0) {
throw ExhaustedRetries(pendingMigrations)
} else if (pendingMigrationsSoFar.exists(pendingMigrations >= _)) {
throw StoppedProgressing(pendingMigrations)
} else {
logger.debug(
s"Concurrent migration has reduced the pending migrations set to ${pendingMigrations}, waiting until pending set is empty.."
)
Thread.sleep(1000)
flywayMigrationDone(retries - 1, Some(pendingMigrations))
}
}

try {
flywayMigrationDone(10, None)
logger.info("Flyway schema validation finished successfully.")
Future.unit
} catch {
case ex: RuntimeException =>
logger.error(s"Failed to validate and wait only: ${ex.getMessage}", ex)
Future.failed(ex)
}
}

private def dataSource: ResourceOwner[HikariDataSource] =
HikariConnection.owner(
serverRole = ServerRole.IndexMigrations,
Expand Down Expand Up @@ -116,4 +160,9 @@ private[platform] object FlywayMigrations {
Flyway
.configure()
.locations(locations(enableAppendOnlySchema, dbType): _*)

case class ExhaustedRetries(pendingMigrations: Int)
extends RuntimeException(s"Ran out of retries with ${pendingMigrations} migrations remaining")
case class StoppedProgressing(pendingMigrations: Int)
extends RuntimeException(s"Stopped progressing with ${pendingMigrations} migrations")
}

0 comments on commit 2094e24

Please sign in to comment.