Skip to content

Commit

Permalink
TODO Cleanup: store.backend package (#9693)
Browse files Browse the repository at this point in the history
* Adds NeverEqualsOverride, makes DTOs case classes once again
* Remove unit-test (comes later), function separation (authors opinion: current level of abstraction is good enough) TODO
* Remove TODO regarding immutable deduplications: Not worth to invest more time and effort into deduplications since upcoming porject is targeting the same
* Keep TODO related to TransactionAccepted case: to guide later work with unit-testing
* Make DBDTOV1 sealed
* Add scaladoc to StorageBackend interface

Co-authored-by: Stephen Compall <stephen.compall@daml.com>
Co-authored-by: mziolekda <marcin.ziolek@digitalasset.com>

changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored May 18, 2021
1 parent 679ce3b commit ac9b8ad
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ package com.daml.platform.store.backend

import java.time.Instant

trait DBDTOV1
import com.daml.scalautil.NeverEqualsOverride

sealed trait DBDTOV1
extends NeverEqualsOverride
with Product
with Serializable // to aid type inference for case class implementors

object DBDTOV1 {

case class EventDivulgence(
final case class EventDivulgence(
event_offset: Option[String],
command_id: Option[String],
workflow_id: Option[String],
Expand All @@ -23,7 +28,7 @@ object DBDTOV1 {
event_sequential_id: Long,
) extends DBDTOV1

case class EventCreate(
final case class EventCreate(
event_offset: Option[String],
transaction_id: Option[String],
ledger_effective_time: Option[Instant],
Expand All @@ -48,7 +53,7 @@ object DBDTOV1 {
event_sequential_id: Long,
) extends DBDTOV1

case class EventExercise(
final case class EventExercise(
consuming: Boolean,
event_offset: Option[String],
transaction_id: Option[String],
Expand All @@ -75,67 +80,63 @@ object DBDTOV1 {
event_sequential_id: Long,
) extends DBDTOV1

// TODO append-only: wartremover complained about having Array-s in case classes. I would prefer case classes. can we work that somehow around? Similarly in other DTO cases...
// TODO append-only: there are some options:
// - mixing in SomeArrayEquals if we need array equality for some reason: would be proper if we move SomeArrayEquals out from speedy codebase to scalalib first.
// - spawning somewhere something like trait NeverEqualsOverride { override equals(o: Object): Boolean = false }, and mixing in these classes
class ConfigurationEntry(
val ledger_offset: String,
val recorded_at: Instant,
val submission_id: String,
val typ: String,
val configuration: Array[Byte],
val rejection_reason: Option[String],
final case class ConfigurationEntry(
ledger_offset: String,
recorded_at: Instant,
submission_id: String,
typ: String,
configuration: Array[Byte],
rejection_reason: Option[String],
) extends DBDTOV1

class PackageEntry(
val ledger_offset: String,
val recorded_at: Instant,
val submission_id: Option[String],
val typ: String,
val rejection_reason: Option[String],
final case class PackageEntry(
ledger_offset: String,
recorded_at: Instant,
submission_id: Option[String],
typ: String,
rejection_reason: Option[String],
) extends DBDTOV1

class Package(
val package_id: String,
val upload_id: String,
val source_description: Option[String],
val size: Long,
val known_since: Instant,
val ledger_offset: String,
val _package: Array[Byte],
final case class Package(
package_id: String,
upload_id: String,
source_description: Option[String],
size: Long,
known_since: Instant,
ledger_offset: String,
_package: Array[Byte],
) extends DBDTOV1

class PartyEntry(
val ledger_offset: String,
val recorded_at: Instant,
val submission_id: Option[String],
val party: Option[String],
val display_name: Option[String],
val typ: String,
val rejection_reason: Option[String],
val is_local: Option[Boolean],
final case class PartyEntry(
ledger_offset: String,
recorded_at: Instant,
submission_id: Option[String],
party: Option[String],
display_name: Option[String],
typ: String,
rejection_reason: Option[String],
is_local: Option[Boolean],
) extends DBDTOV1

class Party(
val party: String,
val display_name: Option[String],
val explicit: Boolean,
val ledger_offset: Option[String],
val is_local: Boolean,
final case class Party(
party: String,
display_name: Option[String],
explicit: Boolean,
ledger_offset: Option[String],
is_local: Boolean,
) extends DBDTOV1

class CommandCompletion(
val completion_offset: String,
val record_time: Instant,
val application_id: String,
val submitters: Set[String],
val command_id: String,
val transaction_id: Option[String],
val status_code: Option[Int],
val status_message: Option[String],
final case class CommandCompletion(
completion_offset: String,
record_time: Instant,
application_id: String,
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
) extends DBDTOV1

class CommandDeduplication(val deduplication_key: String) extends DBDTOV1
final case class CommandDeduplication(deduplication_key: String) extends DBDTOV1

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,51 @@ import com.daml.ledger.participant.state.v1.Offset
import com.daml.platform.store.DbType
import com.daml.platform.store.backend.postgresql.PostgresStorageBackend

// TODO append-only: add detailed scaladoc
/** Encapsulates the interface which hides database technology specific implementations for parallel ingestion.
*
* @tparam DB_BATCH Since parallel ingestion comes also with batching, this implementation specific type allows separation of the CPU intensive batching operation from the pure IO intensive insertBatch operation.
*/
trait StorageBackend[DB_BATCH] {

/** The CPU intensive batching operation hides the batching logic, and the mapping to the database specific representation of the inserted data.
* This should be pure CPU logic without IO.
*
* @param dbDtos is a collection of DBDTOV1 from which the batch is formed
* @return the database-specific batch DTO, which can be inserted via insertBatch
*/
def batch(dbDtos: Vector[DBDTOV1]): DB_BATCH

/** Using a JDBC connection, a batch will be inserted into the database.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* @param connection to be used when inserting the batch
* @param batch to be inserted
*/
def insertBatch(connection: Connection, batch: DB_BATCH): Unit

/** This method is used to update the parameters table: setting the new observable ledger-end, and other parameters.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* @param connection to be used when updating the parameters table
* @param params the parameters
*/
def updateParams(connection: Connection, params: StorageBackend.Params): Unit

/** Custom initialization code before the start of an ingestion.
* This method is responsible for the recovery after a possibly non-graceful stop of previous indexing.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* @param connection to be used when initializing
* @return the LedgerEnd, which should be the basis for further indexing.
*/
def initialize(connection: Connection): StorageBackend.LedgerEnd

/** Query the ledgerEnd, read from the parameters table.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* @param connection to be used to get the LedgerEnd
* @return the LedgerEnd, which should be the basis for further indexing
*/
def ledgerEnd(connection: Connection): StorageBackend.LedgerEnd
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import com.daml.platform.store.appendonlydao.JdbcLedgerDao
import com.daml.platform.store.appendonlydao.events._
import com.daml.platform.store.dao.DeduplicationKeyMaker

// TODO append-only: target to separation per update-type to it's own function + unit tests
object UpdateToDBDTOV1 {

def apply(
Expand All @@ -24,9 +23,8 @@ object UpdateToDBDTOV1 {
): Offset => Update => Iterator[DBDTOV1] = { offset =>
{
case u: Update.CommandRejected =>
// TODO append-only: we might want to tune up deduplications so it is also a temporal query
Iterator(
new DBDTOV1.CommandCompletion(
DBDTOV1.CommandCompletion(
completion_offset = offset.toHexString,
record_time = u.recordTime.toInstant,
application_id = u.submitterInfo.applicationId,
Expand All @@ -36,7 +34,7 @@ object UpdateToDBDTOV1 {
status_code = Some(Conversions.participantRejectionReasonToErrorCode(u.reason).value()),
status_message = Some(u.reason.description),
),
new DBDTOV1.CommandDeduplication(
DBDTOV1.CommandDeduplication(
DeduplicationKeyMaker.make(
domain.CommandId(u.submitterInfo.commandId),
u.submitterInfo.actAs,
Expand All @@ -46,7 +44,7 @@ object UpdateToDBDTOV1 {

case u: Update.ConfigurationChanged =>
Iterator(
new DBDTOV1.ConfigurationEntry(
DBDTOV1.ConfigurationEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
submission_id = u.submissionId,
Expand All @@ -58,7 +56,7 @@ object UpdateToDBDTOV1 {

case u: Update.ConfigurationChangeRejected =>
Iterator(
new DBDTOV1.ConfigurationEntry(
DBDTOV1.ConfigurationEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
submission_id = u.submissionId,
Expand All @@ -70,7 +68,7 @@ object UpdateToDBDTOV1 {

case u: Update.PartyAddedToParticipant =>
Iterator(
new DBDTOV1.PartyEntry(
DBDTOV1.PartyEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
submission_id = u.submissionId,
Expand All @@ -80,7 +78,7 @@ object UpdateToDBDTOV1 {
rejection_reason = None,
is_local = Some(u.participantId == participantId),
),
new DBDTOV1.Party(
DBDTOV1.Party(
party = u.party,
display_name = Some(u.displayName),
explicit = true,
Expand All @@ -91,7 +89,7 @@ object UpdateToDBDTOV1 {

case u: Update.PartyAllocationRejected =>
Iterator(
new DBDTOV1.PartyEntry(
DBDTOV1.PartyEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
submission_id = Some(u.submissionId),
Expand All @@ -106,7 +104,7 @@ object UpdateToDBDTOV1 {
case u: Update.PublicPackageUpload =>
val uploadId = u.submissionId.getOrElse(UUID.randomUUID().toString)
val packages = u.archives.iterator.map { archive =>
new DBDTOV1.Package(
DBDTOV1.Package(
package_id = archive.getHash,
upload_id = uploadId,
source_description = u.sourceDescription,
Expand All @@ -117,7 +115,7 @@ object UpdateToDBDTOV1 {
)
}
val packageEntries = u.submissionId.iterator.map(submissionId =>
new DBDTOV1.PackageEntry(
DBDTOV1.PackageEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
submission_id = Some(submissionId),
Expand All @@ -129,7 +127,7 @@ object UpdateToDBDTOV1 {

case u: Update.PublicPackageUploadRejected =>
Iterator(
new DBDTOV1.PackageEntry(
DBDTOV1.PackageEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
submission_id = Some(u.submissionId),
Expand Down Expand Up @@ -261,7 +259,7 @@ object UpdateToDBDTOV1 {
}

val completions = u.optSubmitterInfo.iterator.map { submitterInfo =>
new DBDTOV1.CommandCompletion(
DBDTOV1.CommandCompletion(
completion_offset = offset.toHexString,
record_time = u.recordTime.toInstant,
application_id = submitterInfo.applicationId,
Expand Down
Loading

0 comments on commit ac9b8ad

Please sign in to comment.