forked from digital-asset/daml
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Squash ReadOnlyLedger hierarchy [DPP-809] (digital-asset#12748)
* Squash functionality ReadOnlyLedger hierarchy implementation into BaseLedger changelog_begin changelog_end * Renamed BaseLedger to ReadOnlyLedgerImpl * Removed BaseLedgerSpec test as it is redundant * Package shifts * Moves ReadOnlyLedger to package `index` * Moves TransactionConversions and EventFilter to `appendonlydao.events` * Refactorings: * Extracted LedgerEndPoller out of ReadOnlyLedger * Moves DispatcherLagMeter in its own file * ReadOnlyLedgerOwnerBuilder * Replace CacheIndex with MutableCacheLedgerEnd * Rename DispatcherLagMeter to InstrumentedSignalNewLedgerHead
- Loading branch information
Showing
18 changed files
with
466 additions
and
663 deletions.
There are no files selected for viewing
50 changes: 50 additions & 0 deletions
50
...ipant-integration-api/src/main/scala/platform/index/InstrumentedSignalNewLedgerHead.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.platform.index | ||
|
||
import com.codahale.metrics.Timer | ||
import com.daml.ledger.offset.Offset | ||
import com.daml.platform.store.cache.MutableCacheBackedContractStore.SignalNewLedgerHead | ||
import com.daml.scalautil.Statement.discard | ||
|
||
import java.util.concurrent.TimeUnit | ||
import scala.collection.mutable | ||
|
||
/** Computes the lag between the contract state events dispatcher and the general dispatcher. | ||
* | ||
* Internally uses a size bound for preventing memory leaks if misused. | ||
* | ||
* @param delegate The ledger head dispatcher delegate. | ||
* @param timer The timer measuring the delta. | ||
*/ | ||
private[index] class InstrumentedSignalNewLedgerHead( | ||
delegate: SignalNewLedgerHead, | ||
maxSize: Long = 1000L, | ||
)( | ||
timer: Timer | ||
) extends SignalNewLedgerHead { | ||
private val ledgerHeads = mutable.Map.empty[Offset, Long] | ||
|
||
override def apply(offset: Offset, sequentialEventId: Long): Unit = { | ||
delegate(offset, sequentialEventId) | ||
ledgerHeads.synchronized { | ||
ledgerHeads.remove(offset).foreach { startNanos => | ||
val endNanos = System.nanoTime() | ||
timer.update(endNanos - startNanos, TimeUnit.NANOSECONDS) | ||
} | ||
} | ||
} | ||
|
||
private[index] def startTimer(head: Offset): Unit = | ||
ledgerHeads.synchronized { | ||
ensureBounded() | ||
discard(ledgerHeads.getOrElseUpdate(head, System.nanoTime())) | ||
} | ||
|
||
private def ensureBounded(): Unit = | ||
if (ledgerHeads.size > maxSize) { | ||
// If maxSize is reached, remove randomly ANY element. | ||
ledgerHeads.headOption.foreach(head => ledgerHeads.remove(head._1)) | ||
} else () | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
58 changes: 58 additions & 0 deletions
58
...er/participant-integration-api/src/main/scala/platform/index/LedgerEndCachesUpdater.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.platform.index | ||
|
||
import akka.{Done, NotUsed} | ||
import akka.stream.{KillSwitches, Materializer, RestartSettings, UniqueKillSwitch} | ||
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source} | ||
import com.daml.ledger.offset.Offset | ||
import com.daml.logging.LoggingContext | ||
import com.daml.platform.akkastreams.dispatcher.Dispatcher | ||
import com.daml.platform.store.appendonlydao.LedgerReadDao | ||
import com.daml.platform.store.interning.UpdatingStringInterningView | ||
import scala.concurrent.duration._ | ||
import scala.concurrent.{Await, ExecutionContext, Future} | ||
|
||
/** Periodically polls the ledger end from the [[LedgerReadDao]] | ||
* and updates the caches backing the Ledger API. | ||
*/ | ||
private[index] class LedgerEndCachesUpdater( | ||
ledgerReadDao: LedgerReadDao, | ||
updatingStringInterningView: UpdatingStringInterningView, | ||
instrumentedSignalNewLedgerHead: InstrumentedSignalNewLedgerHead, | ||
contractStateEventsDispatcher: Dispatcher[(Offset, Long)], | ||
)(implicit mat: Materializer, loggingContext: LoggingContext) | ||
extends AutoCloseable { | ||
|
||
private val (ledgerEndUpdateKillSwitch, ledgerEndUpdateDone) = | ||
RestartSource | ||
.withBackoff( | ||
RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2) | ||
)(() => | ||
Source | ||
.tick(0.millis, 100.millis, ()) | ||
.mapAsync(1) { | ||
implicit val ec: ExecutionContext = mat.executionContext | ||
_ => | ||
for { | ||
ledgerEnd <- ledgerReadDao.lookupLedgerEnd() | ||
_ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId) | ||
} yield ledgerEnd | ||
} | ||
) | ||
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch]) | ||
.toMat(Sink.foreach { newLedgerHead => | ||
instrumentedSignalNewLedgerHead.startTimer(newLedgerHead.lastOffset) | ||
contractStateEventsDispatcher.signalNewHead( | ||
newLedgerHead.lastOffset -> newLedgerHead.lastEventSeqId | ||
) | ||
})(Keep.both[UniqueKillSwitch, Future[Done]]) | ||
.run() | ||
|
||
def close(): Unit = { | ||
ledgerEndUpdateKillSwitch.shutdown() | ||
Await.ready(ledgerEndUpdateDone, 10.seconds) | ||
() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.