forked from digital-asset/daml
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Metering Aggregation [DPP-818] (digital-asset#12723)
* Add support for aggregated transaction metering changelog_begin Support added for aggregated transaction metering changelog_end * Update with review comments
- Loading branch information
1 parent
c2a6397
commit cc4c06c
Showing
26 changed files
with
1,005 additions
and
76 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
...n-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
6b18e00f6dc6d79123a115a080b4092fc94350cf582f9373e122837f7ef741e7 | ||
40d6f8974169a2d8702b31a9edbe2e22527aa837f0f4057922814631f40510ef |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 change: 1 addition & 0 deletions
1
...on-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sha256
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
45e3500305c86bf7928f78ba6b97572cf8f900e18805e181b44d7837fe0c3c09 |
24 changes: 24 additions & 0 deletions
24
...ation-api/src/main/resources/db/migration/oracle-appendonly/V12__participant_metering.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
-- SPDX-License-Identifier: Apache-2.0 | ||
|
||
-- Transaction metering alterations | ||
ALTER TABLE transaction_metering MODIFY application_id VARCHAR2(4000); | ||
|
||
-- Create metering parameters | ||
CREATE TABLE metering_parameters ( | ||
ledger_metering_end VARCHAR2(4000), | ||
ledger_metering_timestamp NUMBER NOT NULL | ||
); | ||
|
||
-- Create participant metering | ||
CREATE TABLE participant_metering ( | ||
application_id VARCHAR2(4000) NOT NULL, | ||
from_timestamp NUMBER NOT NULL, | ||
to_timestamp NUMBER NOT NULL, | ||
action_count NUMBER NOT NULL, | ||
ledger_offset VARCHAR2(4000) NOT NULL | ||
); | ||
|
||
CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id); | ||
|
||
|
1 change: 1 addition & 0 deletions
1
...api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sha256
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
08ea4e020bc4ed309d0345d1d0b9e4cbe4f25edc79d3415a05538cacdedb1470 |
35 changes: 35 additions & 0 deletions
35
...on-api/src/main/resources/db/migration/postgres-appendonly/V122__participant_metering.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
|
||
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
-- SPDX-License-Identifier: Apache-2.0 | ||
|
||
-- Transaction metering alterations | ||
SELECT 'Harmonise the transaction_metering columns with other examples in the db'; | ||
|
||
ALTER TABLE transaction_metering ALTER application_id TYPE TEXT; | ||
ALTER TABLE transaction_metering ALTER ledger_offset TYPE TEXT; | ||
|
||
-- Parameter alterations | ||
SELECT 'Add Metering Parameters: Creating table...'; | ||
|
||
-- Create metering parameters | ||
CREATE TABLE metering_parameters ( | ||
ledger_metering_end TEXT, | ||
ledger_metering_timestamp BIGINT NOT NULL | ||
); | ||
|
||
-- Create participant metering | ||
SELECT 'Add Participant Metering: Creating table...'; | ||
|
||
CREATE TABLE participant_metering ( | ||
application_id TEXT NOT NULL, | ||
from_timestamp BIGINT NOT NULL, | ||
to_timestamp BIGINT NOT NULL, | ||
action_count INTEGER NOT NULL, | ||
ledger_offset TEXT NOT NULL | ||
); | ||
|
||
SELECT 'Add Participant Metering: Creating indexes...'; | ||
|
||
CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id); | ||
|
||
SELECT 'Add Participant Metering: Done.'; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
204 changes: 204 additions & 0 deletions
204
ledger/participant-integration-api/src/main/scala/platform/indexer/MeteringAggregator.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.platform.indexer | ||
|
||
import com.daml.ledger.offset.Offset | ||
import com.daml.ledger.participant.state.index.v2.MeteringStore | ||
import com.daml.ledger.participant.state.index.v2.MeteringStore.ParticipantMetering | ||
import com.daml.ledger.resources.ResourceOwner | ||
import com.daml.lf.data.Time.Timestamp | ||
import com.daml.logging.{ContextualizedLogger, LoggingContext} | ||
import com.daml.metrics.Metrics | ||
import com.daml.platform.indexer.MeteringAggregator.{toOffsetDateTime, toTimestamp} | ||
import com.daml.platform.store.appendonlydao.SqlExecutor | ||
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd | ||
import com.daml.platform.store.backend.{ | ||
MeteringParameterStorageBackend, | ||
MeteringStorageWriteBackend, | ||
ParameterStorageBackend, | ||
} | ||
|
||
import java.sql.Connection | ||
import java.time.temporal.ChronoUnit | ||
import java.time.{OffsetDateTime, ZoneOffset} | ||
import java.util.{Timer, TimerTask} | ||
import scala.concurrent.duration._ | ||
import scala.concurrent.{Await, ExecutionContext, Future} | ||
import scala.util.{Failure, Success, Try} | ||
|
||
object MeteringAggregator { | ||
|
||
private val logger = ContextualizedLogger.get(getClass) | ||
|
||
class Owner( | ||
meteringStore: MeteringStorageWriteBackend, | ||
parameterStore: ParameterStorageBackend, | ||
meteringParameterStore: MeteringParameterStorageBackend, | ||
metrics: Metrics, | ||
period: FiniteDuration = 6.minutes, | ||
maxTaskDuration: FiniteDuration = 6.hours, | ||
) { | ||
|
||
private[platform] def apply( | ||
sqlExecutor: SqlExecutor | ||
)(implicit loggingContext: LoggingContext): ResourceOwner[Unit] = { | ||
val aggregator = new MeteringAggregator( | ||
meteringStore, | ||
parameterStore, | ||
meteringParameterStore, | ||
metrics, | ||
sqlExecutor, | ||
) | ||
for { | ||
_ <- ResourceOwner.forFuture(() => aggregator.initialize()) | ||
_ <- ResourceOwner.forTimer(() => new Timer()).map { timer => | ||
timer.scheduleAtFixedRate( | ||
new TimerTask { | ||
override def run(): Unit = { | ||
Try { | ||
Await.ready(aggregator.run(), maxTaskDuration) | ||
} match { | ||
case Success(_) => () | ||
case Failure(e) => | ||
logger.error(s"Metering not aggregated after ${maxTaskDuration}", e) | ||
} | ||
} | ||
}, | ||
period.toMillis, | ||
period.toMillis, | ||
) | ||
} | ||
} yield () | ||
} | ||
} | ||
|
||
private def toTimestamp(dateTime: OffsetDateTime): Timestamp = | ||
Timestamp.assertFromInstant(dateTime.toInstant) | ||
private def toOffsetDateTime(timestamp: Timestamp): OffsetDateTime = | ||
OffsetDateTime.ofInstant(timestamp.toInstant, ZoneOffset.UTC) | ||
|
||
} | ||
|
||
class MeteringAggregator( | ||
meteringStore: MeteringStorageWriteBackend, | ||
parameterStore: ParameterStorageBackend, | ||
meteringParameterStore: MeteringParameterStorageBackend, | ||
metrics: Metrics, | ||
sqlExecutor: SqlExecutor, | ||
clock: () => Timestamp = () => Timestamp.now(), | ||
)(implicit loggingContext: LoggingContext) { | ||
|
||
private val parasitic: ExecutionContext = ExecutionContext.parasitic | ||
|
||
private val logger = ContextualizedLogger.get(getClass) | ||
|
||
private[platform] def initialize(): Future[Unit] = { | ||
val initTimestamp = toOffsetDateTime(clock()).truncatedTo(ChronoUnit.HOURS).minusHours(1) | ||
val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, toTimestamp(initTimestamp)) | ||
sqlExecutor.executeSql(metrics.daml.index.db.initializeMeteringAggregator) { | ||
meteringParameterStore.initializeLedgerMeteringEnd(initLedgerMeteringEnd) | ||
} | ||
} | ||
|
||
private[platform] def run(): Future[Unit] = { | ||
|
||
val future = sqlExecutor.executeSql(metrics.daml.index.db.meteringAggregator) { conn => | ||
val nowUtcTime = toOffsetDateTime(clock()) | ||
val lastLedgerMeteringEnd = getLedgerMeteringEnd(conn) | ||
val startUtcTime: OffsetDateTime = toOffsetDateTime(lastLedgerMeteringEnd.timestamp) | ||
val endUtcTime = startUtcTime.plusHours(1) | ||
|
||
if (nowUtcTime.isAfter(endUtcTime)) { | ||
|
||
val toEndTime = toTimestamp(endUtcTime) | ||
val ingestedLedgerEnd = parameterStore.ledgerEnd(conn).lastOffset | ||
val maybeMaxOffset = | ||
meteringStore.transactionMeteringMaxOffset(lastLedgerMeteringEnd.offset, toEndTime)(conn) | ||
|
||
val ( | ||
periodIngested, // This is true if the time period is closed fully ingested | ||
hasMetering, // This is true if there are transaction_metering records to aggregate | ||
toOffsetEnd, // This is the 'to' offset for the period being aggregated | ||
) = maybeMaxOffset match { | ||
case Some(offset) => (offset <= ingestedLedgerEnd, true, offset) | ||
case None => (true, false, lastLedgerMeteringEnd.offset) | ||
} | ||
|
||
if (periodIngested) { | ||
Some( | ||
aggregate( | ||
conn = conn, | ||
lastLedgerMeteringEnd = lastLedgerMeteringEnd, | ||
thisLedgerMeteringEnd = LedgerMeteringEnd(toOffsetEnd, toEndTime), | ||
hasMetering = hasMetering, | ||
) | ||
) | ||
} else { | ||
logger.info("Not all transaction metering for aggregation time period is yet ingested") | ||
None | ||
} | ||
} else { | ||
None | ||
} | ||
} | ||
|
||
future.onComplete({ | ||
case Success(None) => logger.info("No transaction metering aggregation required") | ||
case Success(Some(lme)) => | ||
logger.info(s"Aggregating transaction metering completed up to $lme") | ||
case Failure(e) => logger.error("Failed to aggregate transaction metering", e) | ||
})(parasitic) | ||
|
||
future.map(_ => ())(parasitic) | ||
} | ||
|
||
private def aggregate( | ||
conn: Connection, | ||
lastLedgerMeteringEnd: LedgerMeteringEnd, | ||
thisLedgerMeteringEnd: LedgerMeteringEnd, | ||
hasMetering: Boolean, | ||
): LedgerMeteringEnd = { | ||
logger.info(s"Aggregating transaction metering for $thisLedgerMeteringEnd") | ||
|
||
if (hasMetering) { | ||
populateParticipantMetering(conn, lastLedgerMeteringEnd, thisLedgerMeteringEnd) | ||
} | ||
|
||
meteringParameterStore.updateLedgerMeteringEnd(thisLedgerMeteringEnd)(conn) | ||
|
||
thisLedgerMeteringEnd | ||
} | ||
|
||
private def getLedgerMeteringEnd(conn: Connection): LedgerMeteringEnd = | ||
meteringParameterStore.ledgerMeteringEnd(conn).getOrElse { | ||
throw new IllegalStateException("Ledger metering is not initialized") | ||
} | ||
|
||
private def populateParticipantMetering( | ||
conn: Connection, | ||
lastLedgerMeteringEnd: LedgerMeteringEnd, | ||
thisLedgerMeteringEnd: LedgerMeteringEnd, | ||
): Unit = { | ||
|
||
val transactionMetering: Seq[MeteringStore.TransactionMetering] = | ||
meteringStore.transactionMetering(lastLedgerMeteringEnd.offset, thisLedgerMeteringEnd.offset)( | ||
conn | ||
) | ||
|
||
val participantMetering = transactionMetering | ||
.groupBy(_.applicationId) | ||
.map { case (applicationId, metering) => | ||
ParticipantMetering( | ||
applicationId = applicationId, | ||
from = lastLedgerMeteringEnd.timestamp, | ||
to = thisLedgerMeteringEnd.timestamp, | ||
actionCount = metering.map(_.actionCount).sum, | ||
ledgerOffset = metering.map(_.ledgerOffset).max, | ||
) | ||
} | ||
.toVector | ||
|
||
meteringStore.insertParticipantMetering(participantMetering)(conn) | ||
} | ||
} |
Oops, something went wrong.