Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire LedgerEndCache to relevant StorageBackend factories [DPP-704] #11549

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.daml.platform.store.appendonlydao.{
LedgerDaoTransactionsReader,
LedgerReadDao,
}
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.{BaseLedger, LfValueTranslationCache}
import com.daml.resources.ProgramResource.StartupException
import com.daml.timer.RetryStrategy
Expand Down Expand Up @@ -66,7 +66,8 @@ private[platform] object ReadOnlySqlLedger {
override def acquire()(implicit context: ResourceContext): Resource[ReadOnlySqlLedger] = {
val ledgerEndCache = MutableLedgerEndCache()
for {
ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories).acquire()
ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories, ledgerEndCache)
.acquire()
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire()
} yield ledger
Expand Down Expand Up @@ -142,6 +143,7 @@ private[platform] object ReadOnlySqlLedger {
private def ledgerDaoOwner(
servicesExecutionContext: ExecutionContext,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
): ResourceOwner[LedgerReadDao] =
JdbcLedgerDao.readOwner(
serverRole,
Expand All @@ -156,6 +158,7 @@ private[platform] object ReadOnlySqlLedger {
Some(enricher),
participantId,
errorFactories,
ledgerEndCache,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.cache.MutableLedgerEndCache
import scalaz.Tag

import scala.concurrent.duration._
Expand Down Expand Up @@ -59,6 +60,7 @@ object IndexMetadata {
// and this property is not needed for the used ReadDao.
participantId = Ref.ParticipantId.assertFromString("1"),
errorFactories = errorFactories,
ledgerEndCache = MutableLedgerEndCache(), // not used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit-picking: use a prefix suc has TODO Efficient ACS: not used so that it's easy to find comments like this one

Copy link
Contributor Author

@nmarton-da nmarton-da Nov 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment meant to stay

)

private val Empty = "<empty>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import com.daml.platform.store.backend.{
StorageBackendFactory,
UpdateToDbDto,
}
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
Expand Down Expand Up @@ -772,6 +772,7 @@ private[platform] object JdbcLedgerDao {
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerReadDao] =
owner(
serverRole,
Expand All @@ -788,6 +789,7 @@ private[platform] object JdbcLedgerDao {
participantId = participantId,
sequentialWriteDao = NoopSequentialWriteDao,
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerReadDao(_, metrics))

def writeOwner(
Expand Down Expand Up @@ -828,6 +830,7 @@ private[platform] object JdbcLedgerDao {
ledgerEndCache,
),
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerDao(_, metrics))
}

Expand Down Expand Up @@ -872,6 +875,7 @@ private[platform] object JdbcLedgerDao {
ledgerEndCache,
),
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerDao(_, metrics))
}

Expand Down Expand Up @@ -917,6 +921,7 @@ private[platform] object JdbcLedgerDao {
participantId: Ref.ParticipantId,
sequentialWriteDao: SequentialWriteDao,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
val factory = StorageBackendFactory.of(dbType)
Expand All @@ -940,7 +945,7 @@ private[platform] object JdbcLedgerDao {
enricher,
sequentialWriteDao,
participantId,
StorageBackendFactory.readStorageBackendFor(dbType),
StorageBackendFactory.readStorageBackendFor(dbType, ledgerEndCache),
factory.createParameterStorageBackend,
factory.createDeduplicationStorageBackend,
factory.createResetStorageBackend,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.io.ByteArrayInputStream
import com.daml.platform.store.appendonlydao.events
import com.daml.platform.store.serialization.{Compression, ValueSerializer}
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.backend.StorageBackend.RawContractStateEvent
import com.daml.platform.store.backend.ContractStorageBackend.RawContractStateEvent

import scala.util.control.NoStackTrace

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.daml.platform.store.appendonlydao.events
import java.io.ByteArrayInputStream

import com.daml.lf.data.Ref
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.EventStorageBackend.RawTransactionEvent
import com.daml.platform.store.interfaces.TransactionLogUpdate
import com.daml.platform.store.serialization.{Compression, ValueSerializer}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import com.daml.platform
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw}
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams}
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState
Expand All @@ -31,24 +30,7 @@ import scala.util.Try
* Naming convention for the interface methods, which requiring Connection:
* - read operations are represented as nouns (plural, singular form indicates cardinality)
* - write operations are represented as verbs
*
* @tparam DB_BATCH Since parallel ingestion comes also with batching, this implementation specific type allows separation of the CPU intensive batching operation from the pure IO intensive insertBatch operation.
*/
trait StorageBackend[DB_BATCH]
extends IngestionStorageBackend[DB_BATCH]
with ParameterStorageBackend
with ConfigurationStorageBackend
with PartyStorageBackend
with PackageStorageBackend
with DeduplicationStorageBackend
with CompletionStorageBackend
with ContractStorageBackend
with EventStorageBackend
with DataSourceStorageBackend
with DBLockStorageBackend
with IntegrityStorageBackend
with ResetStorageBackend
with StringInterningStorageBackend

trait ResetStorageBackend {

Expand Down Expand Up @@ -224,10 +206,10 @@ trait ContractStorageBackend {
def keyState(key: Key, validAt: Long)(connection: Connection): KeyState
def contractState(contractId: ContractId, before: Long)(
connection: Connection
): Option[StorageBackend.RawContractState]
): Option[ContractStorageBackend.RawContractState]
def activeContractWithArgument(readers: Set[Ref.Party], contractId: ContractId)(
connection: Connection
): Option[StorageBackend.RawContract]
): Option[ContractStorageBackend.RawContract]
def activeContractWithoutArgument(readers: Set[Ref.Party], contractId: ContractId)(
connection: Connection
): Option[String]
Expand All @@ -236,7 +218,38 @@ trait ContractStorageBackend {
): Option[ContractId]
def contractStateEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[StorageBackend.RawContractStateEvent]
): Vector[ContractStorageBackend.RawContractStateEvent]
}

object ContractStorageBackend {
case class RawContractState(
templateId: Option[String],
flatEventWitnesses: Set[Ref.Party],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
eventKind: Int,
ledgerEffectiveTime: Option[Timestamp],
)

class RawContract(
val templateId: String,
val createArgument: Array[Byte],
val createArgumentCompression: Option[Int],
)

case class RawContractStateEvent(
eventKind: Int,
contractId: ContractId,
templateId: Option[Ref.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
flatEventWitnesses: Set[Ref.Party],
eventSequentialId: Long,
offset: Offset,
)
}

trait EventStorageBackend {
Expand Down Expand Up @@ -278,7 +291,7 @@ trait EventStorageBackend {
def maxEventSequentialIdOfAnObservableEvent(offset: Offset)(connection: Connection): Option[Long]
def rawEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[RawTransactionEvent]
): Vector[EventStorageBackend.RawTransactionEvent]
}

object EventStorageBackend {
Expand All @@ -293,6 +306,37 @@ object EventStorageBackend {
wildCardParties: Set[Ref.Party],
partiesAndTemplates: Set[(Set[Ref.Party], Set[Ref.Identifier])],
)

case class RawTransactionEvent(
eventKind: Int,
transactionId: String,
nodeIndex: Int,
commandId: Option[String],
workflowId: Option[String],
eventId: EventId,
contractId: platform.store.appendonlydao.events.ContractId,
templateId: Option[platform.store.appendonlydao.events.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createSignatories: Option[Array[String]],
createObservers: Option[Array[String]],
createAgreementText: Option[String],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
treeEventWitnesses: Set[String],
flatEventWitnesses: Set[String],
submitters: Set[String],
exerciseChoice: Option[String],
exerciseArgument: Option[Array[Byte]],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: Option[Array[String]],
exerciseChildEventIds: Option[Array[String]],
eventSequentialId: Long,
offset: Offset,
) extends NeverEqualsOverride
}

trait DataSourceStorageBackend {
Expand Down Expand Up @@ -358,65 +402,3 @@ trait StringInterningStorageBackend {
connection: Connection
): Iterable[(Int, String)]
}

object StorageBackend {
case class RawContractState(
templateId: Option[String],
flatEventWitnesses: Set[Ref.Party],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
eventKind: Int,
ledgerEffectiveTime: Option[Timestamp],
)

class RawContract(
val templateId: String,
val createArgument: Array[Byte],
val createArgumentCompression: Option[Int],
)

case class RawContractStateEvent(
eventKind: Int,
contractId: ContractId,
templateId: Option[Ref.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
flatEventWitnesses: Set[Ref.Party],
eventSequentialId: Long,
offset: Offset,
)

case class RawTransactionEvent(
eventKind: Int,
transactionId: String,
nodeIndex: Int,
commandId: Option[String],
workflowId: Option[String],
eventId: EventId,
contractId: platform.store.appendonlydao.events.ContractId,
templateId: Option[platform.store.appendonlydao.events.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createSignatories: Option[Array[String]],
createObservers: Option[Array[String]],
createAgreementText: Option[String],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
treeEventWitnesses: Set[String],
flatEventWitnesses: Set[String],
submitters: Set[String],
exerciseChoice: Option[String],
exerciseArgument: Option[Array[Byte]],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: Option[Array[String]],
exerciseChildEventIds: Option[Array[String]],
eventSequentialId: Long,
offset: Offset,
) extends NeverEqualsOverride
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import com.daml.platform.store.DbType
import com.daml.platform.store.backend.h2.H2StorageBackendFactory
import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory
import com.daml.platform.store.backend.postgresql.PostgresStorageBackendFactory
import com.daml.platform.store.cache.LedgerEndCache

trait StorageBackendFactory {
def createIngestionStorageBackend: IngestionStorageBackend[_]
def createParameterStorageBackend: ParameterStorageBackend
def createConfigurationStorageBackend: ConfigurationStorageBackend
def createPartyStorageBackend: PartyStorageBackend
def createPackageStorageBackend: PackageStorageBackend
def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): ConfigurationStorageBackend
def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend
def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend
def createDeduplicationStorageBackend: DeduplicationStorageBackend
def createCompletionStorageBackend: CompletionStorageBackend
def createContractStorageBackend: ContractStorageBackend
def createEventStorageBackend: EventStorageBackend
def createContractStorageBackend(ledgerEndCache: LedgerEndCache): ContractStorageBackend
def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend
def createDataSourceStorageBackend: DataSourceStorageBackend
def createDBLockStorageBackend: DBLockStorageBackend
def createIntegrityStorageBackend: IntegrityStorageBackend
Expand All @@ -33,15 +34,18 @@ object StorageBackendFactory {
case DbType.Oracle => OracleStorageBackendFactory
}

def readStorageBackendFor(dbType: DbType): ReadStorageBackend = {
def readStorageBackendFor(
dbType: DbType,
ledgerEndCache: LedgerEndCache,
): ReadStorageBackend = {
val factory = of(dbType)
ReadStorageBackend(
configurationStorageBackend = factory.createConfigurationStorageBackend,
partyStorageBackend = factory.createPartyStorageBackend,
packageStorageBackend = factory.createPackageStorageBackend,
configurationStorageBackend = factory.createConfigurationStorageBackend(ledgerEndCache),
partyStorageBackend = factory.createPartyStorageBackend(ledgerEndCache),
packageStorageBackend = factory.createPackageStorageBackend(ledgerEndCache),
completionStorageBackend = factory.createCompletionStorageBackend,
contractStorageBackend = factory.createContractStorageBackend,
eventStorageBackend = factory.createEventStorageBackend,
contractStorageBackend = factory.createContractStorageBackend(ledgerEndCache),
eventStorageBackend = factory.createEventStorageBackend(ledgerEndCache),
)
}
}
Expand Down
Loading