Dpp 710 activate interning write side #11614

Merged 1 commit on Nov 10, 2021
6 changes: 3 additions & 3 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
@@ -430,9 +430,6 @@ final class Metrics(val registry: MetricRegistry) {
     val getLedgerId: DatabaseMetrics = createDbMetrics("get_ledger_id")
     val getParticipantId: DatabaseMetrics = createDbMetrics("get_participant_id")
     val getLedgerEnd: DatabaseMetrics = createDbMetrics("get_ledger_end")
-    val getLedgerEndOffsetAndSequentialId: DatabaseMetrics = createDbMetrics(
-      "get_ledger_end_offset_and_sequential_id"
-    )
     val getInitialLedgerEnd: DatabaseMetrics = createDbMetrics("get_initial_ledger_end")
     val initializeLedgerParameters: DatabaseMetrics = createDbMetrics(
       "initialize_ledger_parameters"
@@ -524,6 +521,9 @@ final class Metrics(val registry: MetricRegistry) {
     val getContractStateEvents: DatabaseMetrics = createDbMetrics(
       "get_contract_state_events"
     )
+    val loadStringInterningEntries: DatabaseMetrics = createDbMetrics(
+      "loadStringInterningEntries"
+    )

     object translation {
       private val Prefix: MetricName = db.Prefix :+ "translation"
-----
@@ -1 +1 @@
-e28b88accecead16dcd67becabf7ad230654b65f93c277e785ba77203c3db8b5
+bd9ddd16ae4ecc09923215635442c83b7894e95d23203baccde6df27cb0ce2cf
-----
@@ -11,6 +11,7 @@ CREATE TABLE parameters (
     participant_id VARCHAR NOT NULL,
     ledger_end VARCHAR,
     ledger_end_sequential_id BIGINT,
+    ledger_end_string_interning_id INTEGER,
     participant_pruned_up_to_inclusive VARCHAR,
     participant_all_divulged_contracts_pruned_up_to_inclusive VARCHAR
 );
-----
@@ -0,0 +1 @@
2131cf0ed208e236ebf0ae68c8153cb8080d3b133114902185393c2b34f0b45d
-----
@@ -0,0 +1,5 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

ALTER TABLE parameters
ADD ledger_end_string_interning_id NUMBER;
-----
@@ -0,0 +1 @@
cc55310126bde9627d59698cee292506e4d80c8303f090ba8649c4f4c6e19fbc
-----
@@ -0,0 +1,69 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- add string_interning ledger-end tracking column to parameters
ALTER TABLE parameters
ADD COLUMN ledger_end_string_interning_id INTEGER;

-- create temporary sequence to populate the string_interning table as migrating data
CREATE SEQUENCE string_interning_seq_temp;
-- add temporary index to aid string lookups as migrating data
CREATE UNIQUE INDEX string_interning_external_string_temp_idx ON string_interning USING btree (external_string);

-- add temporary migration function
Comment from the PR author:

    This data migration was verified locally by checking that it generates the same string_interning table as the one populated by the write side.

    For reviewers: the migration needs to conform to the logic in DbDtoToStringsForInterning.
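For context: template ids and party names share the single string_interning table, so the migration namespaces each kind with a one-character prefix (a party "Alice" found in submitters is inserted with external_string 'p|Alice'). A hypothetical Scala sketch of that convention follows; the names here are invented, and DbDtoToStringsForInterning remains the source of truth:

    // Hypothetical sketch of the prefixing convention; the authoritative
    // logic lives in DbDtoToStringsForInterning.
    object InterningPrefixSketch {
      def templateId(raw: String): String = "t|" + raw // template ids
      def party(raw: String): String = "p|" + raw      // party names
    }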

CREATE FUNCTION insert_to_string_interning(prefix TEXT, table_name TEXT, selector_expr TEXT)
RETURNS void
LANGUAGE plpgsql
AS
$$
BEGIN
  EXECUTE
    'INSERT INTO string_interning(internal_id, external_string) ' ||
    'SELECT nextval(''string_interning_seq_temp''), id ' ||
    'FROM (SELECT DISTINCT ''' || prefix || ''' || ' || selector_expr || ' id FROM ' || table_name || ') distinct_ids ' ||
    'WHERE NOT EXISTS (SELECT 1 FROM string_interning WHERE external_string = distinct_ids.id)' ||
    'AND id IS NOT NULL';
END
$$;

-- data migrations

-- template_id
SELECT insert_to_string_interning('t|', 'participant_events_create', 'template_id');
SELECT insert_to_string_interning('t|', 'participant_events_divulgence', 'template_id');
SELECT insert_to_string_interning('t|', 'participant_events_consuming_exercise', 'template_id');
SELECT insert_to_string_interning('t|', 'participant_events_non_consuming_exercise', 'template_id');

-- party
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(flat_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(submitters)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(create_observers)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(create_signatories)');

SELECT insert_to_string_interning('p|', 'participant_events_divulgence', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_divulgence', 'unnest(submitters)');

SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(flat_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(submitters)');
SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(exercise_actors)');

SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(flat_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(submitters)');
SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(exercise_actors)');

SELECT insert_to_string_interning('p|', 'participant_command_completions', 'unnest(submitters)');

SELECT insert_to_string_interning('p|', 'party_entries', 'party');

-- fill ledger-end
UPDATE parameters
SET ledger_end_string_interning_id = (SELECT max(internal_id) FROM string_interning);

-- remove temporary SQL objects
DROP SEQUENCE string_interning_seq_temp;
DROP INDEX string_interning_external_string_temp_idx;
DROP FUNCTION insert_to_string_interning(TEXT, TEXT, TEXT);

-----
@@ -55,15 +55,15 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
       context: ResourceContext
   ): Resource[ReadOnlySqlLedgerWithMutableCache] =
     for {
-      (ledgerEndOffset, ledgerEndSequentialId) <- Resource.fromFuture(
-        ledgerDao.lookupLedgerEndOffsetAndSequentialId()
+      ledgerEnd <- Resource.fromFuture(
+        ledgerDao.lookupLedgerEnd()
       )
-      _ = ledgerEndCache.set((ledgerEndOffset, ledgerEndSequentialId))
+      _ = ledgerEndCache.set((ledgerEnd.lastOffset, ledgerEnd.lastEventSeqId))
       prefetchingDispatcher <- dispatcherOffsetSeqIdOwner(
-        ledgerEndOffset,
-        ledgerEndSequentialId,
+        ledgerEnd.lastOffset,
+        ledgerEnd.lastEventSeqId,
       ).acquire()
-      generalDispatcher <- dispatcherOwner(ledgerEndOffset).acquire()
+      generalDispatcher <- dispatcherOwner(ledgerEnd.lastOffset).acquire()
       dispatcherLagMeter <- Resource.successful(
         new DispatcherLagMeter((offset, eventSeqId) => {
           ledgerEndCache.set((offset, eventSeqId))
@@ -79,7 +79,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
         prefetchingDispatcher,
         generalDispatcher,
         dispatcherLagMeter,
-        ledgerEndOffset -> ledgerEndSequentialId,
+        ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId,
       ).acquire()
     } yield ledger

@@ -294,12 +294,14 @@ private final class ReadOnlySqlLedgerWithMutableCache(
       )(() =>
         Source
           .tick(0.millis, 100.millis, ())
-          .mapAsync(1)(_ => ledgerDao.lookupLedgerEndOffsetAndSequentialId())
+          .mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
       )
       .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
-      .toMat(Sink.foreach { case newLedgerHead @ (offset, _) =>
-        dispatcherLagger.startTimer(offset)
-        contractStateEventsDispatcher.signalNewHead(newLedgerHead)
+      .toMat(Sink.foreach { case newLedgerHead =>
+        dispatcherLagger.startTimer(newLedgerHead.lastOffset)
+        contractStateEventsDispatcher.signalNewHead(
+          newLedgerHead.lastOffset -> newLedgerHead.lastEventSeqId
+        )
       })(
         Keep.both[UniqueKillSwitch, Future[Done]]
       )
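Note: the diff above replaces the (offset, sequential id) tuple returned by lookupLedgerEndOffsetAndSequentialId with a single ledger-end record. A minimal sketch of the shape implied by the call sites in this PR; the field names are taken from the diff, but the actual definition (apparently in ParameterStorageBackend, per the references below) may carry more detail:

    // Sketch only: shape implied by the ledgerEnd.lastOffset,
    // .lastEventSeqId, and .lastStringInterningId usages in this PR.
    final case class LedgerEnd(
        lastOffset: Offset,
        lastEventSeqId: Long,
        lastStringInterningId: Int,
    )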
-----
@@ -34,9 +34,9 @@ private[index] object ReadOnlySqlLedgerWithTranslationCache {
       context: ResourceContext
   ): Resource[ReadOnlySqlLedgerWithTranslationCache] =
     for {
-      ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEndOffsetAndSequentialId())
-      _ = ledgerEndCache.set(ledgerEnd)
-      dispatcher <- dispatcherOwner(ledgerEnd._1).acquire()
+      ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd())
+      _ = ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
+      dispatcher <- dispatcherOwner(ledgerEnd.lastOffset).acquire()
       contractsStore <- contractStoreOwner()
       ledger <- ledgerOwner(dispatcher, contractsStore).acquire()
     } yield ledger
@@ -89,15 +89,15 @@ private final class ReadOnlySqlLedgerWithTranslationCache(
       )(() =>
         Source
           .tick(0.millis, 100.millis, ())
-          .mapAsync(1)(_ => ledgerDao.lookupLedgerEndOffsetAndSequentialId())
+          .mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
       )
       .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
       .toMat(Sink.foreach { ledgerEnd =>
-        ledgerEndCache.set(ledgerEnd)
+        ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
         // the order here is very important: first we need to make data available for point-wise lookups
         // and SQL queries, and only then we can make it available on the streams.
         // (consider example: completion arrived on a stream, but the transaction cannot be looked up)
-        dispatcher.signalNewHead(ledgerEnd._1)
+        dispatcher.signalNewHead(ledgerEnd.lastOffset)
       })(
         Keep.both[UniqueKillSwitch, Future[Done]]
       )
-----
@@ -47,6 +47,7 @@ object JdbcIndexer {
     val parameterStorageBackend = factory.createParameterStorageBackend
     val DBLockStorageBackend = factory.createDBLockStorageBackend
     val resetStorageBackend = factory.createResetStorageBackend
+    val stringInterningStorageBackend = factory.createStringInterningStorageBackend
     val indexer = ParallelIndexerFactory(
       jdbcUrl = config.jdbcUrl,
       inputMappingParallelism = config.inputMappingParallelism,
@@ -97,6 +98,7 @@
         batchWithinMillis = config.batchWithinMillis,
         metrics = metrics,
       ),
+      stringInterningStorageBackend = stringInterningStorageBackend,
       mat = materializer,
       readService = readService,
     )
-----
@@ -12,9 +12,9 @@ import com.daml.ledger.participant.state.v2.{ReadService, Update}
 import com.daml.lf.data.Ref
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
 import com.daml.metrics.Metrics
-import com.daml.platform.store.EventSequentialId
 import com.daml.platform.store.appendonlydao.DbDispatcher
 import com.daml.platform.store.backend.{IngestionStorageBackend, ParameterStorageBackend}
+import com.daml.platform.store.interning.UpdatingStringInterningView

 import scala.concurrent.{ExecutionContext, Future}
@@ -29,6 +29,7 @@ private[platform] case class InitializeParallelIngestion(

   def apply(
       dbDispatcher: DbDispatcher,
+      updatingStringInterningView: UpdatingStringInterningView,
       readService: ReadService,
       ec: ExecutionContext,
       mat: Materializer,
@@ -54,8 +55,14 @@
       _ <- dbDispatcher.executeSql(metrics.daml.parallelIndexer.initialization)(
         ingestionStorageBackend.deletePartiallyIngestedData(ledgerEnd)
       )
+      _ <- ledgerEnd match {
+        case Some(ledgerEnd) => updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
+        case None => Future.unit
+      }
+      ledgerEndOrBeforeBegin = ledgerEnd.getOrElse(ParameterStorageBackend.LedgerEndBeforeBegin)
     } yield InitializeParallelIngestion.Initialized(
-      initialEventSeqId = ledgerEnd.map(_.lastEventSeqId).getOrElse(EventSequentialId.beforeBegin),
+      initialEventSeqId = ledgerEndOrBeforeBegin.lastEventSeqId,
+      initialStringInterningId = ledgerEndOrBeforeBegin.lastStringInterningId,
       readServiceSource = readService.stateUpdates(beginAfter = ledgerEnd.map(_.lastOffset)),
     )
   }
@@ -66,6 +73,7 @@ object InitializeParallelIngestion {

   case class Initialized(
       initialEventSeqId: Long,
+      initialStringInterningId: Int,
       readServiceSource: Source[(Offset, Update), NotUsed],
   )
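During initialization the indexer now also brings the in-memory string interning view up to the persisted ledger end before ingestion resumes. A rough sketch of the view interface implied by the call site above (assumed signature; the real trait lives in com.daml.platform.store.interning and may differ):

    // Assumed signature, inferred from the call site
    // updatingStringInterningView.update(ledgerEnd.lastStringInterningId).
    trait UpdatingStringInterningView {
      // Extend the in-memory view with all persisted interning entries
      // up to and including lastStringInterningId.
      def update(lastStringInterningId: Int): Future[Unit]
    }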
-----
@@ -17,7 +17,12 @@ import com.daml.platform.indexer.parallel.AsyncSupport._
 import com.daml.platform.indexer.Indexer
 import com.daml.platform.store.appendonlydao.DbDispatcher
 import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
-import com.daml.platform.store.backend.{DBLockStorageBackend, DataSourceStorageBackend}
+import com.daml.platform.store.backend.{
+  DBLockStorageBackend,
+  DataSourceStorageBackend,
+  StringInterningStorageBackend,
+}
+import com.daml.platform.store.interning.StringInterningView
 import com.google.common.util.concurrent.ThreadFactoryBuilder

 import scala.concurrent.duration.FiniteDuration
@@ -39,6 +44,7 @@
       dataSourceStorageBackend: DataSourceStorageBackend,
       initializeParallelIngestion: InitializeParallelIngestion,
       parallelIndexerSubscription: ParallelIndexerSubscription[_],
+      stringInterningStorageBackend: StringInterningStorageBackend,
       mat: Materializer,
       readService: ReadService,
   )(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] =
@@ -101,8 +107,19 @@
           metrics = metrics,
         )
       ) { dbDispatcher =>
+        val stringInterningView = new StringInterningView(
+          loadPrefixedEntries = (fromExclusive, toInclusive) =>
+            implicit loggingContext =>
+              dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
+                stringInterningStorageBackend.loadStringInterningEntries(
+                  fromExclusive,
+                  toInclusive,
+                )
+              }
+        )
         initializeParallelIngestion(
           dbDispatcher = dbDispatcher,
+          updatingStringInterningView = stringInterningView,
           readService = readService,
           ec = ec,
           mat = mat,
@@ -111,6 +128,7 @@
           inputMapperExecutor = inputMapperExecutor,
           batcherExecutor = batcherExecutor,
           dbDispatcher = dbDispatcher,
+          stringInterningView = stringInterningView,
           materializer = mat,
         )
       )
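The wiring above constructs the StringInterningView with a loadPrefixedEntries function that reads newly persisted interning entries through the DbDispatcher, timed by the new loadStringInterningEntries metric. A rough sketch of the loader contract this implies; the parameter and result types here are assumptions, not the verified API:

    // Assumed contract, inferred from the wiring above: load the interning
    // entries with ids in (fromExclusive, toInclusive] as (id, string) pairs,
    // executed on a JDBC connection supplied by the DbDispatcher.
    trait StringInterningStorageBackend {
      def loadStringInterningEntries(fromExclusive: Int, toInclusive: Int)(
          connection: java.sql.Connection
      ): Iterable[(Int, String)]
    }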