Squash ReadOnlyLedger hierarchy [DPP-809] (digital-asset#12748)
* Squashed the ReadOnlyLedger hierarchy implementation into BaseLedger

changelog_begin
changelog_end

* Renamed BaseLedger to ReadOnlyLedgerImpl
* Removed BaseLedgerSpec test as it is redundant

* Package shifts:
  * Moved ReadOnlyLedger to package `index`
  * Moved TransactionConversions and EventFilter to `appendonlydao.events`

* Refactorings:
  * Extracted LedgerEndPoller out of ReadOnlyLedger
  * Moved DispatcherLagMeter into its own file

* Introduced ReadOnlyLedgerOwnerBuilder

* Replaced CacheIndex with MutableCacheLedgerEnd

* Renamed DispatcherLagMeter to InstrumentedSignalNewLedgerHead
tudor-da authored Feb 17, 2022
1 parent 5e86c75 commit fa89d7e
Showing 18 changed files with 466 additions and 663 deletions.
@@ -0,0 +1,50 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.index

import com.codahale.metrics.Timer
import com.daml.ledger.offset.Offset
import com.daml.platform.store.cache.MutableCacheBackedContractStore.SignalNewLedgerHead
import com.daml.scalautil.Statement.discard

import java.util.concurrent.TimeUnit
import scala.collection.mutable

/** Computes the lag between the contract state events dispatcher and the general dispatcher.
  *
  * Internally uses a size bound to prevent memory leaks if misused.
  *
  * @param delegate The ledger head dispatcher delegate.
  * @param timer The timer measuring the delta.
  */
private[index] class InstrumentedSignalNewLedgerHead(
    delegate: SignalNewLedgerHead,
    maxSize: Long = 1000L,
)(
    timer: Timer
) extends SignalNewLedgerHead {
  private val ledgerHeads = mutable.Map.empty[Offset, Long]

  override def apply(offset: Offset, sequentialEventId: Long): Unit = {
    delegate(offset, sequentialEventId)
    ledgerHeads.synchronized {
      ledgerHeads.remove(offset).foreach { startNanos =>
        val endNanos = System.nanoTime()
        timer.update(endNanos - startNanos, TimeUnit.NANOSECONDS)
      }
    }
  }

  private[index] def startTimer(head: Offset): Unit =
    ledgerHeads.synchronized {
      ensureBounded()
      discard(ledgerHeads.getOrElseUpdate(head, System.nanoTime()))
    }

  private def ensureBounded(): Unit =
    if (ledgerHeads.size > maxSize) {
      // If maxSize is reached, evict an arbitrary element to keep the map bounded.
      ledgerHeads.headOption.foreach(head => ledgerHeads.remove(head._1))
    } else ()
}
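The class above pairs startTimer (invoked when the general dispatcher observes a new ledger head) with apply (invoked when the contract state events dispatcher reaches the same head), using a synchronized, size-bounded map of start timestamps. Below is a standalone sketch of that pattern against a Dropwizard Timer; LagMeterSketch, publishDownstream and the Long-typed heads are hypothetical stand-ins for the class, the SignalNewLedgerHead delegate and Offset, not the Daml API.

import com.codahale.metrics.Timer

import java.util.concurrent.TimeUnit
import scala.collection.mutable

// Standalone sketch: remember when each head was first seen, and record the
// elapsed time once the same head is signalled downstream.
final class LagMeterSketch(publishDownstream: Long => Unit, timer: Timer, maxSize: Int = 1000) {
  private val pending = mutable.Map.empty[Long, Long] // head -> start time in nanos

  // Called by the "general" dispatcher when it learns about a new head.
  def startTimer(head: Long): Unit = pending.synchronized {
    if (pending.size > maxSize) pending.headOption.foreach(e => pending.remove(e._1)) // keep the map bounded
    pending.getOrElseUpdate(head, System.nanoTime())
    ()
  }

  // Called by the contract-state dispatcher once it catches up to the same head.
  def signal(head: Long): Unit = {
    publishDownstream(head)
    pending.synchronized {
      pending.remove(head).foreach(start => timer.update(System.nanoTime() - start, TimeUnit.NANOSECONDS))
    }
  }
}

object LagMeterSketchDemo extends App {
  val timer = new Timer()
  val meter = new LagMeterSketch(head => println(s"head $head dispatched"), timer)
  meter.startTimer(42L) // the general dispatcher sees head 42
  Thread.sleep(5)       // simulated propagation delay
  meter.signal(42L)     // the contract-state dispatcher catches up
  println(s"recorded lag samples: ${timer.getCount}")
}

As in the class above, a head that was never registered via startTimer is simply ignored when it is signalled, so entries dropped by the size bound only cost a missing sample.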
@@ -37,7 +37,7 @@ private[platform] object JdbcIndex {
      maxTransactionsInMemoryFanOutBufferSize: Long,
      enableInMemoryFanOutForLedgerApi: Boolean,
  )(implicit mat: Materializer, loggingContext: LoggingContext): ResourceOwner[IndexService] =
    new ReadOnlySqlLedger.Owner(
    ReadOnlyLedgerBuilder(
      dbSupport = dbSupport,
      initialLedgerId = ledgerId,
      eventsPageSize = eventsPageSize,
@@ -57,11 +57,13 @@
      participantId = participantId,
      maxTransactionsInMemoryFanOutBufferSize = maxTransactionsInMemoryFanOutBufferSize,
      errorFactories = ErrorFactories(),
    ).map { ledger =>
      new LedgerBackedIndexService(
        MeteredReadOnlyLedger(ledger, metrics),
        participantId,
        errorFactories = ErrorFactories(),
      )
    }
    )(mat, loggingContext, servicesExecutionContext)
      .owner()
      .map { ledger =>
        new LedgerBackedIndexService(
          MeteredReadOnlyLedger(ledger, metrics),
          participantId,
          errorFactories = ErrorFactories(),
        )
      }
}
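In the hunk above, JdbcIndex stops instantiating ReadOnlySqlLedger.Owner directly and instead asks ReadOnlyLedgerBuilder for a resource owner via owner(), mapping the acquired ledger into a LedgerBackedIndexService. The sketch below shows that builder/owner shape in miniature; MiniOwner, LedgerLike and LedgerBuilder are hypothetical stand-ins, not Daml's ResourceOwner API.

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, minimal owner abstraction: acquire a resource, run a block
// against it, and release it once the block's Future completes.
trait MiniOwner[A] { self =>
  def use[B](f: A => Future[B])(implicit ec: ExecutionContext): Future[B]

  // map lets callers wrap the acquired resource (as JdbcIndex wraps the ledger
  // in a LedgerBackedIndexService) while release still targets the original.
  def map[B](g: A => B): MiniOwner[B] = new MiniOwner[B] {
    def use[C](f: B => Future[C])(implicit ec: ExecutionContext): Future[C] =
      self.use(a => f(g(a)))
  }
}

// Hypothetical ledger resource with an explicit shutdown.
final class LedgerLike {
  def read(): String = "ledger-end"
  def close(): Unit = println("ledger closed")
}

// Builder that captures configuration and exposes owner(): MiniOwner[LedgerLike].
final case class LedgerBuilder(jdbcUrl: String) {
  def owner(): MiniOwner[LedgerLike] = new MiniOwner[LedgerLike] {
    def use[B](f: LedgerLike => Future[B])(implicit ec: ExecutionContext): Future[B] = {
      val ledger = new LedgerLike // acquire
      f(ledger).andThen { case _ => ledger.close() } // release when the returned Future completes
    }
  }
}

// Usage, mirroring the change above: build, wrap via map, use within a managed scope.
//   LedgerBuilder("jdbc:...").owner().map(_.read()).use(s => Future.successful(println(s)))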
@@ -40,14 +40,13 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.ApiOffset
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.ReadOnlyLedger
import com.daml.platform.store.entries.PartyLedgerEntry
import com.daml.telemetry.{SpanAttribute, Spans}
import scalaz.syntax.tag.ToTagOps

import scala.concurrent.{ExecutionContext, Future}

private[platform] final class LedgerBackedIndexService(
private[index] final class LedgerBackedIndexService(
    ledger: ReadOnlyLedger,
    participantId: Ref.ParticipantId,
    errorFactories: ErrorFactories,
@@ -0,0 +1,58 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.index

import akka.{Done, NotUsed}
import akka.stream.{KillSwitches, Materializer, RestartSettings, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import com.daml.ledger.offset.Offset
import com.daml.logging.LoggingContext
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.store.appendonlydao.LedgerReadDao
import com.daml.platform.store.interning.UpdatingStringInterningView
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

/** Periodically polls the ledger end from the [[LedgerReadDao]]
  * and updates the caches backing the Ledger API.
  */
private[index] class LedgerEndCachesUpdater(
    ledgerReadDao: LedgerReadDao,
    updatingStringInterningView: UpdatingStringInterningView,
    instrumentedSignalNewLedgerHead: InstrumentedSignalNewLedgerHead,
    contractStateEventsDispatcher: Dispatcher[(Offset, Long)],
)(implicit mat: Materializer, loggingContext: LoggingContext)
    extends AutoCloseable {

  private val (ledgerEndUpdateKillSwitch, ledgerEndUpdateDone) =
    RestartSource
      .withBackoff(
        RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2)
      )(() =>
        Source
          .tick(0.millis, 100.millis, ())
          .mapAsync(1) {
            implicit val ec: ExecutionContext = mat.executionContext
            _ =>
              for {
                ledgerEnd <- ledgerReadDao.lookupLedgerEnd()
                _ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
              } yield ledgerEnd
          }
      )
      .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
      .toMat(Sink.foreach { newLedgerHead =>
        instrumentedSignalNewLedgerHead.startTimer(newLedgerHead.lastOffset)
        contractStateEventsDispatcher.signalNewHead(
          newLedgerHead.lastOffset -> newLedgerHead.lastEventSeqId
        )
      })(Keep.both[UniqueKillSwitch, Future[Done]])
      .run()

  def close(): Unit = {
    ledgerEndUpdateKillSwitch.shutdown()
    Await.ready(ledgerEndUpdateDone, 10.seconds)
    ()
  }
}
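A rough, self-contained sketch of the polling loop above, assuming Akka Streams 2.6: fetchLedgerEnd and onNewHead are hypothetical stand-ins for LedgerReadDao.lookupLedgerEnd() plus the string-interning update, and for the timer/dispatcher signalling in the Sink, respectively.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.stream.{KillSwitches, RestartSettings}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

object LedgerEndPollingSketch extends App {
  implicit val system: ActorSystem = ActorSystem("polling-sketch")
  implicit val ec: ExecutionContext = system.dispatcher

  // Hypothetical stand-ins for the DAO lookup and the downstream signalling.
  def fetchLedgerEnd(): Future[Long] = Future.successful(System.currentTimeMillis())
  def onNewHead(head: Long): Unit = println(s"new ledger head: $head")

  // Poll every 100ms; if a lookup fails, RestartSource recreates the inner
  // source with exponential backoff instead of failing the whole stream.
  val (killSwitch, done) =
    RestartSource
      .withBackoff(
        RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2)
      )(() => Source.tick(0.millis, 100.millis, ()).mapAsync(1)(_ => fetchLedgerEnd()))
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(onNewHead))(Keep.both)
      .run()

  // Mirrors LedgerEndCachesUpdater.close(): stop the ticks, then shut down.
  Thread.sleep(1000)
  killSwitch.shutdown()
  done.onComplete(_ => system.terminate())
}

The kill switch and the materialized completion future give the same two handles the updater's close() relies on: one to stop producing ticks and one to await stream shutdown.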
@@ -26,12 +26,11 @@ import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.ReadOnlyLedger
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}

import scala.concurrent.Future

private[platform] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: Metrics)
private[index] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: Metrics)
    extends ReadOnlyLedger {

  override def ledgerId: LedgerId = ledger.ledgerId
@@ -143,10 +142,6 @@ private[platform] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: M
  )(implicit loggingContext: LoggingContext): Source[(Offset, PackageLedgerEntry), NotUsed] =
    ledger.packageEntries(startExclusive)

  override def close(): Unit = {
    ledger.close()
  }

  override def lookupLedgerConfiguration()(implicit
      loggingContext: LoggingContext
  ): Future[Option[(Offset, Configuration)]] =
@@ -1,7 +1,7 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store
package com.daml.platform.index

import akka.NotUsed
import akka.stream.scaladsl.Source
@@ -30,7 +30,7 @@ import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry,
import scala.concurrent.Future

/** Defines all the functionalities a Ledger needs to provide */
private[platform] trait ReadOnlyLedger extends ReportsHealth with AutoCloseable {
private[index] trait ReadOnlyLedger extends ReportsHealth {

  def ledgerId: LedgerId
