Skip to content

Commit

Permalink
Remove reset service support from the storage backend [DPP-804] (#12477)
Browse files Browse the repository at this point in the history
* Remove reset service support from the storage backend

CHANGELOG_BEGIN
CHANGELOG_END

* format it

* address review comments
  • Loading branch information
mziolekda authored Jan 19, 2022
1 parent f7e2faf commit 7c59728
Show file tree
Hide file tree
Showing 26 changed files with 16 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class IndexerBenchmark() {
val indexerFactory = new JdbcIndexer.Factory(
config.indexerConfig,
readService,
indexerEC,
metrics,
LfValueTranslationCache.Cache.none,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ object IndexerStartupMode {

case object MigrateAndStart extends IndexerStartupMode

case object ResetAndStart extends IndexerStartupMode

case object ValidateAndWaitOnly extends IndexerStartupMode

case object MigrateOnEmptySchemaAndStart extends IndexerStartupMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,26 @@ import com.daml.platform.store.DbType.{
}
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.{
DataSourceStorageBackend,
ResetStorageBackend,
StorageBackendFactory,
}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.{DbType, LfValueTranslationCache}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Using
import scala.concurrent.Future

object JdbcIndexer {
private[daml] final class Factory(
config: IndexerConfig,
readService: state.ReadService,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
)(implicit materializer: Materializer) {

def initialized(
resetSchema: Boolean = false
)(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] = {
def initialized()(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] = {
val factory = StorageBackendFactory.of(DbType.jdbcType(config.jdbcUrl))
val dataSourceStorageBackend = factory.createDataSourceStorageBackend
val ingestionStorageBackend = factory.createIngestionStorageBackend
val parameterStorageBackend = factory.createParameterStorageBackend
val DBLockStorageBackend = factory.createDBLockStorageBackend
val resetStorageBackend = factory.createResetStorageBackend
val stringInterningStorageBackend = factory.createStringInterningStorageBackend
val indexer = ParallelIndexerFactory(
jdbcUrl = config.jdbcUrl,
Expand Down Expand Up @@ -104,24 +95,8 @@ object JdbcIndexer {
mat = materializer,
readService = readService,
)
if (resetSchema) {
reset(resetStorageBackend, dataSourceStorageBackend).flatMap(_ => indexer)
} else {
indexer
}
indexer
}

private def reset(
resetStorageBackend: ResetStorageBackend,
dataSourceStorageBackend: DataSourceStorageBackend,
)(implicit loggingContext: LoggingContext): ResourceOwner[Unit] =
ResourceOwner.forFuture(() =>
Future(
Using.resource(dataSourceStorageBackend.createDataSource(config.jdbcUrl).getConnection)(
resetStorageBackend.reset
)
)(servicesExecutionContext)
)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.store.{FlywayMigrations, LfValueTranslationCache}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future

final class StandaloneIndexerServer(
readService: state.ReadService,
config: IndexerConfig,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
additionalMigrationPaths: Seq[String] = Seq.empty,
Expand All @@ -34,7 +33,6 @@ final class StandaloneIndexerServer(
val indexerFactory = new JdbcIndexer.Factory(
config,
readService,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
)
Expand All @@ -47,11 +45,10 @@ final class StandaloneIndexerServer(
def startIndexer(
migration: Future[Unit],
initializedDebugLogMessage: String = "Waiting for the indexer to initialize the database.",
resetSchema: Boolean = false,
): Resource[ReportsHealth] =
Resource
.fromFuture(migration)
.flatMap(_ => indexerFactory.initialized(resetSchema).acquire())
.flatMap(_ => indexerFactory.initialized().acquire())
.flatMap(indexer.start)
.map { case (healthReporter, _) =>
logger.debug(initializedDebugLogMessage)
Expand All @@ -64,12 +61,6 @@ final class StandaloneIndexerServer(
migration = flywayMigrations.migrate(config.allowExistingSchema)
)

case IndexerStartupMode.ResetAndStart =>
startIndexer(
migration = Future.unit,
resetSchema = true,
)

case IndexerStartupMode.ValidateAndStart =>
startIndexer(
migration = flywayMigrations.validate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,6 @@ private[platform] class FlywayMigrations(
)
}

def reset(): Future[Unit] = run { configBase =>
val flyway = configBase
.load()
logger.info("Running Flyway clean...")
flyway.clean()
logger.info("Flyway schema clean finished successfully.")
}

@nowarn("msg=method ignoreFutureMigrations .* is deprecated")
def validateAndWaitOnly(retries: Int, retryBackoff: FiniteDuration): Future[Unit] = runF {
configBase =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import com.daml.platform.store.backend.{
DeduplicationStorageBackend,
ParameterStorageBackend,
ReadStorageBackend,
ResetStorageBackend,
}
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.entries.{
Expand Down Expand Up @@ -74,7 +73,6 @@ private class JdbcLedgerDao(
readStorageBackend: ReadStorageBackend,
parameterStorageBackend: ParameterStorageBackend,
deduplicationStorageBackend: DeduplicationStorageBackend,
resetStorageBackend: ResetStorageBackend,
errorFactories: ErrorFactories,
materializer: Materializer,
) extends LedgerDao {
Expand Down Expand Up @@ -647,9 +645,6 @@ private class JdbcLedgerDao(
}(servicesExecutionContext)
}

override def reset()(implicit loggingContext: LoggingContext): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.truncateAllTables)(resetStorageBackend.reset)

private val translation: LfValueTranslation =
new LfValueTranslation(
cache = lfValueTranslationCache,
Expand Down Expand Up @@ -825,7 +820,6 @@ private[platform] object JdbcLedgerDao {
dbSupport.storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
dbSupport.storageBackendFactory.createParameterStorageBackend,
dbSupport.storageBackendFactory.createDeduplicationStorageBackend,
dbSupport.storageBackendFactory.createResetStorageBackend,
errorFactories,
materializer = materializer,
),
Expand Down Expand Up @@ -873,7 +867,6 @@ private[platform] object JdbcLedgerDao {
dbSupport.storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
dbSupport.storageBackendFactory.createParameterStorageBackend,
dbSupport.storageBackendFactory.createDeduplicationStorageBackend,
dbSupport.storageBackendFactory.createResetStorageBackend,
errorFactories,
materializer = materializer,
),
Expand Down Expand Up @@ -922,7 +915,6 @@ private[platform] object JdbcLedgerDao {
dbSupport.storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
dbSupport.storageBackendFactory.createParameterStorageBackend,
dbSupport.storageBackendFactory.createDeduplicationStorageBackend,
dbSupport.storageBackendFactory.createResetStorageBackend,
errorFactories,
materializer,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,6 @@ private[platform] trait LedgerWriteDao extends ReportsHealth {
optEntry: Option[PackageLedgerEntry],
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse]

/** Resets the platform into a state as it was never used before. Meant to be used solely for testing. */
def reset()(implicit loggingContext: LoggingContext): Future[Unit]

/** This is a combined store transaction method to support sandbox-classic and tests
* !!! Usage of this is discouraged, with the removal of sandbox-classic this will be removed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics)
)(implicit loggingContext: LoggingContext): Future[Unit] =
ledgerDao.initialize(ledgerId, participantId)

override def reset()(implicit loggingContext: LoggingContext): Future[Unit] =
ledgerDao.reset()

override def storePartyEntry(
offset: Offset,
partyEntry: PartyLedgerEntry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ import scala.util.Try

trait ResetStorageBackend {

/** Truncates all storage backend tables, EXCEPT the packages table.
* Does not touch other tables, like the Flyway history table.
* Reason: the reset() call is used by the ledger API reset service,
* which is mainly used for application tests in another big project,
* and re-uploading packages after each test significantly slows down their test time.
*/
def reset(connection: Connection): Unit

/** Truncates ALL storage backend tables.
* Does not touch other tables, like the Flyway history table.
* The result is a database that looks the same as a freshly created database with Flyway migrations applied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,6 @@ import com.daml.platform.store.backend.ResetStorageBackend

object H2ResetStorageBackend extends ResetStorageBackend {

override def reset(connection: Connection): Unit = {
SQL("""set referential_integrity false;
|truncate table configuration_entries;
|truncate table package_entries;
|truncate table parameters;
|truncate table participant_command_completions;
|truncate table participant_command_submissions;
|truncate table participant_events_divulgence;
|truncate table participant_events_create;
|truncate table participant_events_consuming_exercise;
|truncate table participant_events_non_consuming_exercise;
|truncate table party_entries;
|truncate table string_interning;
|truncate table participant_events_create_filter;
|truncate table participant_users;
|truncate table participant_user_rights;
|set referential_integrity true;""".stripMargin)
.execute()(connection)
()
}

override def resetAll(connection: Connection): Unit = {
SQL("""set referential_integrity false;
|truncate table configuration_entries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,6 @@ import anorm.SQL
import com.daml.platform.store.backend.ResetStorageBackend

object OracleResetStorageBackend extends ResetStorageBackend {
override def reset(connection: Connection): Unit =
List(
"truncate table configuration_entries cascade",
"truncate table package_entries cascade",
"truncate table parameters cascade",
"truncate table participant_command_completions cascade",
"truncate table participant_command_submissions cascade",
"truncate table participant_events_divulgence cascade",
"truncate table participant_events_create cascade",
"truncate table participant_events_consuming_exercise cascade",
"truncate table participant_events_non_consuming_exercise cascade",
"truncate table party_entries cascade",
"truncate table string_interning cascade",
"truncate table participant_events_create_filter cascade",
"truncate table participant_users cascade",
"truncate table participant_user_rights cascade",
).map(SQL(_)).foreach(_.execute()(connection))

override def resetAll(connection: Connection): Unit =
List(
"truncate table configuration_entries cascade",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,6 @@ import anorm.SQL
import com.daml.platform.store.backend.ResetStorageBackend

object PostgresResetStorageBackend extends ResetStorageBackend {
override def reset(connection: Connection): Unit = {
SQL("""truncate table configuration_entries cascade;
|truncate table package_entries cascade;
|truncate table parameters cascade;
|truncate table participant_command_completions cascade;
|truncate table participant_command_submissions cascade;
|truncate table participant_events_divulgence cascade;
|truncate table participant_events_create cascade;
|truncate table participant_events_consuming_exercise cascade;
|truncate table participant_events_non_consuming_exercise cascade;
|truncate table party_entries cascade;
|truncate table string_interning cascade;
|truncate table participant_events_create_filter cascade;
|truncate table participant_users cascade;
|truncate table participant_user_rights cascade;
|""".stripMargin)
.execute()(connection)
()
}

override def resetAll(connection: Connection): Unit = {
SQL("""truncate table configuration_entries cascade;
|truncate table packages cascade;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ object IndexerStabilityTestFixture {
indexing <- new StandaloneIndexerServer(
readService = readService,
config = indexerConfig,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
).acquire()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,67 +44,6 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac
config shouldBe None
}

it should "reset everything except packages when using reset" in {
val dtos: Vector[DbDto] = Vector(
// 1: config change
dtoConfiguration(offset(1)),
// 2: party allocation
dtoPartyEntry(offset(2)),
// 3: package upload
dtoPackage(offset(3)),
dtoPackageEntry(offset(3)),
// 4: transaction with create node
dtoCreate(offset(4), 1L, "#4"),
DbDto.CreateFilter(1L, someTemplateId.toString, someParty.toString),
dtoCompletion(offset(4)),
// 5: transaction with exercise node and retroactive divulgence
dtoExercise(offset(5), 2L, true, "#4"),
dtoDivulgence(Some(offset(5)), 3L, "#4"),
dtoCompletion(offset(5)),
DbDto.StringInterningDto(2, "2"),
)

// Initialize and insert some data
executeSql(backend.parameter.initializeParameters(someIdentityParams))
executeSql(ingest(dtos, _))
executeSql(updateLedgerEnd(ledgerEnd(5, 3L)))

// Reset
executeSql(backend.reset.reset)

// Check the contents
val identity = executeSql(backend.parameter.ledgerIdentity)
val end = executeSql(backend.parameter.ledgerEnd)
val events = executeSql(backend.contract.contractStateEvents(0, Long.MaxValue))

// Check the contents (queries that don't read beyond ledger end)
advanceLedgerEndToMakeOldDataVisible()
val parties = executeSql(backend.party.knownParties)
val config = executeSql(backend.configuration.ledgerConfiguration)
val packages = executeSql(backend.packageBackend.lfPackages)
val stringInterningEntries = executeSql(
backend.stringInterning.loadStringInterningEntries(0, 1000)
)
val filterIds = executeSql(
backend.event.activeContractEventIds(
partyFilter = someParty,
templateIdFilter = None,
startExclusive = 0,
endInclusive = 1000,
limit = 1000,
)
)

identity shouldBe None
end shouldBe None
parties shouldBe empty
packages should not be empty // Note: reset() does not delete packages
events shouldBe empty
config shouldBe None
stringInterningEntries shouldBe empty
filterIds shouldBe empty
}

it should "reset everything when using resetAll" in {
val dtos: Vector[DbDto] = Vector(
// 1: config change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ final class Runner[T <: ReadWriteService, Extra](
indexerHealth <- new StandaloneIndexerServer(
readService = readService,
config = configProvider.indexerConfig(participantConfig, config),
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
Expand Down
Loading

0 comments on commit 7c59728

Please sign in to comment.