Skip to content

Commit

Permalink
[DPP-142] Explicitly deflate/inflate data outside of the index (digit…
Browse files Browse the repository at this point in the history
…al-asset#8646)

* [DPP-142] Explicitly deflate/inflate data outside of the index
* [DPP-142] Explicitly deflate/inflate data outside of the index - review fixes - exposing prepare update parallelism as param

changelog_begin
[Integration Kit] Compression and decompression of stored DAML-LF values
are now executed outside of the index database, allowing more efficient
use of participant resources when indexing.
changelog_end
  • Loading branch information
mzagorski-da authored Feb 1, 2021
1 parent 5bb3a6c commit 77f4279
Show file tree
Hide file tree
Showing 29 changed files with 612 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class DatabaseMetrics private[metrics] (
val waitTimer: Timer = registry.timer(dbPrefix :+ "wait")
val executionTimer: Timer = registry.timer(dbPrefix :+ "exec")
val translationTimer: Timer = registry.timer(dbPrefix :+ "translation")
val compressionTimer: Timer = registry.timer(dbPrefix :+ "compression")
val commitTimer: Timer = registry.timer(dbPrefix :+ "commit")
val queryTimer: Timer = registry.timer(dbPrefix :+ "query")
}
23 changes: 23 additions & 0 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,29 @@ final class Metrics(val registry: MetricRegistry) {
private val Prefix: MetricName = db.Prefix :+ "translation"
val cache = new CacheMetrics(registry, Prefix :+ "cache")
}

object compression {
  private val Prefix: MetricName = db.Prefix :+ "compression"

  // All compression metrics live under the same prefix; this helper keeps
  // each declaration to a single line.
  private def compressionHistogram(name: String): Histogram =
    registry.histogram(Prefix :+ name)

  // Pairs of histograms tracking payload sizes before and after compression
  // for each compressible field written by the indexer.
  val createArgumentCompressed: Histogram =
    compressionHistogram("create_argument_compressed")
  val createArgumentUncompressed: Histogram =
    compressionHistogram("create_argument_uncompressed")
  val createKeyValueCompressed: Histogram =
    compressionHistogram("create_key_value_compressed")
  val createKeyValueUncompressed: Histogram =
    compressionHistogram("create_key_value_uncompressed")
  val exerciseArgumentCompressed: Histogram =
    compressionHistogram("exercise_argument_compressed")
  val exerciseArgumentUncompressed: Histogram =
    compressionHistogram("exercise_argument_uncompressed")
  val exerciseResultCompressed: Histogram =
    compressionHistogram("exercise_result_compressed")
  val exerciseResultUncompressed: Histogram =
    compressionHistogram("exercise_result_uncompressed")
}

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
6d6c21e239c496a64e88da9ef3397219107f0aaa4ef0fc7782bdf5a18ea18f4d
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- One marker column per compressible payload column. NULL means the payload
-- is stored uncompressed; a non-NULL value presumably identifies the
-- compression algorithm used -- confirm against Compression.Algorithm in the
-- Scala serialization code.
ALTER TABLE participant_events ADD COLUMN create_argument_compression SMALLINT;
ALTER TABLE participant_events ADD COLUMN create_key_value_compression SMALLINT;
ALTER TABLE participant_events ADD COLUMN exercise_argument_compression SMALLINT;
ALTER TABLE participant_events ADD COLUMN exercise_result_compression SMALLINT;

ALTER TABLE participant_contracts ADD COLUMN create_argument_compression SMALLINT;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ba2ec4d90b2862abcc718d7a1ce470f05ea2c55200d5d4245606e66a349fd949
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- SET STORAGE EXTERNAL is for external (from the standpoint of TOAST), uncompressed data.
--
-- Note that SET STORAGE doesn't itself change anything in the table,
-- it just sets the strategy to be pursued during future table updates.
--
-- `SET STORAGE EXTERNAL` https://www.postgresql.org/docs/13/sql-altertable.html
-- TOAST https://www.postgresql.org/docs/13/storage-toast.html

-- Each *_compression column records how the sibling payload column is
-- compressed (NULL = uncompressed). Since compression now happens in the
-- application, the payload columns are switched to STORAGE EXTERNAL so
-- PostgreSQL does not re-compress them in TOAST (see comments above).
ALTER TABLE participant_events
    ADD COLUMN create_argument_compression SMALLINT,
    ALTER COLUMN create_argument SET STORAGE EXTERNAL,
    ADD COLUMN create_key_value_compression SMALLINT,
    ALTER COLUMN create_key_value SET STORAGE EXTERNAL,
    ADD COLUMN exercise_argument_compression SMALLINT,
    ALTER COLUMN exercise_argument SET STORAGE EXTERNAL,
    ADD COLUMN exercise_result_compression SMALLINT,
    ALTER COLUMN exercise_result SET STORAGE EXTERNAL;

ALTER TABLE participant_contracts
    ADD COLUMN create_argument_compression SMALLINT,
    ALTER COLUMN create_argument SET STORAGE EXTERNAL;
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.daml.platform.indexer

import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Flow
import com.codahale.metrics.Timer
import com.daml.daml_lf_dev.DamlLf
Expand Down Expand Up @@ -33,6 +34,7 @@ object ExecuteUpdate {
LedgerDao,
Metrics,
v1.ParticipantId,
Int,
ExecutionContext,
LoggingContext,
) => ResourceOwner[ExecuteUpdate]
Expand All @@ -42,6 +44,7 @@ object ExecuteUpdate {
ledgerDao: LedgerDao,
metrics: Metrics,
participantId: v1.ParticipantId,
updatePreparationParallelism: Int,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): ResourceOwner[ExecuteUpdate] =
Expand All @@ -51,6 +54,7 @@ object ExecuteUpdate {
ledgerDao,
metrics,
participantId,
updatePreparationParallelism,
executionContext,
loggingContext,
)
Expand All @@ -59,6 +63,7 @@ object ExecuteUpdate {
ledgerDao,
metrics,
participantId,
updatePreparationParallelism,
executionContext,
loggingContext,
)
Expand All @@ -77,6 +82,8 @@ trait ExecuteUpdate {

private[indexer] def metrics: Metrics

private[indexer] def updatePreparationParallelism: Int

private[indexer] def flow: ExecuteUpdateFlow

private[indexer] def prepareUpdate(
Expand Down Expand Up @@ -299,6 +306,7 @@ class PipelinedExecuteUpdate(
private[indexer] val ledgerDao: LedgerDao,
private[indexer] val metrics: Metrics,
private[indexer] val participantId: v1.ParticipantId,
private[indexer] val updatePreparationParallelism: Int,
)(implicit val executionContext: ExecutionContext, val loggingContext: LoggingContext)
extends ExecuteUpdate {

Expand Down Expand Up @@ -372,7 +380,8 @@ class PipelinedExecuteUpdate(

private[indexer] val flow: ExecuteUpdateFlow =
Flow[OffsetUpdate]
.mapAsync(1)(prepareUpdate)
.mapAsync(updatePreparationParallelism)(prepareUpdate)
.buffer(16, OverflowStrategy.backpressure)
.map(PipelinedUpdateWithTimer(_, metrics.daml.index.db.storeTransaction.time()))
.mapAsync(1)(insertTransactionState)
.mapAsync(1)(insertTransactionEvents)
Expand All @@ -385,11 +394,12 @@ object PipelinedExecuteUpdate {
ledgerDao: LedgerDao,
metrics: Metrics,
participantId: v1.ParticipantId,
updatePreparationParallelism: Int,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): ResourceOwner[PipelinedExecuteUpdate] =
ResourceOwner.forValue(() =>
new PipelinedExecuteUpdate(ledgerDao, metrics, participantId)(
new PipelinedExecuteUpdate(ledgerDao, metrics, participantId, updatePreparationParallelism)(
executionContext,
loggingContext,
)
Expand All @@ -406,14 +416,15 @@ class AtomicExecuteUpdate(
private[indexer] val ledgerDao: LedgerDao,
private[indexer] val metrics: Metrics,
private[indexer] val participantId: v1.ParticipantId,
private[indexer] val updatePreparationParallelism: Int,
)(
private[indexer] implicit val loggingContext: LoggingContext,
private[indexer] val executionContext: ExecutionContext,
) extends ExecuteUpdate {

private[indexer] val flow: ExecuteUpdateFlow =
Flow[OffsetUpdate]
.mapAsync(1)(prepareUpdate)
.mapAsync(updatePreparationParallelism)(prepareUpdate)
.mapAsync(1) { case offsetUpdate @ OffsetUpdate(offsetStep, update) =>
withEnrichedLoggingContext(loggingContextFor(offsetStep.offset, update)) {
implicit loggingContext =>
Expand Down Expand Up @@ -465,10 +476,14 @@ object AtomicExecuteUpdate {
ledgerDao: LedgerDao,
metrics: Metrics,
participantId: v1.ParticipantId,
updatePreparationParallelism: Int,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): ResourceOwner[AtomicExecuteUpdate] =
ResourceOwner.forValue(() =>
new AtomicExecuteUpdate(ledgerDao, metrics, participantId)(loggingContext, executionContext)
new AtomicExecuteUpdate(ledgerDao, metrics, participantId, updatePreparationParallelism)(
loggingContext,
executionContext,
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ case class IndexerConfig(
startupMode: IndexerStartupMode,
restartDelay: FiniteDuration = DefaultRestartDelay,
eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
updatePreparationParallelism: Int = DefaultUpdatePreparationParallelism,
allowExistingSchema: Boolean = false,
)

object IndexerConfig {

  // Default number of updates prepared (translated and compressed) in
  // parallel before being handed to the database-write stages. Public
  // constants carry explicit types per Scala API conventions.
  val DefaultUpdatePreparationParallelism: Int = 2
  val DefaultRestartDelay: FiniteDuration = 10.seconds

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ object JdbcIndexer {
ledgerDao,
metrics,
config.participantId,
config.updatePreparationParallelism,
materializer.executionContext,
loggingContext,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,17 @@ private[platform] object Conversions {
}
}

object IntToSmallIntConversions {

  /** Writes an `Array[Option[Int]]` as a SQL SMALLINT array, mapping `None`
    * elements to SQL NULL.
    */
  implicit object IntOptionArrayArrayToStatement extends ToStatement[Array[Option[Int]]] {
    override def set(s: PreparedStatement, index: Int, intOpts: Array[Option[Int]]): Unit = {
      val conn = s.getConnection
      // Int.box instead of `new Integer(_)`: the Integer(int) constructor is
      // deprecated since Java 9 in favor of valueOf, which reuses cached boxes.
      val intOrNullsArray = intOpts.map(_.map(Int.box).orNull)
      val sqlArray = conn.createArrayOf("SMALLINT", intOrNullsArray.asInstanceOf[Array[AnyRef]])
      s.setArray(index, sqlArray)
    }
  }
}

implicit object ByteArrayArrayToStatement extends ArrayToStatement[Array[Byte]]("BYTEA")

implicit object CharArrayToStatement extends ArrayToStatement[String]("VARCHAR")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,21 @@ private class JdbcLedgerDao(
private val translation: LfValueTranslation =
new LfValueTranslation(lfValueTranslationCache)

private val compressionStrategy: CompressionStrategy =
CompressionStrategy.AllGZIP

private val compressionMetrics: CompressionMetrics =
CompressionMetrics(metrics)

private val transactionsWriter: TransactionsWriter =
new TransactionsWriter(dbType, metrics, translation, idempotentEntryInserts)
new TransactionsWriter(
dbType,
metrics,
translation,
compressionStrategy,
compressionMetrics,
idempotentEntryInserts,
)

override val transactionsReader: TransactionsReader =
new TransactionsReader(dbDispatcher, dbType, eventsPageSize, metrics, translation)(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.dao.events

import com.codahale.metrics.Histogram
import com.daml.metrics.Metrics

/** Groups the compression size histograms for the four compressible payload
  * fields written by the indexer. See the companion object for construction
  * from a [[Metrics]] instance.
  */
final class CompressionMetrics(
    val createArgument: CompressionMetrics.Field,
    val createKeyValue: CompressionMetrics.Field,
    val exerciseArgument: CompressionMetrics.Field,
    val exerciseResult: CompressionMetrics.Field,
)

object CompressionMetrics {

  /** A pair of histograms recording a field's size after and before compression. */
  final class Field(val compressed: Histogram, val uncompressed: Histogram)

  /** Wires each field to its pre-registered histograms on the given metrics registry. */
  def apply(metrics: Metrics): CompressionMetrics = {
    // Hoist the common path so each field wiring stays short and uniform.
    val registryNode = metrics.daml.index.db.compression
    new CompressionMetrics(
      createArgument = new Field(
        compressed = registryNode.createArgumentCompressed,
        uncompressed = registryNode.createArgumentUncompressed,
      ),
      createKeyValue = new Field(
        compressed = registryNode.createKeyValueCompressed,
        uncompressed = registryNode.createKeyValueUncompressed,
      ),
      exerciseArgument = new Field(
        compressed = registryNode.exerciseArgumentCompressed,
        uncompressed = registryNode.exerciseArgumentUncompressed,
      ),
      exerciseResult = new Field(
        compressed = registryNode.exerciseResultCompressed,
        uncompressed = registryNode.exerciseResultUncompressed,
      ),
    )
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.dao.events

import com.daml.platform.store.serialization.Compression

/** The compression algorithm to apply to each compressible payload field
  * before it is written to the index database. See the companion object for
  * the predefined strategies.
  */
final case class CompressionStrategy(
    createArgumentCompression: Compression.Algorithm,
    createKeyValueCompression: Compression.Algorithm,
    exerciseArgumentCompression: Compression.Algorithm,
    exerciseResultCompression: Compression.Algorithm,
)

object CompressionStrategy {

  /** Builds a strategy applying the same algorithm to every compressible field.
    * Factoring this out removes the duplication between the predefined
    * strategies and lets callers build one for any single algorithm.
    */
  def uniform(algorithm: Compression.Algorithm): CompressionStrategy =
    CompressionStrategy(
      createArgumentCompression = algorithm,
      createKeyValueCompression = algorithm,
      exerciseArgumentCompression = algorithm,
      exerciseResultCompression = algorithm,
    )

  /** Store every field uncompressed. */
  val None: CompressionStrategy = uniform(Compression.Algorithm.None)

  /** GZIP-compress every field. */
  val AllGZIP: CompressionStrategy = uniform(Compression.Algorithm.GZIP)
}
Loading

0 comments on commit 77f4279

Please sign in to comment.