Skip to content

Commit

Permalink
Metering Aggregation [DPP-818] (digital-asset#12723)
Browse files Browse the repository at this point in the history
* Add support for aggregated transaction metering

changelog_begin
Support added for aggregated transaction metering
changelog_end

* Update with review comments
  • Loading branch information
simonmaxen-da authored Feb 14, 2022
1 parent c2a6397 commit cc4c06c
Show file tree
Hide file tree
Showing 26 changed files with 1,005 additions and 76 deletions.
5 changes: 5 additions & 0 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,11 @@ final class Metrics(val registry: MetricRegistry) {
"loadStringInterningEntries"
)

val meteringAggregator: DatabaseMetrics = createDbMetrics("metering_aggregator")
val initializeMeteringAggregator: DatabaseMetrics = createDbMetrics(
"initialize_metering_aggregator"
)

object translation {
private val Prefix: MetricName = db.Prefix :+ "translation"
val cache = new CacheMetrics(registry, Prefix :+ "cache")
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6b18e00f6dc6d79123a115a080b4092fc94350cf582f9373e122837f7ef741e7
40d6f8974169a2d8702b31a9edbe2e22527aa837f0f4057922814631f40510ef
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,18 @@ CREATE TABLE transaction_metering (
);

CREATE INDEX transaction_metering_ledger_offset ON transaction_metering(ledger_offset);

CREATE TABLE metering_parameters (
ledger_metering_end VARCHAR,
ledger_metering_timestamp BIGINT NOT NULL
);

CREATE TABLE participant_metering (
application_id VARCHAR NOT NULL,
from_timestamp BIGINT NOT NULL,
to_timestamp BIGINT NOT NULL,
action_count INTEGER NOT NULL,
ledger_offset VARCHAR NOT NULL
);

CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
45e3500305c86bf7928f78ba6b97572cf8f900e18805e181b44d7837fe0c3c09
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Transaction metering alterations
ALTER TABLE transaction_metering MODIFY application_id VARCHAR2(4000);

-- Create metering parameters
CREATE TABLE metering_parameters (
ledger_metering_end VARCHAR2(4000),
ledger_metering_timestamp NUMBER NOT NULL
);

-- Create participant metering
CREATE TABLE participant_metering (
application_id VARCHAR2(4000) NOT NULL,
from_timestamp NUMBER NOT NULL,
to_timestamp NUMBER NOT NULL,
action_count NUMBER NOT NULL,
ledger_offset VARCHAR2(4000) NOT NULL
);

CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);


Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
08ea4e020bc4ed309d0345d1d0b9e4cbe4f25edc79d3415a05538cacdedb1470
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Transaction metering alterations
SELECT 'Harmonise the transaction_metering columns with other examples in the db';

ALTER TABLE transaction_metering ALTER application_id TYPE TEXT;
ALTER TABLE transaction_metering ALTER ledger_offset TYPE TEXT;

-- Parameter alterations
SELECT 'Add Metering Parameters: Creating table...';

-- Create metering parameters
CREATE TABLE metering_parameters (
ledger_metering_end TEXT,
ledger_metering_timestamp BIGINT NOT NULL
);

-- Create participant metering
SELECT 'Add Participant Metering: Creating table...';

CREATE TABLE participant_metering (
application_id TEXT NOT NULL,
from_timestamp BIGINT NOT NULL,
to_timestamp BIGINT NOT NULL,
action_count INTEGER NOT NULL,
ledger_offset TEXT NOT NULL
);

SELECT 'Add Participant Metering: Creating indexes...';

CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);

SELECT 'Add Participant Metering: Done.';
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ object JdbcIndexer {
val factory = StorageBackendFactory.of(DbType.jdbcType(config.jdbcUrl))
val dataSourceStorageBackend = factory.createDataSourceStorageBackend
val ingestionStorageBackend = factory.createIngestionStorageBackend
val meteringStoreBackend = factory.createMeteringStorageWriteBackend
val parameterStorageBackend = factory.createParameterStorageBackend
val meteringParameterStorageBackend = factory.createMeteringParameterStorageBackend
val DBLockStorageBackend = factory.createDBLockStorageBackend
val stringInterningStorageBackend = factory.createStringInterningStorageBackend
val indexer = ParallelIndexerFactory(
Expand Down Expand Up @@ -92,9 +94,16 @@ object JdbcIndexer {
metrics = metrics,
),
stringInterningStorageBackend = stringInterningStorageBackend,
meteringAggregator = new MeteringAggregator.Owner(
meteringStore = meteringStoreBackend,
meteringParameterStore = meteringParameterStorageBackend,
parameterStore = parameterStorageBackend,
metrics = metrics,
).apply,
mat = materializer,
readService = readService,
)

indexer
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer

import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore
import com.daml.ledger.participant.state.index.v2.MeteringStore.ParticipantMetering
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.indexer.MeteringAggregator.{toOffsetDateTime, toTimestamp}
import com.daml.platform.store.appendonlydao.SqlExecutor
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.{
MeteringParameterStorageBackend,
MeteringStorageWriteBackend,
ParameterStorageBackend,
}

import java.sql.Connection
import java.time.temporal.ChronoUnit
import java.time.{OffsetDateTime, ZoneOffset}
import java.util.{Timer, TimerTask}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object MeteringAggregator {

private val logger = ContextualizedLogger.get(getClass)

class Owner(
meteringStore: MeteringStorageWriteBackend,
parameterStore: ParameterStorageBackend,
meteringParameterStore: MeteringParameterStorageBackend,
metrics: Metrics,
period: FiniteDuration = 6.minutes,
maxTaskDuration: FiniteDuration = 6.hours,
) {

private[platform] def apply(
sqlExecutor: SqlExecutor
)(implicit loggingContext: LoggingContext): ResourceOwner[Unit] = {
val aggregator = new MeteringAggregator(
meteringStore,
parameterStore,
meteringParameterStore,
metrics,
sqlExecutor,
)
for {
_ <- ResourceOwner.forFuture(() => aggregator.initialize())
_ <- ResourceOwner.forTimer(() => new Timer()).map { timer =>
timer.scheduleAtFixedRate(
new TimerTask {
override def run(): Unit = {
Try {
Await.ready(aggregator.run(), maxTaskDuration)
} match {
case Success(_) => ()
case Failure(e) =>
logger.error(s"Metering not aggregated after ${maxTaskDuration}", e)
}
}
},
period.toMillis,
period.toMillis,
)
}
} yield ()
}
}

private def toTimestamp(dateTime: OffsetDateTime): Timestamp =
Timestamp.assertFromInstant(dateTime.toInstant)
private def toOffsetDateTime(timestamp: Timestamp): OffsetDateTime =
OffsetDateTime.ofInstant(timestamp.toInstant, ZoneOffset.UTC)

}

class MeteringAggregator(
meteringStore: MeteringStorageWriteBackend,
parameterStore: ParameterStorageBackend,
meteringParameterStore: MeteringParameterStorageBackend,
metrics: Metrics,
sqlExecutor: SqlExecutor,
clock: () => Timestamp = () => Timestamp.now(),
)(implicit loggingContext: LoggingContext) {

private val parasitic: ExecutionContext = ExecutionContext.parasitic

private val logger = ContextualizedLogger.get(getClass)

private[platform] def initialize(): Future[Unit] = {
val initTimestamp = toOffsetDateTime(clock()).truncatedTo(ChronoUnit.HOURS).minusHours(1)
val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, toTimestamp(initTimestamp))
sqlExecutor.executeSql(metrics.daml.index.db.initializeMeteringAggregator) {
meteringParameterStore.initializeLedgerMeteringEnd(initLedgerMeteringEnd)
}
}

private[platform] def run(): Future[Unit] = {

val future = sqlExecutor.executeSql(metrics.daml.index.db.meteringAggregator) { conn =>
val nowUtcTime = toOffsetDateTime(clock())
val lastLedgerMeteringEnd = getLedgerMeteringEnd(conn)
val startUtcTime: OffsetDateTime = toOffsetDateTime(lastLedgerMeteringEnd.timestamp)
val endUtcTime = startUtcTime.plusHours(1)

if (nowUtcTime.isAfter(endUtcTime)) {

val toEndTime = toTimestamp(endUtcTime)
val ingestedLedgerEnd = parameterStore.ledgerEnd(conn).lastOffset
val maybeMaxOffset =
meteringStore.transactionMeteringMaxOffset(lastLedgerMeteringEnd.offset, toEndTime)(conn)

val (
periodIngested, // This is true if the time period is closed fully ingested
hasMetering, // This is true if there are transaction_metering records to aggregate
toOffsetEnd, // This is the 'to' offset for the period being aggregated
) = maybeMaxOffset match {
case Some(offset) => (offset <= ingestedLedgerEnd, true, offset)
case None => (true, false, lastLedgerMeteringEnd.offset)
}

if (periodIngested) {
Some(
aggregate(
conn = conn,
lastLedgerMeteringEnd = lastLedgerMeteringEnd,
thisLedgerMeteringEnd = LedgerMeteringEnd(toOffsetEnd, toEndTime),
hasMetering = hasMetering,
)
)
} else {
logger.info("Not all transaction metering for aggregation time period is yet ingested")
None
}
} else {
None
}
}

future.onComplete({
case Success(None) => logger.info("No transaction metering aggregation required")
case Success(Some(lme)) =>
logger.info(s"Aggregating transaction metering completed up to $lme")
case Failure(e) => logger.error("Failed to aggregate transaction metering", e)
})(parasitic)

future.map(_ => ())(parasitic)
}

private def aggregate(
conn: Connection,
lastLedgerMeteringEnd: LedgerMeteringEnd,
thisLedgerMeteringEnd: LedgerMeteringEnd,
hasMetering: Boolean,
): LedgerMeteringEnd = {
logger.info(s"Aggregating transaction metering for $thisLedgerMeteringEnd")

if (hasMetering) {
populateParticipantMetering(conn, lastLedgerMeteringEnd, thisLedgerMeteringEnd)
}

meteringParameterStore.updateLedgerMeteringEnd(thisLedgerMeteringEnd)(conn)

thisLedgerMeteringEnd
}

private def getLedgerMeteringEnd(conn: Connection): LedgerMeteringEnd =
meteringParameterStore.ledgerMeteringEnd(conn).getOrElse {
throw new IllegalStateException("Ledger metering is not initialized")
}

private def populateParticipantMetering(
conn: Connection,
lastLedgerMeteringEnd: LedgerMeteringEnd,
thisLedgerMeteringEnd: LedgerMeteringEnd,
): Unit = {

val transactionMetering: Seq[MeteringStore.TransactionMetering] =
meteringStore.transactionMetering(lastLedgerMeteringEnd.offset, thisLedgerMeteringEnd.offset)(
conn
)

val participantMetering = transactionMetering
.groupBy(_.applicationId)
.map { case (applicationId, metering) =>
ParticipantMetering(
applicationId = applicationId,
from = lastLedgerMeteringEnd.timestamp,
to = thisLedgerMeteringEnd.timestamp,
actionCount = metering.map(_.actionCount).sum,
ledgerOffset = metering.map(_.ledgerOffset).max,
)
}
.toVector

meteringStore.insertParticipantMetering(participantMetering)(conn)
}
}
Loading

0 comments on commit cc4c06c

Please sign in to comment.