diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index 8e5b02e75514..c1bacd833a88 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -515,6 +515,11 @@ final class Metrics(val registry: MetricRegistry) { "loadStringInterningEntries" ) + val meteringAggregator: DatabaseMetrics = createDbMetrics("metering_aggregator") + val initializeMeteringAggregator: DatabaseMetrics = createDbMetrics( + "initialize_metering_aggregator" + ) + object translation { private val Prefix: MetricName = db.Prefix :+ "translation" val cache = new CacheMetrics(registry, Prefix :+ "cache") diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index 51da90ee89e5..72ab29514e70 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -6b18e00f6dc6d79123a115a080b4092fc94350cf582f9373e122837f7ef741e7 +40d6f8974169a2d8702b31a9edbe2e22527aa837f0f4057922814631f40510ef diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index 6e4020219dcf..6840987a2466 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -538,3 +538,18 @@ CREATE TABLE transaction_metering ( ); CREATE INDEX transaction_metering_ledger_offset ON transaction_metering(ledger_offset); + +CREATE TABLE metering_parameters ( + ledger_metering_end VARCHAR, + ledger_metering_timestamp BIGINT NOT NULL +); + +CREATE TABLE participant_metering ( + application_id VARCHAR NOT NULL, + from_timestamp BIGINT NOT NULL, + to_timestamp BIGINT NOT NULL, + action_count INTEGER NOT NULL, + ledger_offset VARCHAR NOT NULL +); + +CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sha256 new file mode 100644 index 000000000000..fbe5d8a4d66b --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sha256 @@ -0,0 +1 @@ +45e3500305c86bf7928f78ba6b97572cf8f900e18805e181b44d7837fe0c3c09 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sql new file mode 100644 index 000000000000..300dc5bbb339 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sql @@ -0,0 +1,24 @@ +-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +-- Transaction metering alterations +ALTER TABLE transaction_metering MODIFY application_id VARCHAR2(4000); + +-- Create metering parameters +CREATE TABLE metering_parameters ( + ledger_metering_end VARCHAR2(4000), + ledger_metering_timestamp NUMBER NOT NULL +); + +-- Create participant metering +CREATE TABLE participant_metering ( + application_id VARCHAR2(4000) NOT NULL, + from_timestamp NUMBER NOT NULL, + to_timestamp NUMBER NOT NULL, + action_count NUMBER NOT NULL, + ledger_offset VARCHAR2(4000) NOT NULL +); + +CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id); + + diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sha256 new file mode 100644 index 000000000000..5ed53cfcafb1 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sha256 @@ -0,0 +1 @@ +08ea4e020bc4ed309d0345d1d0b9e4cbe4f25edc79d3415a05538cacdedb1470 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sql new file mode 100644 index 000000000000..c57f5077ead3 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sql @@ -0,0 +1,35 @@ + +-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +-- Transaction metering alterations +SELECT 'Harmonise the transaction_metering columns with other examples in the db'; + +ALTER TABLE transaction_metering ALTER application_id TYPE TEXT; +ALTER TABLE transaction_metering ALTER ledger_offset TYPE TEXT; + +-- Parameter alterations +SELECT 'Add Metering Parameters: Creating table...'; + +-- Create metering parameters +CREATE TABLE metering_parameters ( + ledger_metering_end TEXT, + ledger_metering_timestamp BIGINT NOT NULL +); + +-- Create participant metering +SELECT 'Add Participant Metering: Creating table...'; + +CREATE TABLE participant_metering ( + application_id TEXT NOT NULL, + from_timestamp BIGINT NOT NULL, + to_timestamp BIGINT NOT NULL, + action_count INTEGER NOT NULL, + ledger_offset TEXT NOT NULL +); + +SELECT 'Add Participant Metering: Creating indexes...'; + +CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id); + +SELECT 'Add Participant Metering: Done.'; diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala index 0445d0ee6964..1f8362f701ee 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala @@ -38,7 +38,9 @@ object JdbcIndexer { val factory = StorageBackendFactory.of(DbType.jdbcType(config.jdbcUrl)) val dataSourceStorageBackend = factory.createDataSourceStorageBackend val ingestionStorageBackend = factory.createIngestionStorageBackend + val meteringStoreBackend = factory.createMeteringStorageWriteBackend val parameterStorageBackend = factory.createParameterStorageBackend + val meteringParameterStorageBackend = factory.createMeteringParameterStorageBackend val DBLockStorageBackend = factory.createDBLockStorageBackend val stringInterningStorageBackend = factory.createStringInterningStorageBackend val indexer = ParallelIndexerFactory( @@ -92,9 +94,16 @@ object JdbcIndexer { metrics = metrics, ), stringInterningStorageBackend = stringInterningStorageBackend, + meteringAggregator = new MeteringAggregator.Owner( + meteringStore = meteringStoreBackend, + meteringParameterStore = meteringParameterStorageBackend, + parameterStore = parameterStorageBackend, + metrics = metrics, + ).apply, mat = materializer, readService = readService, ) + indexer } diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/MeteringAggregator.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/MeteringAggregator.scala new file mode 100644 index 000000000000..20291eb042b3 --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/MeteringAggregator.scala @@ -0,0 +1,204 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.indexer + +import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore +import com.daml.ledger.participant.state.index.v2.MeteringStore.ParticipantMetering +import com.daml.ledger.resources.ResourceOwner +import com.daml.lf.data.Time.Timestamp +import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.metrics.Metrics +import com.daml.platform.indexer.MeteringAggregator.{toOffsetDateTime, toTimestamp} +import com.daml.platform.store.appendonlydao.SqlExecutor +import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd +import com.daml.platform.store.backend.{ + MeteringParameterStorageBackend, + MeteringStorageWriteBackend, + ParameterStorageBackend, +} + +import java.sql.Connection +import java.time.temporal.ChronoUnit +import java.time.{OffsetDateTime, ZoneOffset} +import java.util.{Timer, TimerTask} +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +object MeteringAggregator { + + private val logger = ContextualizedLogger.get(getClass) + + class Owner( + meteringStore: MeteringStorageWriteBackend, + parameterStore: ParameterStorageBackend, + meteringParameterStore: MeteringParameterStorageBackend, + metrics: Metrics, + period: FiniteDuration = 6.minutes, + maxTaskDuration: FiniteDuration = 6.hours, + ) { + + private[platform] def apply( + sqlExecutor: SqlExecutor + )(implicit loggingContext: LoggingContext): ResourceOwner[Unit] = { + val aggregator = new MeteringAggregator( + meteringStore, + parameterStore, + meteringParameterStore, + metrics, + sqlExecutor, + ) + for { + _ <- ResourceOwner.forFuture(() => aggregator.initialize()) + _ <- ResourceOwner.forTimer(() => new Timer()).map { timer => + timer.scheduleAtFixedRate( + new TimerTask { + override def run(): Unit = { + Try { + Await.ready(aggregator.run(), maxTaskDuration) + } match { + case Success(_) => () + case Failure(e) => + logger.error(s"Metering not aggregated after ${maxTaskDuration}", e) + } + } + }, + period.toMillis, + period.toMillis, + ) + } + } yield () + } + } + + private def toTimestamp(dateTime: OffsetDateTime): Timestamp = + Timestamp.assertFromInstant(dateTime.toInstant) + private def toOffsetDateTime(timestamp: Timestamp): OffsetDateTime = + OffsetDateTime.ofInstant(timestamp.toInstant, ZoneOffset.UTC) + +} + +class MeteringAggregator( + meteringStore: MeteringStorageWriteBackend, + parameterStore: ParameterStorageBackend, + meteringParameterStore: MeteringParameterStorageBackend, + metrics: Metrics, + sqlExecutor: SqlExecutor, + clock: () => Timestamp = () => Timestamp.now(), +)(implicit loggingContext: LoggingContext) { + + private val parasitic: ExecutionContext = ExecutionContext.parasitic + + private val logger = ContextualizedLogger.get(getClass) + + private[platform] def initialize(): Future[Unit] = { + val initTimestamp = toOffsetDateTime(clock()).truncatedTo(ChronoUnit.HOURS).minusHours(1) + val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, toTimestamp(initTimestamp)) + sqlExecutor.executeSql(metrics.daml.index.db.initializeMeteringAggregator) { + meteringParameterStore.initializeLedgerMeteringEnd(initLedgerMeteringEnd) + } + } + + private[platform] def run(): Future[Unit] = { + + val future = sqlExecutor.executeSql(metrics.daml.index.db.meteringAggregator) { conn => + val nowUtcTime = toOffsetDateTime(clock()) + val lastLedgerMeteringEnd = getLedgerMeteringEnd(conn) + val startUtcTime: OffsetDateTime = toOffsetDateTime(lastLedgerMeteringEnd.timestamp) + val endUtcTime = startUtcTime.plusHours(1) + + if (nowUtcTime.isAfter(endUtcTime)) { + + val toEndTime = toTimestamp(endUtcTime) + val ingestedLedgerEnd = parameterStore.ledgerEnd(conn).lastOffset + val maybeMaxOffset = + meteringStore.transactionMeteringMaxOffset(lastLedgerMeteringEnd.offset, toEndTime)(conn) + + val ( + periodIngested, // This is true if the time period is closed fully ingested + hasMetering, // This is true if there are transaction_metering records to aggregate + toOffsetEnd, // This is the 'to' offset for the period being aggregated + ) = maybeMaxOffset match { + case Some(offset) => (offset <= ingestedLedgerEnd, true, offset) + case None => (true, false, lastLedgerMeteringEnd.offset) + } + + if (periodIngested) { + Some( + aggregate( + conn = conn, + lastLedgerMeteringEnd = lastLedgerMeteringEnd, + thisLedgerMeteringEnd = LedgerMeteringEnd(toOffsetEnd, toEndTime), + hasMetering = hasMetering, + ) + ) + } else { + logger.info("Not all transaction metering for aggregation time period is yet ingested") + None + } + } else { + None + } + } + + future.onComplete({ + case Success(None) => logger.info("No transaction metering aggregation required") + case Success(Some(lme)) => + logger.info(s"Aggregating transaction metering completed up to $lme") + case Failure(e) => logger.error("Failed to aggregate transaction metering", e) + })(parasitic) + + future.map(_ => ())(parasitic) + } + + private def aggregate( + conn: Connection, + lastLedgerMeteringEnd: LedgerMeteringEnd, + thisLedgerMeteringEnd: LedgerMeteringEnd, + hasMetering: Boolean, + ): LedgerMeteringEnd = { + logger.info(s"Aggregating transaction metering for $thisLedgerMeteringEnd") + + if (hasMetering) { + populateParticipantMetering(conn, lastLedgerMeteringEnd, thisLedgerMeteringEnd) + } + + meteringParameterStore.updateLedgerMeteringEnd(thisLedgerMeteringEnd)(conn) + + thisLedgerMeteringEnd + } + + private def getLedgerMeteringEnd(conn: Connection): LedgerMeteringEnd = + meteringParameterStore.ledgerMeteringEnd(conn).getOrElse { + throw new IllegalStateException("Ledger metering is not initialized") + } + + private def populateParticipantMetering( + conn: Connection, + lastLedgerMeteringEnd: LedgerMeteringEnd, + thisLedgerMeteringEnd: LedgerMeteringEnd, + ): Unit = { + + val transactionMetering: Seq[MeteringStore.TransactionMetering] = + meteringStore.transactionMetering(lastLedgerMeteringEnd.offset, thisLedgerMeteringEnd.offset)( + conn + ) + + val participantMetering = transactionMetering + .groupBy(_.applicationId) + .map { case (applicationId, metering) => + ParticipantMetering( + applicationId = applicationId, + from = lastLedgerMeteringEnd.timestamp, + to = thisLedgerMeteringEnd.timestamp, + actionCount = metering.map(_.actionCount).sum, + ledgerOffset = metering.map(_.ledgerOffset).max, + ) + } + .toVector + + meteringStore.insertParticipantMetering(participantMetering)(conn) + } +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala index b689ccf8976e..c75cf6f4d428 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala @@ -3,18 +3,15 @@ package com.daml.platform.indexer.parallel -import java.util.Timer -import java.util.concurrent.Executors - import akka.stream.{KillSwitch, Materializer} import com.daml.ledger.participant.state.v2.ReadService import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics import com.daml.platform.configuration.ServerRole +import com.daml.platform.indexer.Indexer import com.daml.platform.indexer.ha.{HaConfig, HaCoordinator, Handle, NoopHaCoordinator} import com.daml.platform.indexer.parallel.AsyncSupport._ -import com.daml.platform.indexer.Indexer import com.daml.platform.store.appendonlydao.DbDispatcher import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig import com.daml.platform.store.backend.{ @@ -25,10 +22,12 @@ import com.daml.platform.store.backend.{ import com.daml.platform.store.interning.StringInterningView import com.google.common.util.concurrent.ThreadFactoryBuilder +import java.util.Timer +import java.util.concurrent.Executors import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success} import scala.util.control.NonFatal +import scala.util.{Failure, Success} object ParallelIndexerFactory { @@ -45,6 +44,7 @@ object ParallelIndexerFactory { initializeParallelIngestion: InitializeParallelIngestion, parallelIndexerSubscription: ParallelIndexerSubscription[_], stringInterningStorageBackend: StringInterningStorageBackend, + meteringAggregator: DbDispatcher => ResourceOwner[Unit], mat: Materializer, readService: ReadService, )(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] = @@ -95,24 +95,27 @@ object ParallelIndexerFactory { implicit val ec: ExecutionContext = resourceContext.executionContext haCoordinator.protectedExecution(connectionInitializer => initializeHandle( - DbDispatcher - .owner( - // this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion - // therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator - dataSource = dataSourceStorageBackend.createDataSource( - jdbcUrl = jdbcUrl, - dataSourceConfig = dataSourceConfig, - connectionInitHook = Some(connectionInitializer.initialize), - ), - serverRole = ServerRole.Indexer, - connectionPoolSize = - ingestionParallelism + 1, // + 1 for the tailing ledger_end updates - connectionTimeout = FiniteDuration( - 250, - "millis", - ), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc) - metrics = metrics, - ) + for { + dbDispatcher <- DbDispatcher + .owner( + // this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion + // therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator + dataSource = dataSourceStorageBackend.createDataSource( + jdbcUrl = jdbcUrl, + dataSourceConfig = dataSourceConfig, + connectionInitHook = Some(connectionInitializer.initialize), + ), + serverRole = ServerRole.Indexer, + connectionPoolSize = + ingestionParallelism + 1, // + 1 for the tailing ledger_end updates + connectionTimeout = FiniteDuration( + 250, + "millis", + ), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc) + metrics = metrics, + ) + _ <- meteringAggregator(dbDispatcher) + } yield dbDispatcher ) { dbDispatcher => val stringInterningView = new StringInterningView( loadPrefixedEntries = (fromExclusive, toInclusive) => diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala b/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala index 8e08ed34bb1e..c3bea1a001ab 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala @@ -8,6 +8,7 @@ import anorm._ import com.daml.ledger.offset.Offset import com.daml.lf.crypto.Hash import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp import com.daml.lf.ledger.EventId import com.daml.lf.value.Value import spray.json.DefaultJsonProtocol._ @@ -354,6 +355,11 @@ private[platform] object Conversions { def offset(name: String): RowParser[Offset] = SqlParser.get[String](name).map(v => Offset.fromHexString(Ref.HexString.assertFromString(v))) + def offset(position: Int): RowParser[Offset] = + SqlParser + .get[String](position) + .map(v => Offset.fromHexString(Ref.HexString.assertFromString(v))) + implicit val columnToOffset: Column[Offset] = Column.nonNull((value: Any, meta) => Column @@ -363,9 +369,17 @@ private[platform] object Conversions { // Timestamp + implicit object TimestampToStatement extends ToStatement[Timestamp] { + override def set(s: PreparedStatement, index: Int, v: Timestamp): Unit = + s.setLong(index, v.micros) + } + def timestampFromMicros(name: String): RowParser[com.daml.lf.data.Time.Timestamp] = SqlParser.get[Long](name).map(com.daml.lf.data.Time.Timestamp.assertFromLong) + def timestampFromMicros(position: Int): RowParser[com.daml.lf.data.Time.Timestamp] = + SqlParser.get[Long](position).map(com.daml.lf.data.Time.Timestamp.assertFromLong) + // Hash implicit object HashToStatement extends ToStatement[Hash] { diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala index d1389d37b891..4b744d90a1ac 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala @@ -26,7 +26,8 @@ private[platform] final class DbDispatcher private ( overallWaitTimer: Timer, overallExecutionTimer: Timer, )(implicit loggingContext: LoggingContext) - extends ReportsHealth { + extends SqlExecutor + with ReportsHealth { private val logger = ContextualizedLogger.get(this.getClass) private val executionContext = ExecutionContext.fromExecutor( diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SqlExecutor.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SqlExecutor.scala new file mode 100644 index 000000000000..97f1107ab79a --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SqlExecutor.scala @@ -0,0 +1,16 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.store.appendonlydao + +import com.daml.logging.LoggingContext +import com.daml.metrics.DatabaseMetrics + +import java.sql.Connection +import scala.concurrent.Future + +private[platform] trait SqlExecutor { + def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit + loggingContext: LoggingContext + ): Future[T] +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index 004fc8e013a9..89e7788abc8b 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -7,7 +7,10 @@ import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails, User, import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering +import com.daml.ledger.participant.state.index.v2.MeteringStore.{ + ParticipantMetering, + TransactionMetering, +} import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.lf.data.Ref import com.daml.lf.data.Ref.{ApplicationId, UserId} @@ -18,6 +21,7 @@ import com.daml.platform import com.daml.platform.store.EventSequentialId import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw} import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams} +import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry} import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState @@ -93,10 +97,13 @@ trait ParameterStorageBackend { /** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations */ def updatePrunedUptoInclusive(prunedUpToInclusive: Offset)(connection: Connection): Unit + def prunedUpToInclusive(connection: Connection): Option[Offset] + def updatePrunedAllDivulgedContractsUpToInclusive( prunedUpToInclusive: Offset )(connection: Connection): Unit + def participantAllDivulgedContractsPrunedUpToInclusive( connection: Connection ): Option[Offset] @@ -106,9 +113,9 @@ trait ParameterStorageBackend { * - If no identity parameters are stored, then they are set to the given value. * - If identity parameters are stored, then they are compared to the given ones. * - Ledger identity parameters are written at most once, and are never overwritten. - * No significant CPU load, mostly blocking JDBC communication with the database backend. + * No significant CPU load, mostly blocking JDBC communication with the database backend. * - * This method is NOT safe to call concurrently. + * This method is NOT safe to call concurrently. */ def initializeParameters(params: ParameterStorageBackend.IdentityParams)(connection: Connection)( implicit loggingContext: LoggingContext @@ -118,6 +125,24 @@ trait ParameterStorageBackend { def ledgerIdentity(connection: Connection): Option[ParameterStorageBackend.IdentityParams] } +object MeteringParameterStorageBackend { + case class LedgerMeteringEnd(offset: Offset, timestamp: Timestamp) +} + +trait MeteringParameterStorageBackend { + + /** Initialize the ledger metering end parameters if unset */ + def initializeLedgerMeteringEnd(init: LedgerMeteringEnd)(connection: Connection)(implicit + loggingContext: LoggingContext + ): Unit + + /** The timestamp and offset for which billable metering is available */ + def ledgerMeteringEnd(connection: Connection): Option[LedgerMeteringEnd] + + /** Update the timestamp and offset for which billable metering is available */ + def updateLedgerMeteringEnd(ledgerMeteringEnd: LedgerMeteringEnd)(connection: Connection): Unit +} + object ParameterStorageBackend { case class LedgerEnd(lastOffset: Offset, lastEventSeqId: Long, lastStringInterningId: Int) { def lastOffsetOption: Option[Offset] = @@ -128,6 +153,7 @@ object ParameterStorageBackend { ParameterStorageBackend.LedgerEnd(Offset.beforeBegin, EventSequentialId.beforeBegin, 0) } case class IdentityParams(ledgerId: LedgerId, participantId: ParticipantId) + } trait ConfigurationStorageBackend { @@ -424,7 +450,7 @@ object UserManagementStorageBackend { case class DbUserRight(domainRight: UserRight, grantedAt: Long) } -trait MeteringStorageBackend { +trait MeteringStorageReadBackend { def transactionMetering( from: Timestamp, @@ -432,3 +458,30 @@ trait MeteringStorageBackend { applicationId: Option[ApplicationId], )(connection: Connection): Vector[TransactionMetering] } + +trait MeteringStorageWriteBackend { + + /** This method will return the maximum offset of the transaction_metering record + * which has an offset greater than the from offset and a timestamp prior to the + * to timestamp, if any. + * + * Note that the offset returned may not have been fully ingested. This is to allow the metering to wait if there + * are still un-fully ingested records withing the time window. + */ + def transactionMeteringMaxOffset(from: Offset, to: Timestamp)( + connection: Connection + ): Option[Offset] + + /** This method will return all transaction metering records between the from offset (exclusive) + * and the to offset (inclusive). + */ + def transactionMetering(from: Offset, to: Offset)( + connection: Connection + ): Vector[TransactionMetering] + + def insertParticipantMetering(metering: Vector[ParticipantMetering])(connection: Connection): Unit + + /** Test Only - will be removed once reporting can be based if participant metering */ + def allParticipantMetering()(connection: Connection): Vector[ParticipantMetering] + +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala index d8dbf2c86f6d..59365f9f5cde 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala @@ -13,6 +13,7 @@ import com.daml.platform.store.interning.StringInterning trait StorageBackendFactory { def createIngestionStorageBackend: IngestionStorageBackend[_] def createParameterStorageBackend: ParameterStorageBackend + def createMeteringParameterStorageBackend: MeteringParameterStorageBackend def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): ConfigurationStorageBackend def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend @@ -31,7 +32,8 @@ trait StorageBackendFactory { def createResetStorageBackend: ResetStorageBackend def createStringInterningStorageBackend: StringInterningStorageBackend def createUserManagementStorageBackend: UserManagementStorageBackend - def createMeteringStorageBackend(ledgerEndCache: LedgerEndCache): MeteringStorageBackend + def createMeteringStorageReadBackend(ledgerEndCache: LedgerEndCache): MeteringStorageReadBackend + def createMeteringStorageWriteBackend: MeteringStorageWriteBackend final def readStorageBackend( ledgerEndCache: LedgerEndCache, @@ -44,7 +46,7 @@ trait StorageBackendFactory { completionStorageBackend = createCompletionStorageBackend(stringInterning), contractStorageBackend = createContractStorageBackend(ledgerEndCache, stringInterning), eventStorageBackend = createEventStorageBackend(ledgerEndCache, stringInterning), - meteringStorageBackend = createMeteringStorageBackend(ledgerEndCache), + meteringStorageBackend = createMeteringStorageReadBackend(ledgerEndCache), ) } @@ -64,5 +66,5 @@ case class ReadStorageBackend( completionStorageBackend: CompletionStorageBackend, contractStorageBackend: ContractStorageBackend, eventStorageBackend: EventStorageBackend, - meteringStorageBackend: MeteringStorageBackend, + meteringStorageBackend: MeteringStorageReadBackend, ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala index 5d9653d903dc..95cb6a919fb0 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala @@ -14,6 +14,9 @@ trait CommonStorageBackendFactory extends StorageBackendFactory { override val createParameterStorageBackend: ParameterStorageBackend = ParameterStorageBackendTemplate + override val createMeteringParameterStorageBackend: MeteringParameterStorageBackend = + MeteringParameterStorageBackendTemplate + override def createConfigurationStorageBackend( ledgerEndCache: LedgerEndCache ): ConfigurationStorageBackend = @@ -28,8 +31,13 @@ trait CommonStorageBackendFactory extends StorageBackendFactory { override val createUserManagementStorageBackend: UserManagementStorageBackend = UserManagementStorageBackendTemplate - override def createMeteringStorageBackend( + override def createMeteringStorageReadBackend( ledgerEndCache: LedgerEndCache - ): MeteringStorageBackend = - new MeteringStorageBackendTemplate(ledgerEndCache) + ): MeteringStorageReadBackend = + new MeteringStorageBackendReadTemplate(ledgerEndCache) + + def createMeteringStorageWriteBackend: MeteringStorageWriteBackend = { + MeteringStorageBackendWriteTemplate + } + } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringParameterStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringParameterStorageBackendTemplate.scala new file mode 100644 index 000000000000..8cfe9050e8cb --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringParameterStorageBackendTemplate.scala @@ -0,0 +1,76 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com +package daml.platform.store.backend.common + +import daml.ledger.offset.Offset +import daml.logging.{ContextualizedLogger, LoggingContext} +import daml.platform.store.Conversions.{offset, timestampFromMicros} +import daml.platform.store.backend.MeteringParameterStorageBackend +import daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation +import daml.scalautil.Statement.discard +import anorm.{RowParser, ~} +import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd + +import java.sql.Connection + +private[backend] object MeteringParameterStorageBackendTemplate + extends MeteringParameterStorageBackend { + + private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) + + def initializeLedgerMeteringEnd( + init: LedgerMeteringEnd + )(connection: Connection)(implicit loggingContext: LoggingContext): Unit = { + import com.daml.platform.store.Conversions.OffsetToStatement + import com.daml.platform.store.Conversions.TimestampToStatement + ledgerMeteringEnd(connection) match { + case None => + logger.info(s"Initializing ledger metering end to $init") + discard( + SQL"""insert into metering_parameters( + ledger_metering_end, + ledger_metering_timestamp + ) values ( + ${init.offset}, + ${init.timestamp} + )""" + .execute()(connection) + ) + case Some(existing) => + logger.info(s"Found existing ledger metering end $existing") + } + } + + def ledgerMeteringEnd(connection: Connection): Option[LedgerMeteringEnd] = { + + val LedgerMeteringEndParser: RowParser[LedgerMeteringEnd] = ( + offset("ledger_metering_end").?.map(_.getOrElse(Offset.beforeBegin)) ~ + timestampFromMicros("ledger_metering_timestamp") + ) map { case ledgerMeteringEnd ~ ledgerMeteringTimestamp => + LedgerMeteringEnd(ledgerMeteringEnd, ledgerMeteringTimestamp) + } + + SQL"""SELECT ledger_metering_end, ledger_metering_timestamp FROM metering_parameters""" + .as(LedgerMeteringEndParser.singleOpt)(connection) + + } + + def updateLedgerMeteringEnd( + ledgerMeteringEnd: LedgerMeteringEnd + )(connection: Connection): Unit = { + import com.daml.platform.store.Conversions.OffsetToStatement + import com.daml.platform.store.Conversions.TimestampToStatement + SQL""" + UPDATE + metering_parameters + SET + ledger_metering_end = ${ledgerMeteringEnd.offset}, + ledger_metering_timestamp = ${ledgerMeteringEnd.timestamp} + """ + .execute()(connection) + () + } + +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala index b12e31cb98ce..a11c9473b03f 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala @@ -5,21 +5,26 @@ package com.daml.platform.store.backend.common import anorm.SqlParser.int import anorm.{RowParser, ~} +import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.{ + ParticipantMetering, + TransactionMetering, +} import com.daml.lf.data.Ref.ApplicationId import com.daml.lf.data.Time +import com.daml.lf.data.Time.Timestamp import com.daml.platform.store.Conversions.{applicationId, offset, timestampFromMicros} import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf -import com.daml.platform.store.backend.MeteringStorageBackend -import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation +import com.daml.platform.store.backend.common.MeteringStorageBackendTemplate.transactionMeteringParser +import com.daml.platform.store.backend.{MeteringStorageReadBackend, MeteringStorageWriteBackend} import com.daml.platform.store.cache.LedgerEndCache import java.sql.Connection -private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndCache) - extends MeteringStorageBackend { +private[backend] object MeteringStorageBackendTemplate { - private val transactionMeteringParser: RowParser[TransactionMetering] = { + val transactionMeteringParser: RowParser[TransactionMetering] = { ( applicationId("application_id") ~ int("action_count") ~ @@ -31,15 +36,20 @@ private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndC meteringTimestamp ~ ledgerOffset => TransactionMetering( - applicationId, - actionCount, - meteringTimestamp, - ledgerOffset, + applicationId = applicationId, + actionCount = actionCount, + meteringTimestamp = meteringTimestamp, + ledgerOffset = ledgerOffset, ) } } +} + +private[backend] class MeteringStorageBackendReadTemplate(ledgerEndCache: LedgerEndCache) + extends MeteringStorageReadBackend { + override def transactionMetering( from: Time.Timestamp, to: Option[Time.Timestamp], @@ -66,3 +76,95 @@ private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndC private def isSet(o: Option[_]): Int = o.fold(0)(_ => 1) } +private[backend] object MeteringStorageBackendWriteTemplate extends MeteringStorageWriteBackend { + + val participantMeteringParser: RowParser[ParticipantMetering] = { + ( + applicationId("application_id") ~ + timestampFromMicros("from_timestamp") ~ + timestampFromMicros("to_timestamp") ~ + int("action_count") ~ + offset("ledger_offset") + ).map { + case applicationId ~ + from ~ + to ~ + actionCount ~ + ledgerOffset => + ParticipantMetering( + applicationId, + from, + to, + actionCount, + ledgerOffset, + ) + } + + } + + def transactionMeteringMaxOffset(from: Offset, to: Timestamp)( + connection: Connection + ): Option[Offset] = { + + import com.daml.platform.store.Conversions.OffsetToStatement + import com.daml.platform.store.Conversions.TimestampToStatement + + SQL""" + select max(ledger_offset) + from transaction_metering + where ledger_offset > $from + and metering_timestamp < $to + """ + .as(offset(1).?.single)(connection) + } + + def transactionMetering(from: Offset, to: Offset)( + connection: Connection + ): Vector[TransactionMetering] = { + + import com.daml.platform.store.Conversions.OffsetToStatement + + SQL""" + select + application_id, + action_count, + metering_timestamp, + ledger_offset + from transaction_metering + where ledger_offset > $from + and ledger_offset <= $to + """ + .asVectorOf(transactionMeteringParser)(connection) + } + + def insertParticipantMetering(metering: Vector[ParticipantMetering])( + connection: Connection + ): Unit = { + + import com.daml.platform.store.Conversions.OffsetToStatement + import com.daml.platform.store.Conversions.TimestampToStatement + + metering.foreach { participantMetering => + import participantMetering._ + SQL""" + insert into participant_metering(application_id, from_timestamp, to_timestamp, action_count, ledger_offset) + values (${participantMetering.applicationId.toString}, $from, $to, $actionCount, $ledgerOffset) + """.execute()(connection) + } + + } + + def allParticipantMetering()(connection: Connection): Vector[ParticipantMetering] = { + SQL""" + select + application_id, + from_timestamp, + to_timestamp, + action_count, + ledger_offset + from participant_metering + """ + .asVectorOf(participantMeteringParser)(connection) + } + +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala index a2f95886351f..db1f26159410 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala @@ -3,21 +3,21 @@ package com.daml.platform.store.backend.common -import java.sql.Connection - import anorm.SqlParser.{int, long} import anorm.{RowParser, ~} import com.daml.ledger.api.domain.{LedgerId, ParticipantId} import com.daml.ledger.offset.Offset -import com.daml.platform.store.Conversions.{ledgerString, offset} import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.platform.common.MismatchException import com.daml.platform.store.Conversions +import com.daml.platform.store.Conversions.{ledgerString, offset} import com.daml.platform.store.backend.ParameterStorageBackend import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation import com.daml.scalautil.Statement.discard import scalaz.syntax.tag._ +import java.sql.Connection + private[backend] object ParameterStorageBackendTemplate extends ParameterStorageBackend { private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) @@ -193,4 +193,5 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage connection ) } + } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala index ac2958417826..bf930aa9081f 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala @@ -3,24 +3,8 @@ package com.daml.platform.store.backend.postgresql -import com.daml.platform.store.backend.common.{ - CommonStorageBackendFactory, - CompletionStorageBackendTemplate, - ContractStorageBackendTemplate, - IngestionStorageBackendTemplate, - PartyStorageBackendTemplate, -} -import com.daml.platform.store.backend.{ - CompletionStorageBackend, - ContractStorageBackend, - DBLockStorageBackend, - DataSourceStorageBackend, - EventStorageBackend, - IngestionStorageBackend, - PartyStorageBackend, - ResetStorageBackend, - StorageBackendFactory, -} +import com.daml.platform.store.backend.common._ +import com.daml.platform.store.backend._ import com.daml.platform.store.cache.LedgerEndCache import com.daml.platform.store.interning.StringInterning @@ -59,4 +43,5 @@ object PostgresStorageBackendFactory override val createResetStorageBackend: ResetStorageBackend = PostgresResetStorageBackend + } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala index 8077c154adad..17d8be294901 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala @@ -3,8 +3,6 @@ package com.daml.platform.store.backend -import java.sql.Connection - import com.daml.ledger.offset.Offset import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.backend.h2.H2StorageBackendFactory @@ -16,6 +14,8 @@ import com.daml.testing.oracle.OracleAroundAll import com.daml.testing.postgresql.PostgresAroundAll import org.scalatest.Suite +import java.sql.Connection + /** Creates a database and a [[TestBackend]]. * Used by [[StorageBackendSpec]] to run all StorageBackend tests on different databases. */ @@ -76,6 +76,7 @@ private[backend] trait StorageBackendProviderOracle case class TestBackend( ingestion: IngestionStorageBackend[_], parameter: ParameterStorageBackend, + meteringParameter: MeteringParameterStorageBackend, configuration: ConfigurationStorageBackend, party: PartyStorageBackend, packageBackend: PackageStorageBackend, @@ -90,16 +91,30 @@ case class TestBackend( ledgerEndCache: MutableLedgerEndCache, stringInterningSupport: MockStringInterning, userManagement: UserManagementStorageBackend, - metering: MeteringStorageBackend, + metering: TestMeteringBackend, +) + +case class TestMeteringBackend( + read: MeteringStorageReadBackend, + write: MeteringStorageWriteBackend, ) object TestBackend { def apply(storageBackendFactory: StorageBackendFactory): TestBackend = { val ledgerEndCache = MutableLedgerEndCache() val stringInterning = new MockStringInterning + + def createTestMeteringBackend: TestMeteringBackend = { + TestMeteringBackend( + read = storageBackendFactory.createMeteringStorageReadBackend(ledgerEndCache), + write = storageBackendFactory.createMeteringStorageWriteBackend, + ) + } + TestBackend( ingestion = storageBackendFactory.createIngestionStorageBackend, parameter = storageBackendFactory.createParameterStorageBackend, + meteringParameter = storageBackendFactory.createMeteringParameterStorageBackend, configuration = storageBackendFactory.createConfigurationStorageBackend(ledgerEndCache), party = storageBackendFactory.createPartyStorageBackend(ledgerEndCache), packageBackend = storageBackendFactory.createPackageStorageBackend(ledgerEndCache), @@ -115,7 +130,8 @@ object TestBackend { ledgerEndCache = ledgerEndCache, stringInterningSupport = stringInterning, userManagement = storageBackendFactory.createUserManagementStorageBackend, - metering = storageBackendFactory.createMeteringStorageBackend(ledgerEndCache), + metering = createTestMeteringBackend, ) } + } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala index 768d74db65bc..e81a4a283b8b 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala @@ -119,7 +119,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion ) ) - val metering1 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None)) + val metering1 = + executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None)) // Restart the indexer - should delete data from the partial insert above val end2 = executeSql(backend.parameter.ledgerEnd) @@ -154,7 +155,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion ) ) - val metering2 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None)) + val metering2 = + executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None)) parties1 should have length 1 packages1 should have size 1 diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala index a3b7ef2d621c..5225bc6ed8f7 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala @@ -3,10 +3,15 @@ package com.daml.platform.store.backend -import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering +import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.{ + ParticipantMetering, + TransactionMetering, +} import com.daml.lf.data.Ref import com.daml.lf.data.Ref.ApplicationId import com.daml.lf.data.Time.Timestamp +import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{Assertion, Inside} @@ -20,7 +25,7 @@ private[backend] trait StorageBackendTestsMetering import StorageBackendTestValues._ { - behavior of "StorageBackend (metering)" + behavior of "StorageBackend (read metering)" it should "persist transaction metering" in { @@ -38,7 +43,7 @@ private[backend] trait StorageBackendTestsMetering executeSql(ingest(Vector(dtoTransactionMetering(metering)), _)) executeSql(updateLedgerEnd(toOffset, 5L)) val Vector(actual) = - executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None)) + executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None)) actual shouldBe expected } @@ -90,7 +95,9 @@ private[backend] trait StorageBackendTestsMetering populate() val from = someTime.addMicros(fromIdx) val to = toIdx.map(someTime.addMicros) - val actual = executeSql(backend.metering.transactionMetering(from, to, applicationId)).toSet + val actual = executeSql( + backend.metering.read.transactionMetering(from, to, applicationId) + ).toSet val expected = execute(from, to, applicationId) actual.map(_.ledgerOffset) shouldBe expected.map(_.ledgerOffset) actual shouldBe expected @@ -106,4 +113,109 @@ private[backend] trait StorageBackendTestsMetering check(2, None, Some(appIdA)) } } + + { + behavior of "StorageBackend (metering parameters)" + + val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.Epoch) + + it should "fetch un-initialized ledger metering end" in { + executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe None + } + + it should "initialized ledger metering end" in { + val expected = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.Epoch) + executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(expected)) + executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected) + } + + it should "update ledger metering end with `before begin` offset" in { + executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(initLedgerMeteringEnd)) + val expected = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.now()) + executeSql(backend.meteringParameter.updateLedgerMeteringEnd(expected)) + executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected) + } + + it should "update ledger metering end with valid offset" in { + executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(initLedgerMeteringEnd)) + val expected = LedgerMeteringEnd( + Offset.fromHexString(Ref.HexString.assertFromString("07")), + Timestamp.now(), + ) + executeSql(backend.meteringParameter.updateLedgerMeteringEnd(expected)) + executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected) + } + + } + + { + behavior of "StorageBackend (write metering)" + + val metering = Vector(7L, 8L, 9L, 10L).map { i => + TransactionMetering( + someApplicationId, + actionCount = 1, + meteringTimestamp = someTime.addMicros(i), + ledgerOffset = offset(i), + ) + } + + val meteringOffsets = metering.map(_.ledgerOffset) + val firstOffset = meteringOffsets.min + val lastOffset = meteringOffsets.max + val lastTime = metering.map(_.meteringTimestamp).max + + it should "return the maximum transaction metering offset" in { + + def check(from: Offset, to: Timestamp): Assertion = { + val expected = metering + .filter(_.ledgerOffset > from) + .filter(_.meteringTimestamp < to) + .map(_.ledgerOffset) + .maxOption + val actual = executeSql(backend.metering.write.transactionMeteringMaxOffset(from, to)) + actual shouldBe expected + } + + executeSql(ingest(metering.map(dtoTransactionMetering), _)) + + check(firstOffset, lastTime) // 9 + check(firstOffset, lastTime.addMicros(1)) // 10 + check(lastOffset, lastTime.addMicros(1)) // Unset + + } + + it should "select transaction metering for aggregation" in { + + executeSql(ingest(metering.map(dtoTransactionMetering), _)) + + val nextLastOffset: Offset = meteringOffsets.filter(_ < lastOffset).max + val expected = meteringOffsets.filter(_ > firstOffset).filter(_ <= nextLastOffset).toSet + val actual = executeSql( + backend.metering.write.transactionMetering(firstOffset, nextLastOffset) + ).map(_.ledgerOffset).toSet + actual shouldBe expected + } + + it should "insert new participant metering records" in { + + val expected = Vector(7L, 8L, 9L).map { i => + ParticipantMetering( + someApplicationId, + someTime.addMicros(i), + someTime.addMicros(i + 1), + actionCount = 1, + ledgerOffset = offset(i), + ) + } + + executeSql(backend.metering.write.insertParticipantMetering(expected)) + val actual = + executeSql(backend.metering.write.allParticipantMetering()).sortBy(_.ledgerOffset) + actual shouldBe expected + + } + + } + } diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala new file mode 100644 index 000000000000..e7e31d260565 --- /dev/null +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala @@ -0,0 +1,235 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.indexer + +import com.codahale.metrics.MetricRegistry +import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.{ + ParticipantMetering, + TransactionMetering, +} +import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext +import com.daml.metrics.{DatabaseMetrics, Metrics} +import com.daml.platform.store.appendonlydao.SqlExecutor +import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd +import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd +import com.daml.platform.store.backend.{ + MeteringParameterStorageBackend, + MeteringStorageWriteBackend, + ParameterStorageBackend, +} +import org.mockito.ArgumentMatchersSugar.any +import org.mockito.MockitoSugar +import org.mockito.captor.ArgCaptor +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +import java.sql.Connection +import java.time.temporal.ChronoUnit +import java.time.{LocalDate, LocalTime, OffsetDateTime, ZoneOffset} +import scala.concurrent.Future + +//noinspection TypeAnnotation +final class MeteringAggregatorSpec extends AnyWordSpecLike with MockitoSugar with Matchers { + + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting +// private implicit val ec = scala.concurrent.ExecutionContext.global + private val metrics = new Metrics(new MetricRegistry) + private def toTS(t: OffsetDateTime): Timestamp = Timestamp.assertFromInstant(t.toInstant) + + "MeteringAggregator" should { + + val applicationA = Ref.ApplicationId.assertFromString("appA") + val applicationB = Ref.ApplicationId.assertFromString("appB") + + class TestSetup { + + val lastAggEndTime: OffsetDateTime = + OffsetDateTime.of(LocalDate.now(), LocalTime.of(15, 0), ZoneOffset.UTC) + val nextAggEndTime: OffsetDateTime = lastAggEndTime.plusHours(1) + val timeNow: OffsetDateTime = lastAggEndTime.plusHours(1).plusMinutes(+5) + val lastAggOffset: Offset = Offset.fromHexString(Ref.HexString.assertFromString("01")) + + val conn: Connection = mock[Connection] + val dispatcher: SqlExecutor = new SqlExecutor { + override def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit + loggingContext: LoggingContext + ): Future[T] = Future.successful { + sql(conn) + } + } + + val parameterStore: ParameterStorageBackend = mock[ParameterStorageBackend] + val meteringParameterStore: MeteringParameterStorageBackend = + mock[MeteringParameterStorageBackend] + val meteringStore: MeteringStorageWriteBackend = mock[MeteringStorageWriteBackend] + + def runUnderTest( + transactionMetering: Vector[TransactionMetering], + maybeLedgerEnd: Option[Offset] = None, + ): Future[Unit] = { + + val ledgerEndOffset = (maybeLedgerEnd, transactionMetering.lastOption) match { + case (Some(le), _) => le + case (None, Some(t)) => t.ledgerOffset + case (None, None) => lastAggOffset + } + + when(meteringParameterStore.ledgerMeteringEnd(conn)) + .thenReturn(Some(LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime)))) + + when(meteringStore.transactionMeteringMaxOffset(lastAggOffset, toTS(nextAggEndTime))(conn)) + .thenReturn(transactionMetering.lastOption.map(_.ledgerOffset)) + + when(parameterStore.ledgerEnd(conn)) + .thenReturn(LedgerEnd(ledgerEndOffset, 0L, 0)) + + transactionMetering.lastOption.map { last => + when(meteringStore.transactionMetering(lastAggOffset, last.ledgerOffset)(conn)) + .thenReturn(transactionMetering) + } + + new MeteringAggregator( + meteringStore, + parameterStore, + meteringParameterStore, + metrics, + dispatcher, + () => toTS(timeNow), + ) + .run() + + } + } + + "aggregate transaction metering records" in new TestSetup { + + val transactionMetering = Vector(10, 15, 20).map { i => + TransactionMetering( + applicationId = applicationA, + actionCount = i, + meteringTimestamp = toTS(lastAggEndTime.plusMinutes(i.toLong)), + ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString(i.toString)), + ) + } + + val expected: ParticipantMetering = ParticipantMetering( + applicationA, + from = toTS(lastAggEndTime), + to = toTS(nextAggEndTime), + actionCount = transactionMetering.map(_.actionCount).sum, + transactionMetering.last.ledgerOffset, + ) + + runUnderTest(transactionMetering) + + verify(meteringStore).insertParticipantMetering(Vector(expected))(conn) + verify(meteringParameterStore).updateLedgerMeteringEnd( + LedgerMeteringEnd(expected.ledgerOffset, expected.to) + )(conn) + + } + + "not aggregate if there is not a full period" in new TestSetup { + override val timeNow = lastAggEndTime.plusHours(1).plusMinutes(-5) + when(meteringParameterStore.ledgerMeteringEnd(conn)) + .thenReturn(Some(LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime)))) + runUnderTest(Vector.empty) + verifyNoMoreInteractions(meteringStore) + } + + "aggregate over multiple applications" in new TestSetup { + + val expected = Set(applicationA, applicationB) + + val transactionMetering = expected.toVector.map { a => + TransactionMetering( + applicationId = a, + actionCount = 1, + meteringTimestamp = toTS(lastAggEndTime.plusMinutes(1)), + ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString("10")), + ) + } + + runUnderTest(transactionMetering) + + val participantMeteringCaptor = ArgCaptor[Vector[ParticipantMetering]] + verify(meteringStore).insertParticipantMetering(participantMeteringCaptor)(any[Connection]) + participantMeteringCaptor.value.map(_.applicationId).toSet shouldBe expected + + } + + "increase ledger metering end even if there are not transaction metering records" in new TestSetup { + + runUnderTest(Vector.empty[TransactionMetering]) + + verify(meteringParameterStore).updateLedgerMeteringEnd( + LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime.plusHours(1))) + )(conn) + + } + + "skip aggregation if the last transaction metering offset within the time range has not been fully ingested" in new TestSetup { + + val transactionMetering = Vector( + TransactionMetering( + applicationId = applicationA, + actionCount = 1, + meteringTimestamp = toTS(lastAggEndTime.plusMinutes(1)), + ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString("03")), + ) + ) + + runUnderTest( + transactionMetering, + maybeLedgerEnd = Some(Offset.fromHexString(Ref.HexString.assertFromString("02"))), + ) + + verify(meteringParameterStore, never).updateLedgerMeteringEnd(any[LedgerMeteringEnd])( + any[Connection] + ) + + } + + "fail if an attempt is made to run un-initialized" in new TestSetup { + // Note this only works as we do not use a real future for testing + intercept[IllegalStateException] { + when(meteringParameterStore.ledgerMeteringEnd(conn)).thenReturn(None) + val underTest = + new MeteringAggregator( + meteringStore, + parameterStore, + meteringParameterStore, + metrics, + dispatcher, + () => toTS(timeNow), + ) + underTest.run() + } + } + + "initialize the metering ledger end to the hour before the current hour" in new TestSetup { + when(meteringParameterStore.ledgerMeteringEnd(conn)).thenReturn(None) + val underTest = + new MeteringAggregator( + meteringStore, + parameterStore, + meteringParameterStore, + metrics, + dispatcher, + () => toTS(timeNow), + ) + underTest.initialize() + val expected = LedgerMeteringEnd( + Offset.beforeBegin, + toTS(timeNow.truncatedTo(ChronoUnit.HOURS).minusHours(1)), + ) + verify(meteringParameterStore).initializeLedgerMeteringEnd(expected)(conn) + } + + } + +} diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala index 79c43e369e13..4e8ed60ca2b3 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala @@ -4,7 +4,6 @@ package com.daml.platform.store.appendonlydao import java.sql.Connection - import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala index c7b248b558b6..cda36f035a09 100644 --- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala +++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala @@ -21,10 +21,20 @@ trait MeteringStore { } object MeteringStore { + case class TransactionMetering( applicationId: Ref.ApplicationId, actionCount: Int, meteringTimestamp: Timestamp, ledgerOffset: Offset, ) + + case class ParticipantMetering( + applicationId: Ref.ApplicationId, + from: Timestamp, + to: Timestamp, + actionCount: Int, + ledgerOffset: Offset, + ) + }