diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
index 8e5b02e75514..c1bacd833a88 100644
--- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
+++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
@@ -515,6 +515,11 @@ final class Metrics(val registry: MetricRegistry) {
"loadStringInterningEntries"
)
+ val meteringAggregator: DatabaseMetrics = createDbMetrics("metering_aggregator")
+ val initializeMeteringAggregator: DatabaseMetrics = createDbMetrics(
+ "initialize_metering_aggregator"
+ )
+
object translation {
private val Prefix: MetricName = db.Prefix :+ "translation"
val cache = new CacheMetrics(registry, Prefix :+ "cache")
diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256
index 51da90ee89e5..72ab29514e70 100644
--- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256
+++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256
@@ -1 +1 @@
-6b18e00f6dc6d79123a115a080b4092fc94350cf582f9373e122837f7ef741e7
+40d6f8974169a2d8702b31a9edbe2e22527aa837f0f4057922814631f40510ef
diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql
index 6e4020219dcf..6840987a2466 100644
--- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql
+++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql
@@ -538,3 +538,18 @@ CREATE TABLE transaction_metering (
);
CREATE INDEX transaction_metering_ledger_offset ON transaction_metering(ledger_offset);
+
+CREATE TABLE metering_parameters (
+ ledger_metering_end VARCHAR,
+ ledger_metering_timestamp BIGINT NOT NULL
+);
+
+CREATE TABLE participant_metering (
+ application_id VARCHAR NOT NULL,
+ from_timestamp BIGINT NOT NULL,
+ to_timestamp BIGINT NOT NULL,
+ action_count INTEGER NOT NULL,
+ ledger_offset VARCHAR NOT NULL
+);
+
+CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);
diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sha256
new file mode 100644
index 000000000000..fbe5d8a4d66b
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sha256
@@ -0,0 +1 @@
+45e3500305c86bf7928f78ba6b97572cf8f900e18805e181b44d7837fe0c3c09
diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sql
new file mode 100644
index 000000000000..300dc5bbb339
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sql
@@ -0,0 +1,24 @@
+-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+-- SPDX-License-Identifier: Apache-2.0
+
+-- Transaction metering alterations
+ALTER TABLE transaction_metering MODIFY application_id VARCHAR2(4000);
+
+-- Create metering parameters
+CREATE TABLE metering_parameters (
+ ledger_metering_end VARCHAR2(4000),
+ ledger_metering_timestamp NUMBER NOT NULL
+);
+
+-- Create participant metering
+CREATE TABLE participant_metering (
+ application_id VARCHAR2(4000) NOT NULL,
+ from_timestamp NUMBER NOT NULL,
+ to_timestamp NUMBER NOT NULL,
+ action_count NUMBER NOT NULL,
+ ledger_offset VARCHAR2(4000) NOT NULL
+);
+
+CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);
+
+
diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sha256
new file mode 100644
index 000000000000..5ed53cfcafb1
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sha256
@@ -0,0 +1 @@
+08ea4e020bc4ed309d0345d1d0b9e4cbe4f25edc79d3415a05538cacdedb1470
diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sql
new file mode 100644
index 000000000000..c57f5077ead3
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sql
@@ -0,0 +1,35 @@
+
+-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+-- SPDX-License-Identifier: Apache-2.0
+
+-- Transaction metering alterations
+SELECT 'Harmonise the transaction_metering columns with other examples in the db';
+
+ALTER TABLE transaction_metering ALTER application_id TYPE TEXT;
+ALTER TABLE transaction_metering ALTER ledger_offset TYPE TEXT;
+
+-- Parameter alterations
+SELECT 'Add Metering Parameters: Creating table...';
+
+-- Create metering parameters
+CREATE TABLE metering_parameters (
+ ledger_metering_end TEXT,
+ ledger_metering_timestamp BIGINT NOT NULL
+);
+
+-- Create participant metering
+SELECT 'Add Participant Metering: Creating table...';
+
+CREATE TABLE participant_metering (
+ application_id TEXT NOT NULL,
+ from_timestamp BIGINT NOT NULL,
+ to_timestamp BIGINT NOT NULL,
+ action_count INTEGER NOT NULL,
+ ledger_offset TEXT NOT NULL
+);
+
+SELECT 'Add Participant Metering: Creating indexes...';
+
+CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);
+
+SELECT 'Add Participant Metering: Done.';
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala
index 0445d0ee6964..1f8362f701ee 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala
@@ -38,7 +38,9 @@ object JdbcIndexer {
val factory = StorageBackendFactory.of(DbType.jdbcType(config.jdbcUrl))
val dataSourceStorageBackend = factory.createDataSourceStorageBackend
val ingestionStorageBackend = factory.createIngestionStorageBackend
+ val meteringStoreBackend = factory.createMeteringStorageWriteBackend
val parameterStorageBackend = factory.createParameterStorageBackend
+ val meteringParameterStorageBackend = factory.createMeteringParameterStorageBackend
val DBLockStorageBackend = factory.createDBLockStorageBackend
val stringInterningStorageBackend = factory.createStringInterningStorageBackend
val indexer = ParallelIndexerFactory(
@@ -92,9 +94,16 @@ object JdbcIndexer {
metrics = metrics,
),
stringInterningStorageBackend = stringInterningStorageBackend,
+ meteringAggregator = new MeteringAggregator.Owner(
+ meteringStore = meteringStoreBackend,
+ meteringParameterStore = meteringParameterStorageBackend,
+ parameterStore = parameterStorageBackend,
+ metrics = metrics,
+ ).apply,
mat = materializer,
readService = readService,
)
+
indexer
}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/MeteringAggregator.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/MeteringAggregator.scala
new file mode 100644
index 000000000000..20291eb042b3
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/MeteringAggregator.scala
@@ -0,0 +1,204 @@
+// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer
+
+import com.daml.ledger.offset.Offset
+import com.daml.ledger.participant.state.index.v2.MeteringStore
+import com.daml.ledger.participant.state.index.v2.MeteringStore.ParticipantMetering
+import com.daml.ledger.resources.ResourceOwner
+import com.daml.lf.data.Time.Timestamp
+import com.daml.logging.{ContextualizedLogger, LoggingContext}
+import com.daml.metrics.Metrics
+import com.daml.platform.indexer.MeteringAggregator.{toOffsetDateTime, toTimestamp}
+import com.daml.platform.store.appendonlydao.SqlExecutor
+import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
+import com.daml.platform.store.backend.{
+ MeteringParameterStorageBackend,
+ MeteringStorageWriteBackend,
+ ParameterStorageBackend,
+}
+
+import java.sql.Connection
+import java.time.temporal.ChronoUnit
+import java.time.{OffsetDateTime, ZoneOffset}
+import java.util.{Timer, TimerTask}
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+object MeteringAggregator {
+
+ private val logger = ContextualizedLogger.get(getClass)
+
+ class Owner(
+ meteringStore: MeteringStorageWriteBackend,
+ parameterStore: ParameterStorageBackend,
+ meteringParameterStore: MeteringParameterStorageBackend,
+ metrics: Metrics,
+ period: FiniteDuration = 6.minutes,
+ maxTaskDuration: FiniteDuration = 6.hours,
+ ) {
+
+ private[platform] def apply(
+ sqlExecutor: SqlExecutor
+ )(implicit loggingContext: LoggingContext): ResourceOwner[Unit] = {
+ val aggregator = new MeteringAggregator(
+ meteringStore,
+ parameterStore,
+ meteringParameterStore,
+ metrics,
+ sqlExecutor,
+ )
+ for {
+ _ <- ResourceOwner.forFuture(() => aggregator.initialize())
+ _ <- ResourceOwner.forTimer(() => new Timer()).map { timer =>
+ timer.scheduleAtFixedRate(
+ new TimerTask {
+ override def run(): Unit = {
+ Try {
+ Await.ready(aggregator.run(), maxTaskDuration)
+ } match {
+ case Success(_) => ()
+ case Failure(e) =>
+ logger.error(s"Metering not aggregated after ${maxTaskDuration}", e)
+ }
+ }
+ },
+ period.toMillis,
+ period.toMillis,
+ )
+ }
+ } yield ()
+ }
+ }
+
+ private def toTimestamp(dateTime: OffsetDateTime): Timestamp =
+ Timestamp.assertFromInstant(dateTime.toInstant)
+ private def toOffsetDateTime(timestamp: Timestamp): OffsetDateTime =
+ OffsetDateTime.ofInstant(timestamp.toInstant, ZoneOffset.UTC)
+
+}
+
+class MeteringAggregator(
+ meteringStore: MeteringStorageWriteBackend,
+ parameterStore: ParameterStorageBackend,
+ meteringParameterStore: MeteringParameterStorageBackend,
+ metrics: Metrics,
+ sqlExecutor: SqlExecutor,
+ clock: () => Timestamp = () => Timestamp.now(),
+)(implicit loggingContext: LoggingContext) {
+
+ private val parasitic: ExecutionContext = ExecutionContext.parasitic
+
+ private val logger = ContextualizedLogger.get(getClass)
+
+ private[platform] def initialize(): Future[Unit] = {
+ val initTimestamp = toOffsetDateTime(clock()).truncatedTo(ChronoUnit.HOURS).minusHours(1)
+ val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, toTimestamp(initTimestamp))
+ sqlExecutor.executeSql(metrics.daml.index.db.initializeMeteringAggregator) {
+ meteringParameterStore.initializeLedgerMeteringEnd(initLedgerMeteringEnd)
+ }
+ }
+
+ private[platform] def run(): Future[Unit] = {
+
+ val future = sqlExecutor.executeSql(metrics.daml.index.db.meteringAggregator) { conn =>
+ val nowUtcTime = toOffsetDateTime(clock())
+ val lastLedgerMeteringEnd = getLedgerMeteringEnd(conn)
+ val startUtcTime: OffsetDateTime = toOffsetDateTime(lastLedgerMeteringEnd.timestamp)
+ val endUtcTime = startUtcTime.plusHours(1)
+
+ if (nowUtcTime.isAfter(endUtcTime)) {
+
+ val toEndTime = toTimestamp(endUtcTime)
+ val ingestedLedgerEnd = parameterStore.ledgerEnd(conn).lastOffset
+ val maybeMaxOffset =
+ meteringStore.transactionMeteringMaxOffset(lastLedgerMeteringEnd.offset, toEndTime)(conn)
+
+ val (
+ periodIngested, // This is true if the time period is closed fully ingested
+ hasMetering, // This is true if there are transaction_metering records to aggregate
+ toOffsetEnd, // This is the 'to' offset for the period being aggregated
+ ) = maybeMaxOffset match {
+ case Some(offset) => (offset <= ingestedLedgerEnd, true, offset)
+ case None => (true, false, lastLedgerMeteringEnd.offset)
+ }
+
+ if (periodIngested) {
+ Some(
+ aggregate(
+ conn = conn,
+ lastLedgerMeteringEnd = lastLedgerMeteringEnd,
+ thisLedgerMeteringEnd = LedgerMeteringEnd(toOffsetEnd, toEndTime),
+ hasMetering = hasMetering,
+ )
+ )
+ } else {
+ logger.info("Not all transaction metering for aggregation time period is yet ingested")
+ None
+ }
+ } else {
+ None
+ }
+ }
+
+ future.onComplete({
+ case Success(None) => logger.info("No transaction metering aggregation required")
+ case Success(Some(lme)) =>
+ logger.info(s"Aggregating transaction metering completed up to $lme")
+ case Failure(e) => logger.error("Failed to aggregate transaction metering", e)
+ })(parasitic)
+
+ future.map(_ => ())(parasitic)
+ }
+
+ private def aggregate(
+ conn: Connection,
+ lastLedgerMeteringEnd: LedgerMeteringEnd,
+ thisLedgerMeteringEnd: LedgerMeteringEnd,
+ hasMetering: Boolean,
+ ): LedgerMeteringEnd = {
+ logger.info(s"Aggregating transaction metering for $thisLedgerMeteringEnd")
+
+ if (hasMetering) {
+ populateParticipantMetering(conn, lastLedgerMeteringEnd, thisLedgerMeteringEnd)
+ }
+
+ meteringParameterStore.updateLedgerMeteringEnd(thisLedgerMeteringEnd)(conn)
+
+ thisLedgerMeteringEnd
+ }
+
+ private def getLedgerMeteringEnd(conn: Connection): LedgerMeteringEnd =
+ meteringParameterStore.ledgerMeteringEnd(conn).getOrElse {
+ throw new IllegalStateException("Ledger metering is not initialized")
+ }
+
+ private def populateParticipantMetering(
+ conn: Connection,
+ lastLedgerMeteringEnd: LedgerMeteringEnd,
+ thisLedgerMeteringEnd: LedgerMeteringEnd,
+ ): Unit = {
+
+ val transactionMetering: Seq[MeteringStore.TransactionMetering] =
+ meteringStore.transactionMetering(lastLedgerMeteringEnd.offset, thisLedgerMeteringEnd.offset)(
+ conn
+ )
+
+ val participantMetering = transactionMetering
+ .groupBy(_.applicationId)
+ .map { case (applicationId, metering) =>
+ ParticipantMetering(
+ applicationId = applicationId,
+ from = lastLedgerMeteringEnd.timestamp,
+ to = thisLedgerMeteringEnd.timestamp,
+ actionCount = metering.map(_.actionCount).sum,
+ ledgerOffset = metering.map(_.ledgerOffset).max,
+ )
+ }
+ .toVector
+
+ meteringStore.insertParticipantMetering(participantMetering)(conn)
+ }
+}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala
index b689ccf8976e..c75cf6f4d428 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala
@@ -3,18 +3,15 @@
package com.daml.platform.indexer.parallel
-import java.util.Timer
-import java.util.concurrent.Executors
-
import akka.stream.{KillSwitch, Materializer}
import com.daml.ledger.participant.state.v2.ReadService
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
+import com.daml.platform.indexer.Indexer
import com.daml.platform.indexer.ha.{HaConfig, HaCoordinator, Handle, NoopHaCoordinator}
import com.daml.platform.indexer.parallel.AsyncSupport._
-import com.daml.platform.indexer.Indexer
import com.daml.platform.store.appendonlydao.DbDispatcher
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.{
@@ -25,10 +22,12 @@ import com.daml.platform.store.backend.{
import com.daml.platform.store.interning.StringInterningView
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import java.util.Timer
+import java.util.concurrent.Executors
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
object ParallelIndexerFactory {
@@ -45,6 +44,7 @@ object ParallelIndexerFactory {
initializeParallelIngestion: InitializeParallelIngestion,
parallelIndexerSubscription: ParallelIndexerSubscription[_],
stringInterningStorageBackend: StringInterningStorageBackend,
+ meteringAggregator: DbDispatcher => ResourceOwner[Unit],
mat: Materializer,
readService: ReadService,
)(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] =
@@ -95,24 +95,27 @@ object ParallelIndexerFactory {
implicit val ec: ExecutionContext = resourceContext.executionContext
haCoordinator.protectedExecution(connectionInitializer =>
initializeHandle(
- DbDispatcher
- .owner(
- // this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
- // therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
- dataSource = dataSourceStorageBackend.createDataSource(
- jdbcUrl = jdbcUrl,
- dataSourceConfig = dataSourceConfig,
- connectionInitHook = Some(connectionInitializer.initialize),
- ),
- serverRole = ServerRole.Indexer,
- connectionPoolSize =
- ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
- connectionTimeout = FiniteDuration(
- 250,
- "millis",
- ), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
- metrics = metrics,
- )
+ for {
+ dbDispatcher <- DbDispatcher
+ .owner(
+ // this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
+ // therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
+ dataSource = dataSourceStorageBackend.createDataSource(
+ jdbcUrl = jdbcUrl,
+ dataSourceConfig = dataSourceConfig,
+ connectionInitHook = Some(connectionInitializer.initialize),
+ ),
+ serverRole = ServerRole.Indexer,
+ connectionPoolSize =
+ ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
+ connectionTimeout = FiniteDuration(
+ 250,
+ "millis",
+ ), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
+ metrics = metrics,
+ )
+ _ <- meteringAggregator(dbDispatcher)
+ } yield dbDispatcher
) { dbDispatcher =>
val stringInterningView = new StringInterningView(
loadPrefixedEntries = (fromExclusive, toInclusive) =>
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala b/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala
index 8e08ed34bb1e..c3bea1a001ab 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala
@@ -8,6 +8,7 @@ import anorm._
import com.daml.ledger.offset.Offset
import com.daml.lf.crypto.Hash
import com.daml.lf.data.Ref
+import com.daml.lf.data.Time.Timestamp
import com.daml.lf.ledger.EventId
import com.daml.lf.value.Value
import spray.json.DefaultJsonProtocol._
@@ -354,6 +355,11 @@ private[platform] object Conversions {
def offset(name: String): RowParser[Offset] =
SqlParser.get[String](name).map(v => Offset.fromHexString(Ref.HexString.assertFromString(v)))
+ def offset(position: Int): RowParser[Offset] =
+ SqlParser
+ .get[String](position)
+ .map(v => Offset.fromHexString(Ref.HexString.assertFromString(v)))
+
implicit val columnToOffset: Column[Offset] =
Column.nonNull((value: Any, meta) =>
Column
@@ -363,9 +369,17 @@ private[platform] object Conversions {
// Timestamp
+ implicit object TimestampToStatement extends ToStatement[Timestamp] {
+ override def set(s: PreparedStatement, index: Int, v: Timestamp): Unit =
+ s.setLong(index, v.micros)
+ }
+
def timestampFromMicros(name: String): RowParser[com.daml.lf.data.Time.Timestamp] =
SqlParser.get[Long](name).map(com.daml.lf.data.Time.Timestamp.assertFromLong)
+ def timestampFromMicros(position: Int): RowParser[com.daml.lf.data.Time.Timestamp] =
+ SqlParser.get[Long](position).map(com.daml.lf.data.Time.Timestamp.assertFromLong)
+
// Hash
implicit object HashToStatement extends ToStatement[Hash] {
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala
index d1389d37b891..4b744d90a1ac 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala
@@ -26,7 +26,8 @@ private[platform] final class DbDispatcher private (
overallWaitTimer: Timer,
overallExecutionTimer: Timer,
)(implicit loggingContext: LoggingContext)
- extends ReportsHealth {
+ extends SqlExecutor
+ with ReportsHealth {
private val logger = ContextualizedLogger.get(this.getClass)
private val executionContext = ExecutionContext.fromExecutor(
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SqlExecutor.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SqlExecutor.scala
new file mode 100644
index 000000000000..97f1107ab79a
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SqlExecutor.scala
@@ -0,0 +1,16 @@
+// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.store.appendonlydao
+
+import com.daml.logging.LoggingContext
+import com.daml.metrics.DatabaseMetrics
+
+import java.sql.Connection
+import scala.concurrent.Future
+
+private[platform] trait SqlExecutor {
+ def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit
+ loggingContext: LoggingContext
+ ): Future[T]
+}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala
index 004fc8e013a9..89e7788abc8b 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala
@@ -7,7 +7,10 @@ import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails, User,
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
-import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
+import com.daml.ledger.participant.state.index.v2.MeteringStore.{
+ ParticipantMetering,
+ TransactionMetering,
+}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{ApplicationId, UserId}
@@ -18,6 +21,7 @@ import com.daml.platform
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw}
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams}
+import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState
@@ -93,10 +97,13 @@ trait ParameterStorageBackend {
/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
*/
def updatePrunedUptoInclusive(prunedUpToInclusive: Offset)(connection: Connection): Unit
+
def prunedUpToInclusive(connection: Connection): Option[Offset]
+
def updatePrunedAllDivulgedContractsUpToInclusive(
prunedUpToInclusive: Offset
)(connection: Connection): Unit
+
def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset]
@@ -106,9 +113,9 @@ trait ParameterStorageBackend {
* - If no identity parameters are stored, then they are set to the given value.
* - If identity parameters are stored, then they are compared to the given ones.
* - Ledger identity parameters are written at most once, and are never overwritten.
- * No significant CPU load, mostly blocking JDBC communication with the database backend.
+ * No significant CPU load, mostly blocking JDBC communication with the database backend.
*
- * This method is NOT safe to call concurrently.
+ * This method is NOT safe to call concurrently.
*/
def initializeParameters(params: ParameterStorageBackend.IdentityParams)(connection: Connection)(
implicit loggingContext: LoggingContext
@@ -118,6 +125,24 @@ trait ParameterStorageBackend {
def ledgerIdentity(connection: Connection): Option[ParameterStorageBackend.IdentityParams]
}
+object MeteringParameterStorageBackend {
+ case class LedgerMeteringEnd(offset: Offset, timestamp: Timestamp)
+}
+
+trait MeteringParameterStorageBackend {
+
+ /** Initialize the ledger metering end parameters if unset */
+ def initializeLedgerMeteringEnd(init: LedgerMeteringEnd)(connection: Connection)(implicit
+ loggingContext: LoggingContext
+ ): Unit
+
+ /** The timestamp and offset for which billable metering is available */
+ def ledgerMeteringEnd(connection: Connection): Option[LedgerMeteringEnd]
+
+ /** Update the timestamp and offset for which billable metering is available */
+ def updateLedgerMeteringEnd(ledgerMeteringEnd: LedgerMeteringEnd)(connection: Connection): Unit
+}
+
object ParameterStorageBackend {
case class LedgerEnd(lastOffset: Offset, lastEventSeqId: Long, lastStringInterningId: Int) {
def lastOffsetOption: Option[Offset] =
@@ -128,6 +153,7 @@ object ParameterStorageBackend {
ParameterStorageBackend.LedgerEnd(Offset.beforeBegin, EventSequentialId.beforeBegin, 0)
}
case class IdentityParams(ledgerId: LedgerId, participantId: ParticipantId)
+
}
trait ConfigurationStorageBackend {
@@ -424,7 +450,7 @@ object UserManagementStorageBackend {
case class DbUserRight(domainRight: UserRight, grantedAt: Long)
}
-trait MeteringStorageBackend {
+trait MeteringStorageReadBackend {
def transactionMetering(
from: Timestamp,
@@ -432,3 +458,30 @@ trait MeteringStorageBackend {
applicationId: Option[ApplicationId],
)(connection: Connection): Vector[TransactionMetering]
}
+
+trait MeteringStorageWriteBackend {
+
+ /** This method will return the maximum offset of the transaction_metering record
+ * which has an offset greater than the from offset and a timestamp prior to the
+ * to timestamp, if any.
+ *
+ * Note that the offset returned may not have been fully ingested. This is to allow the metering to wait if there
+ * are still un-fully ingested records withing the time window.
+ */
+ def transactionMeteringMaxOffset(from: Offset, to: Timestamp)(
+ connection: Connection
+ ): Option[Offset]
+
+ /** This method will return all transaction metering records between the from offset (exclusive)
+ * and the to offset (inclusive).
+ */
+ def transactionMetering(from: Offset, to: Offset)(
+ connection: Connection
+ ): Vector[TransactionMetering]
+
+ def insertParticipantMetering(metering: Vector[ParticipantMetering])(connection: Connection): Unit
+
+ /** Test Only - will be removed once reporting can be based if participant metering */
+ def allParticipantMetering()(connection: Connection): Vector[ParticipantMetering]
+
+}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala
index d8dbf2c86f6d..59365f9f5cde 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala
@@ -13,6 +13,7 @@ import com.daml.platform.store.interning.StringInterning
trait StorageBackendFactory {
def createIngestionStorageBackend: IngestionStorageBackend[_]
def createParameterStorageBackend: ParameterStorageBackend
+ def createMeteringParameterStorageBackend: MeteringParameterStorageBackend
def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): ConfigurationStorageBackend
def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend
def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend
@@ -31,7 +32,8 @@ trait StorageBackendFactory {
def createResetStorageBackend: ResetStorageBackend
def createStringInterningStorageBackend: StringInterningStorageBackend
def createUserManagementStorageBackend: UserManagementStorageBackend
- def createMeteringStorageBackend(ledgerEndCache: LedgerEndCache): MeteringStorageBackend
+ def createMeteringStorageReadBackend(ledgerEndCache: LedgerEndCache): MeteringStorageReadBackend
+ def createMeteringStorageWriteBackend: MeteringStorageWriteBackend
final def readStorageBackend(
ledgerEndCache: LedgerEndCache,
@@ -44,7 +46,7 @@ trait StorageBackendFactory {
completionStorageBackend = createCompletionStorageBackend(stringInterning),
contractStorageBackend = createContractStorageBackend(ledgerEndCache, stringInterning),
eventStorageBackend = createEventStorageBackend(ledgerEndCache, stringInterning),
- meteringStorageBackend = createMeteringStorageBackend(ledgerEndCache),
+ meteringStorageBackend = createMeteringStorageReadBackend(ledgerEndCache),
)
}
@@ -64,5 +66,5 @@ case class ReadStorageBackend(
completionStorageBackend: CompletionStorageBackend,
contractStorageBackend: ContractStorageBackend,
eventStorageBackend: EventStorageBackend,
- meteringStorageBackend: MeteringStorageBackend,
+ meteringStorageBackend: MeteringStorageReadBackend,
)
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala
index 5d9653d903dc..95cb6a919fb0 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala
@@ -14,6 +14,9 @@ trait CommonStorageBackendFactory extends StorageBackendFactory {
override val createParameterStorageBackend: ParameterStorageBackend =
ParameterStorageBackendTemplate
+ override val createMeteringParameterStorageBackend: MeteringParameterStorageBackend =
+ MeteringParameterStorageBackendTemplate
+
override def createConfigurationStorageBackend(
ledgerEndCache: LedgerEndCache
): ConfigurationStorageBackend =
@@ -28,8 +31,13 @@ trait CommonStorageBackendFactory extends StorageBackendFactory {
override val createUserManagementStorageBackend: UserManagementStorageBackend =
UserManagementStorageBackendTemplate
- override def createMeteringStorageBackend(
+ override def createMeteringStorageReadBackend(
ledgerEndCache: LedgerEndCache
- ): MeteringStorageBackend =
- new MeteringStorageBackendTemplate(ledgerEndCache)
+ ): MeteringStorageReadBackend =
+ new MeteringStorageBackendReadTemplate(ledgerEndCache)
+
+ def createMeteringStorageWriteBackend: MeteringStorageWriteBackend = {
+ MeteringStorageBackendWriteTemplate
+ }
+
}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringParameterStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringParameterStorageBackendTemplate.scala
new file mode 100644
index 000000000000..8cfe9050e8cb
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringParameterStorageBackendTemplate.scala
@@ -0,0 +1,76 @@
+// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com
+package daml.platform.store.backend.common
+
+import daml.ledger.offset.Offset
+import daml.logging.{ContextualizedLogger, LoggingContext}
+import daml.platform.store.Conversions.{offset, timestampFromMicros}
+import daml.platform.store.backend.MeteringParameterStorageBackend
+import daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
+import daml.scalautil.Statement.discard
+import anorm.{RowParser, ~}
+import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
+
+import java.sql.Connection
+
+private[backend] object MeteringParameterStorageBackendTemplate
+ extends MeteringParameterStorageBackend {
+
+ private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
+
+ def initializeLedgerMeteringEnd(
+ init: LedgerMeteringEnd
+ )(connection: Connection)(implicit loggingContext: LoggingContext): Unit = {
+ import com.daml.platform.store.Conversions.OffsetToStatement
+ import com.daml.platform.store.Conversions.TimestampToStatement
+ ledgerMeteringEnd(connection) match {
+ case None =>
+ logger.info(s"Initializing ledger metering end to $init")
+ discard(
+ SQL"""insert into metering_parameters(
+ ledger_metering_end,
+ ledger_metering_timestamp
+ ) values (
+ ${init.offset},
+ ${init.timestamp}
+ )"""
+ .execute()(connection)
+ )
+ case Some(existing) =>
+ logger.info(s"Found existing ledger metering end $existing")
+ }
+ }
+
+ def ledgerMeteringEnd(connection: Connection): Option[LedgerMeteringEnd] = {
+
+ val LedgerMeteringEndParser: RowParser[LedgerMeteringEnd] = (
+ offset("ledger_metering_end").?.map(_.getOrElse(Offset.beforeBegin)) ~
+ timestampFromMicros("ledger_metering_timestamp")
+ ) map { case ledgerMeteringEnd ~ ledgerMeteringTimestamp =>
+ LedgerMeteringEnd(ledgerMeteringEnd, ledgerMeteringTimestamp)
+ }
+
+ SQL"""SELECT ledger_metering_end, ledger_metering_timestamp FROM metering_parameters"""
+ .as(LedgerMeteringEndParser.singleOpt)(connection)
+
+ }
+
+ def updateLedgerMeteringEnd(
+ ledgerMeteringEnd: LedgerMeteringEnd
+ )(connection: Connection): Unit = {
+ import com.daml.platform.store.Conversions.OffsetToStatement
+ import com.daml.platform.store.Conversions.TimestampToStatement
+ SQL"""
+ UPDATE
+ metering_parameters
+ SET
+ ledger_metering_end = ${ledgerMeteringEnd.offset},
+ ledger_metering_timestamp = ${ledgerMeteringEnd.timestamp}
+ """
+ .execute()(connection)
+ ()
+ }
+
+}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala
index b12e31cb98ce..a11c9473b03f 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala
@@ -5,21 +5,26 @@ package com.daml.platform.store.backend.common
import anorm.SqlParser.int
import anorm.{RowParser, ~}
+import com.daml.ledger.offset.Offset
+import com.daml.ledger.participant.state.index.v2.MeteringStore.{
+ ParticipantMetering,
+ TransactionMetering,
+}
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time
+import com.daml.lf.data.Time.Timestamp
import com.daml.platform.store.Conversions.{applicationId, offset, timestampFromMicros}
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
-import com.daml.platform.store.backend.MeteringStorageBackend
-import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
+import com.daml.platform.store.backend.common.MeteringStorageBackendTemplate.transactionMeteringParser
+import com.daml.platform.store.backend.{MeteringStorageReadBackend, MeteringStorageWriteBackend}
import com.daml.platform.store.cache.LedgerEndCache
import java.sql.Connection
-private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndCache)
- extends MeteringStorageBackend {
+private[backend] object MeteringStorageBackendTemplate {
- private val transactionMeteringParser: RowParser[TransactionMetering] = {
+ val transactionMeteringParser: RowParser[TransactionMetering] = {
(
applicationId("application_id") ~
int("action_count") ~
@@ -31,15 +36,20 @@ private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndC
meteringTimestamp ~
ledgerOffset =>
TransactionMetering(
- applicationId,
- actionCount,
- meteringTimestamp,
- ledgerOffset,
+ applicationId = applicationId,
+ actionCount = actionCount,
+ meteringTimestamp = meteringTimestamp,
+ ledgerOffset = ledgerOffset,
)
}
}
+}
+
+private[backend] class MeteringStorageBackendReadTemplate(ledgerEndCache: LedgerEndCache)
+ extends MeteringStorageReadBackend {
+
override def transactionMetering(
from: Time.Timestamp,
to: Option[Time.Timestamp],
@@ -66,3 +76,95 @@ private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndC
private def isSet(o: Option[_]): Int = o.fold(0)(_ => 1)
}
+private[backend] object MeteringStorageBackendWriteTemplate extends MeteringStorageWriteBackend {
+
+ val participantMeteringParser: RowParser[ParticipantMetering] = {
+ (
+ applicationId("application_id") ~
+ timestampFromMicros("from_timestamp") ~
+ timestampFromMicros("to_timestamp") ~
+ int("action_count") ~
+ offset("ledger_offset")
+ ).map {
+ case applicationId ~
+ from ~
+ to ~
+ actionCount ~
+ ledgerOffset =>
+ ParticipantMetering(
+ applicationId,
+ from,
+ to,
+ actionCount,
+ ledgerOffset,
+ )
+ }
+
+ }
+
+ def transactionMeteringMaxOffset(from: Offset, to: Timestamp)(
+ connection: Connection
+ ): Option[Offset] = {
+
+ import com.daml.platform.store.Conversions.OffsetToStatement
+ import com.daml.platform.store.Conversions.TimestampToStatement
+
+ SQL"""
+ select max(ledger_offset)
+ from transaction_metering
+ where ledger_offset > $from
+ and metering_timestamp < $to
+ """
+ .as(offset(1).?.single)(connection)
+ }
+
+ def transactionMetering(from: Offset, to: Offset)(
+ connection: Connection
+ ): Vector[TransactionMetering] = {
+
+ import com.daml.platform.store.Conversions.OffsetToStatement
+
+ SQL"""
+ select
+ application_id,
+ action_count,
+ metering_timestamp,
+ ledger_offset
+ from transaction_metering
+ where ledger_offset > $from
+ and ledger_offset <= $to
+ """
+ .asVectorOf(transactionMeteringParser)(connection)
+ }
+
+ def insertParticipantMetering(metering: Vector[ParticipantMetering])(
+ connection: Connection
+ ): Unit = {
+
+ import com.daml.platform.store.Conversions.OffsetToStatement
+ import com.daml.platform.store.Conversions.TimestampToStatement
+
+ metering.foreach { participantMetering =>
+ import participantMetering._
+ SQL"""
+ insert into participant_metering(application_id, from_timestamp, to_timestamp, action_count, ledger_offset)
+ values (${participantMetering.applicationId.toString}, $from, $to, $actionCount, $ledgerOffset)
+ """.execute()(connection)
+ }
+
+ }
+
+ def allParticipantMetering()(connection: Connection): Vector[ParticipantMetering] = {
+ SQL"""
+ select
+ application_id,
+ from_timestamp,
+ to_timestamp,
+ action_count,
+ ledger_offset
+ from participant_metering
+ """
+ .asVectorOf(participantMeteringParser)(connection)
+ }
+
+}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala
index a2f95886351f..db1f26159410 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala
@@ -3,21 +3,21 @@
package com.daml.platform.store.backend.common
-import java.sql.Connection
-
import anorm.SqlParser.{int, long}
import anorm.{RowParser, ~}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
import com.daml.ledger.offset.Offset
-import com.daml.platform.store.Conversions.{ledgerString, offset}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.common.MismatchException
import com.daml.platform.store.Conversions
+import com.daml.platform.store.Conversions.{ledgerString, offset}
import com.daml.platform.store.backend.ParameterStorageBackend
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.scalautil.Statement.discard
import scalaz.syntax.tag._
+import java.sql.Connection
+
private[backend] object ParameterStorageBackendTemplate extends ParameterStorageBackend {
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
@@ -193,4 +193,5 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage
connection
)
}
+
}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala
index ac2958417826..bf930aa9081f 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala
@@ -3,24 +3,8 @@
package com.daml.platform.store.backend.postgresql
-import com.daml.platform.store.backend.common.{
- CommonStorageBackendFactory,
- CompletionStorageBackendTemplate,
- ContractStorageBackendTemplate,
- IngestionStorageBackendTemplate,
- PartyStorageBackendTemplate,
-}
-import com.daml.platform.store.backend.{
- CompletionStorageBackend,
- ContractStorageBackend,
- DBLockStorageBackend,
- DataSourceStorageBackend,
- EventStorageBackend,
- IngestionStorageBackend,
- PartyStorageBackend,
- ResetStorageBackend,
- StorageBackendFactory,
-}
+import com.daml.platform.store.backend.common._
+import com.daml.platform.store.backend._
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.interning.StringInterning
@@ -59,4 +43,5 @@ object PostgresStorageBackendFactory
override val createResetStorageBackend: ResetStorageBackend =
PostgresResetStorageBackend
+
}
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala
index 8077c154adad..17d8be294901 100644
--- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala
@@ -3,8 +3,6 @@
package com.daml.platform.store.backend
-import java.sql.Connection
-
import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.daml.platform.store.backend.h2.H2StorageBackendFactory
@@ -16,6 +14,8 @@ import com.daml.testing.oracle.OracleAroundAll
import com.daml.testing.postgresql.PostgresAroundAll
import org.scalatest.Suite
+import java.sql.Connection
+
/** Creates a database and a [[TestBackend]].
* Used by [[StorageBackendSpec]] to run all StorageBackend tests on different databases.
*/
@@ -76,6 +76,7 @@ private[backend] trait StorageBackendProviderOracle
case class TestBackend(
ingestion: IngestionStorageBackend[_],
parameter: ParameterStorageBackend,
+ meteringParameter: MeteringParameterStorageBackend,
configuration: ConfigurationStorageBackend,
party: PartyStorageBackend,
packageBackend: PackageStorageBackend,
@@ -90,16 +91,30 @@ case class TestBackend(
ledgerEndCache: MutableLedgerEndCache,
stringInterningSupport: MockStringInterning,
userManagement: UserManagementStorageBackend,
- metering: MeteringStorageBackend,
+ metering: TestMeteringBackend,
+)
+
+case class TestMeteringBackend(
+ read: MeteringStorageReadBackend,
+ write: MeteringStorageWriteBackend,
)
object TestBackend {
def apply(storageBackendFactory: StorageBackendFactory): TestBackend = {
val ledgerEndCache = MutableLedgerEndCache()
val stringInterning = new MockStringInterning
+
+ def createTestMeteringBackend: TestMeteringBackend = {
+ TestMeteringBackend(
+ read = storageBackendFactory.createMeteringStorageReadBackend(ledgerEndCache),
+ write = storageBackendFactory.createMeteringStorageWriteBackend,
+ )
+ }
+
TestBackend(
ingestion = storageBackendFactory.createIngestionStorageBackend,
parameter = storageBackendFactory.createParameterStorageBackend,
+ meteringParameter = storageBackendFactory.createMeteringParameterStorageBackend,
configuration = storageBackendFactory.createConfigurationStorageBackend(ledgerEndCache),
party = storageBackendFactory.createPartyStorageBackend(ledgerEndCache),
packageBackend = storageBackendFactory.createPackageStorageBackend(ledgerEndCache),
@@ -115,7 +130,8 @@ object TestBackend {
ledgerEndCache = ledgerEndCache,
stringInterningSupport = stringInterning,
userManagement = storageBackendFactory.createUserManagementStorageBackend,
- metering = storageBackendFactory.createMeteringStorageBackend(ledgerEndCache),
+ metering = createTestMeteringBackend,
)
}
+
}
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala
index 768d74db65bc..e81a4a283b8b 100644
--- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala
@@ -119,7 +119,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion
)
)
- val metering1 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None))
+ val metering1 =
+ executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None))
// Restart the indexer - should delete data from the partial insert above
val end2 = executeSql(backend.parameter.ledgerEnd)
@@ -154,7 +155,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion
)
)
- val metering2 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None))
+ val metering2 =
+ executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None))
parties1 should have length 1
packages1 should have size 1
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala
index a3b7ef2d621c..5225bc6ed8f7 100644
--- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala
@@ -3,10 +3,15 @@
package com.daml.platform.store.backend
-import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
+import com.daml.ledger.offset.Offset
+import com.daml.ledger.participant.state.index.v2.MeteringStore.{
+ ParticipantMetering,
+ TransactionMetering,
+}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
+import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{Assertion, Inside}
@@ -20,7 +25,7 @@ private[backend] trait StorageBackendTestsMetering
import StorageBackendTestValues._
{
- behavior of "StorageBackend (metering)"
+ behavior of "StorageBackend (read metering)"
it should "persist transaction metering" in {
@@ -38,7 +43,7 @@ private[backend] trait StorageBackendTestsMetering
executeSql(ingest(Vector(dtoTransactionMetering(metering)), _))
executeSql(updateLedgerEnd(toOffset, 5L))
val Vector(actual) =
- executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None))
+ executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None))
actual shouldBe expected
}
@@ -90,7 +95,9 @@ private[backend] trait StorageBackendTestsMetering
populate()
val from = someTime.addMicros(fromIdx)
val to = toIdx.map(someTime.addMicros)
- val actual = executeSql(backend.metering.transactionMetering(from, to, applicationId)).toSet
+ val actual = executeSql(
+ backend.metering.read.transactionMetering(from, to, applicationId)
+ ).toSet
val expected = execute(from, to, applicationId)
actual.map(_.ledgerOffset) shouldBe expected.map(_.ledgerOffset)
actual shouldBe expected
@@ -106,4 +113,109 @@ private[backend] trait StorageBackendTestsMetering
check(2, None, Some(appIdA))
}
}
+
+ {
+ behavior of "StorageBackend (metering parameters)"
+
+ val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.Epoch)
+
+ it should "fetch un-initialized ledger metering end" in {
+ executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe None
+ }
+
+ it should "initialized ledger metering end" in {
+ val expected = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.Epoch)
+ executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(expected))
+ executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected)
+ }
+
+ it should "update ledger metering end with `before begin` offset" in {
+ executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(initLedgerMeteringEnd))
+ val expected = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.now())
+ executeSql(backend.meteringParameter.updateLedgerMeteringEnd(expected))
+ executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected)
+ }
+
+ it should "update ledger metering end with valid offset" in {
+ executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(initLedgerMeteringEnd))
+ val expected = LedgerMeteringEnd(
+ Offset.fromHexString(Ref.HexString.assertFromString("07")),
+ Timestamp.now(),
+ )
+ executeSql(backend.meteringParameter.updateLedgerMeteringEnd(expected))
+ executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected)
+ }
+
+ }
+
+ {
+ behavior of "StorageBackend (write metering)"
+
+ val metering = Vector(7L, 8L, 9L, 10L).map { i =>
+ TransactionMetering(
+ someApplicationId,
+ actionCount = 1,
+ meteringTimestamp = someTime.addMicros(i),
+ ledgerOffset = offset(i),
+ )
+ }
+
+ val meteringOffsets = metering.map(_.ledgerOffset)
+ val firstOffset = meteringOffsets.min
+ val lastOffset = meteringOffsets.max
+ val lastTime = metering.map(_.meteringTimestamp).max
+
+ it should "return the maximum transaction metering offset" in {
+
+ def check(from: Offset, to: Timestamp): Assertion = {
+ val expected = metering
+ .filter(_.ledgerOffset > from)
+ .filter(_.meteringTimestamp < to)
+ .map(_.ledgerOffset)
+ .maxOption
+ val actual = executeSql(backend.metering.write.transactionMeteringMaxOffset(from, to))
+ actual shouldBe expected
+ }
+
+ executeSql(ingest(metering.map(dtoTransactionMetering), _))
+
+ check(firstOffset, lastTime) // 9
+ check(firstOffset, lastTime.addMicros(1)) // 10
+ check(lastOffset, lastTime.addMicros(1)) // Unset
+
+ }
+
+ it should "select transaction metering for aggregation" in {
+
+ executeSql(ingest(metering.map(dtoTransactionMetering), _))
+
+ val nextLastOffset: Offset = meteringOffsets.filter(_ < lastOffset).max
+ val expected = meteringOffsets.filter(_ > firstOffset).filter(_ <= nextLastOffset).toSet
+ val actual = executeSql(
+ backend.metering.write.transactionMetering(firstOffset, nextLastOffset)
+ ).map(_.ledgerOffset).toSet
+ actual shouldBe expected
+ }
+
+ it should "insert new participant metering records" in {
+
+ val expected = Vector(7L, 8L, 9L).map { i =>
+ ParticipantMetering(
+ someApplicationId,
+ someTime.addMicros(i),
+ someTime.addMicros(i + 1),
+ actionCount = 1,
+ ledgerOffset = offset(i),
+ )
+ }
+
+ executeSql(backend.metering.write.insertParticipantMetering(expected))
+ val actual =
+ executeSql(backend.metering.write.allParticipantMetering()).sortBy(_.ledgerOffset)
+ actual shouldBe expected
+
+ }
+
+ }
+
}
diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala
new file mode 100644
index 000000000000..e7e31d260565
--- /dev/null
+++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/MeteringAggregatorSpec.scala
@@ -0,0 +1,235 @@
+// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer
+
+import com.codahale.metrics.MetricRegistry
+import com.daml.ledger.offset.Offset
+import com.daml.ledger.participant.state.index.v2.MeteringStore.{
+ ParticipantMetering,
+ TransactionMetering,
+}
+import com.daml.lf.data.Ref
+import com.daml.lf.data.Time.Timestamp
+import com.daml.logging.LoggingContext
+import com.daml.metrics.{DatabaseMetrics, Metrics}
+import com.daml.platform.store.appendonlydao.SqlExecutor
+import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
+import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd
+import com.daml.platform.store.backend.{
+ MeteringParameterStorageBackend,
+ MeteringStorageWriteBackend,
+ ParameterStorageBackend,
+}
+import org.mockito.ArgumentMatchersSugar.any
+import org.mockito.MockitoSugar
+import org.mockito.captor.ArgCaptor
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import java.sql.Connection
+import java.time.temporal.ChronoUnit
+import java.time.{LocalDate, LocalTime, OffsetDateTime, ZoneOffset}
+import scala.concurrent.Future
+
+//noinspection TypeAnnotation
+final class MeteringAggregatorSpec extends AnyWordSpecLike with MockitoSugar with Matchers {
+
+ private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
+// private implicit val ec = scala.concurrent.ExecutionContext.global
+ private val metrics = new Metrics(new MetricRegistry)
+ private def toTS(t: OffsetDateTime): Timestamp = Timestamp.assertFromInstant(t.toInstant)
+
+ "MeteringAggregator" should {
+
+ val applicationA = Ref.ApplicationId.assertFromString("appA")
+ val applicationB = Ref.ApplicationId.assertFromString("appB")
+
+ class TestSetup {
+
+ val lastAggEndTime: OffsetDateTime =
+ OffsetDateTime.of(LocalDate.now(), LocalTime.of(15, 0), ZoneOffset.UTC)
+ val nextAggEndTime: OffsetDateTime = lastAggEndTime.plusHours(1)
+ val timeNow: OffsetDateTime = lastAggEndTime.plusHours(1).plusMinutes(+5)
+ val lastAggOffset: Offset = Offset.fromHexString(Ref.HexString.assertFromString("01"))
+
+ val conn: Connection = mock[Connection]
+ val dispatcher: SqlExecutor = new SqlExecutor {
+ override def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit
+ loggingContext: LoggingContext
+ ): Future[T] = Future.successful {
+ sql(conn)
+ }
+ }
+
+ val parameterStore: ParameterStorageBackend = mock[ParameterStorageBackend]
+ val meteringParameterStore: MeteringParameterStorageBackend =
+ mock[MeteringParameterStorageBackend]
+ val meteringStore: MeteringStorageWriteBackend = mock[MeteringStorageWriteBackend]
+
+ def runUnderTest(
+ transactionMetering: Vector[TransactionMetering],
+ maybeLedgerEnd: Option[Offset] = None,
+ ): Future[Unit] = {
+
+ val ledgerEndOffset = (maybeLedgerEnd, transactionMetering.lastOption) match {
+ case (Some(le), _) => le
+ case (None, Some(t)) => t.ledgerOffset
+ case (None, None) => lastAggOffset
+ }
+
+ when(meteringParameterStore.ledgerMeteringEnd(conn))
+ .thenReturn(Some(LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime))))
+
+ when(meteringStore.transactionMeteringMaxOffset(lastAggOffset, toTS(nextAggEndTime))(conn))
+ .thenReturn(transactionMetering.lastOption.map(_.ledgerOffset))
+
+ when(parameterStore.ledgerEnd(conn))
+ .thenReturn(LedgerEnd(ledgerEndOffset, 0L, 0))
+
+ transactionMetering.lastOption.map { last =>
+ when(meteringStore.transactionMetering(lastAggOffset, last.ledgerOffset)(conn))
+ .thenReturn(transactionMetering)
+ }
+
+ new MeteringAggregator(
+ meteringStore,
+ parameterStore,
+ meteringParameterStore,
+ metrics,
+ dispatcher,
+ () => toTS(timeNow),
+ )
+ .run()
+
+ }
+ }
+
+ "aggregate transaction metering records" in new TestSetup {
+
+ val transactionMetering = Vector(10, 15, 20).map { i =>
+ TransactionMetering(
+ applicationId = applicationA,
+ actionCount = i,
+ meteringTimestamp = toTS(lastAggEndTime.plusMinutes(i.toLong)),
+ ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString(i.toString)),
+ )
+ }
+
+ val expected: ParticipantMetering = ParticipantMetering(
+ applicationA,
+ from = toTS(lastAggEndTime),
+ to = toTS(nextAggEndTime),
+ actionCount = transactionMetering.map(_.actionCount).sum,
+ transactionMetering.last.ledgerOffset,
+ )
+
+ runUnderTest(transactionMetering)
+
+ verify(meteringStore).insertParticipantMetering(Vector(expected))(conn)
+ verify(meteringParameterStore).updateLedgerMeteringEnd(
+ LedgerMeteringEnd(expected.ledgerOffset, expected.to)
+ )(conn)
+
+ }
+
+ "not aggregate if there is not a full period" in new TestSetup {
+ override val timeNow = lastAggEndTime.plusHours(1).plusMinutes(-5)
+ when(meteringParameterStore.ledgerMeteringEnd(conn))
+ .thenReturn(Some(LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime))))
+ runUnderTest(Vector.empty)
+ verifyNoMoreInteractions(meteringStore)
+ }
+
+ "aggregate over multiple applications" in new TestSetup {
+
+ val expected = Set(applicationA, applicationB)
+
+ val transactionMetering = expected.toVector.map { a =>
+ TransactionMetering(
+ applicationId = a,
+ actionCount = 1,
+ meteringTimestamp = toTS(lastAggEndTime.plusMinutes(1)),
+ ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString("10")),
+ )
+ }
+
+ runUnderTest(transactionMetering)
+
+ val participantMeteringCaptor = ArgCaptor[Vector[ParticipantMetering]]
+ verify(meteringStore).insertParticipantMetering(participantMeteringCaptor)(any[Connection])
+ participantMeteringCaptor.value.map(_.applicationId).toSet shouldBe expected
+
+ }
+
+ "increase ledger metering end even if there are not transaction metering records" in new TestSetup {
+
+ runUnderTest(Vector.empty[TransactionMetering])
+
+ verify(meteringParameterStore).updateLedgerMeteringEnd(
+ LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime.plusHours(1)))
+ )(conn)
+
+ }
+
+ "skip aggregation if the last transaction metering offset within the time range has not been fully ingested" in new TestSetup {
+
+ val transactionMetering = Vector(
+ TransactionMetering(
+ applicationId = applicationA,
+ actionCount = 1,
+ meteringTimestamp = toTS(lastAggEndTime.plusMinutes(1)),
+ ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString("03")),
+ )
+ )
+
+ runUnderTest(
+ transactionMetering,
+ maybeLedgerEnd = Some(Offset.fromHexString(Ref.HexString.assertFromString("02"))),
+ )
+
+ verify(meteringParameterStore, never).updateLedgerMeteringEnd(any[LedgerMeteringEnd])(
+ any[Connection]
+ )
+
+ }
+
+ "fail if an attempt is made to run un-initialized" in new TestSetup {
+ // Note this only works as we do not use a real future for testing
+ intercept[IllegalStateException] {
+ when(meteringParameterStore.ledgerMeteringEnd(conn)).thenReturn(None)
+ val underTest =
+ new MeteringAggregator(
+ meteringStore,
+ parameterStore,
+ meteringParameterStore,
+ metrics,
+ dispatcher,
+ () => toTS(timeNow),
+ )
+ underTest.run()
+ }
+ }
+
+ "initialize the metering ledger end to the hour before the current hour" in new TestSetup {
+ when(meteringParameterStore.ledgerMeteringEnd(conn)).thenReturn(None)
+ val underTest =
+ new MeteringAggregator(
+ meteringStore,
+ parameterStore,
+ meteringParameterStore,
+ metrics,
+ dispatcher,
+ () => toTS(timeNow),
+ )
+ underTest.initialize()
+ val expected = LedgerMeteringEnd(
+ Offset.beforeBegin,
+ toTS(timeNow.truncatedTo(ChronoUnit.HOURS).minusHours(1)),
+ )
+ verify(meteringParameterStore).initializeLedgerMeteringEnd(expected)(conn)
+ }
+
+ }
+
+}
diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala
index 79c43e369e13..4e8ed60ca2b3 100644
--- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala
+++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala
@@ -4,7 +4,6 @@
package com.daml.platform.store.appendonlydao
import java.sql.Connection
-
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref
diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala
index c7b248b558b6..cda36f035a09 100644
--- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala
+++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala
@@ -21,10 +21,20 @@ trait MeteringStore {
}
object MeteringStore {
+
case class TransactionMetering(
applicationId: Ref.ApplicationId,
actionCount: Int,
meteringTimestamp: Timestamp,
ledgerOffset: Offset,
)
+
+ case class ParticipantMetering(
+ applicationId: Ref.ApplicationId,
+ from: Timestamp,
+ to: Timestamp,
+ actionCount: Int,
+ ledgerOffset: Offset,
+ )
+
}