Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command dedup: add columns to completions in append-only schemas [KVL-1057] #10652

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8dfedb4
Augment completion.proto with deduplication-related info
fabiotudone-da Aug 19, 2021
9768373
Merge branch 'main' into fabiotudone-da/command-dedup/ledger-api-comp…
fabiotudone-da Aug 23, 2021
e43f2fc
Explicitly specify fields not yet filled in when building Completion
fabiotudone-da Aug 23, 2021
d566b97
Time-based deduplication periods are measured in record time of compl…
fabiotudone-da Aug 23, 2021
89336ab
Add deduplication_offset as a deduplication_period option
fabiotudone-da Aug 23, 2021
eb14dd9
Don't skip proto field numbers
fabiotudone-da Aug 23, 2021
1ba2045
CompletionFromTransaction: use default Completion constructor
fabiotudone-da Aug 23, 2021
efa01ff
submission_rank: reserve proto field for future use
fabiotudone-da Aug 23, 2021
73fd7a2
Add comment about reserved proto field
fabiotudone-da Aug 23, 2021
8e54bb5
Add command deduplication columns to completions in append-only schemas
fabiotudone-da Aug 24, 2021
35c6ded
Merge branch 'main' into fabiotudone-da/command-dedup/index-db-add-in…
fabiotudone-da Aug 24, 2021
8079673
Remove duplicated `application_id` field from Postgres and H2 schemas
fabiotudone-da Aug 24, 2021
3e5fa10
Make `application_id` and `submission_id` nullable
fabiotudone-da Aug 24, 2021
f68a130
Re-generate migrations' sha256
fabiotudone-da Aug 24, 2021
abf65e3
Merge branch 'main' into fabiotudone-da/command-dedup/index-db-add-in…
fabiotudone-da Aug 24, 2021
0cb77e4
Tidy-up
fabiotudone-da Aug 25, 2021
1cab32b
Add RW logic for new command dedup columns in completions
fabiotudone-da Aug 25, 2021
156dae8
Merge branch 'main' into fabiotudone-da/command-dedup/index-db-add-in…
fabiotudone-da Aug 25, 2021
6db6a86
Declare `deduplication_start` as a nullable column
fabiotudone-da Aug 25, 2021
484db4a
Fix UpdateToDbDtoSpec
fabiotudone-da Aug 25, 2021
8dbe592
Fix merge from main
fabiotudone-da Aug 25, 2021
19856a2
Replace Either3 with an ADT in StorageBackendTestValues
fabiotudone-da Aug 25, 2021
2ba045d
Test with non-zero nanos
fabiotudone-da Aug 25, 2021
477e219
Fix UpdateToDbDtoSpec
fabiotudone-da Aug 25, 2021
1432308
Also change Schema.scala
fabiotudone-da Aug 26, 2021
0e27dbe
Simplify DbDto.CommandCompletion construction
fabiotudone-da Aug 26, 2021
b7c86d7
Merge branch 'main' into fabiotudone-da/command-dedup/index-db-add-in…
fabiotudone-da Aug 26, 2021
39ded6d
Fix merge from `main` (migration SHAs and filename)
fabiotudone-da Aug 26, 2021
15223de
Fix UpdateToDbDto: restore transactionId
fabiotudone-da Aug 30, 2021
24d1d6e
Fix completions' application_id: it must be non-nullable in all schem…
fabiotudone-da Aug 30, 2021
eb44d3c
Actually use newly read columns
fabiotudone-da Aug 30, 2021
5643b95
Add DB round-trip tests for new fields in StorageBackendTestValues
fabiotudone-da Aug 30, 2021
51ab254
Format
fabiotudone-da Aug 30, 2021
87da1dd
StorageBackendTestValues: property type someSubmissionId
fabiotudone-da Aug 30, 2021
482891e
UpdateToDbDto: remove duplicated logic
fabiotudone-da Aug 30, 2021
804c99e
Simplify UpdateToDbDto.commandCompletion
fabiotudone-da Aug 30, 2021
f32346e
Fix comment in CompletionFromTransaction.toApiDeduplicationPeriod
fabiotudone-da Aug 30, 2021
4ebd1e0
Fix CompletionFromTransaction.toApiCompletion
fabiotudone-da Aug 30, 2021
b1bc3d4
Fix deduplication_offset in Schema
fabiotudone-da Aug 30, 2021
85ed113
Make parameters explicit in CompletionFromTransaction and CompletionS…
fabiotudone-da Aug 30, 2021
8da5af0
Shorten error message in CompletionFromTransaction
fabiotudone-da Aug 30, 2021
166cd83
Fix StorageBackendTestsCompletions
fabiotudone-da Aug 30, 2021
3b7a9e5
Fix CompletionFromTransaction
fabiotudone-da Aug 30, 2021
bde3d1e
Separate command dedup-related test cases in UpdateToDbDtoSpec
fabiotudone-da Aug 31, 2021
4a94562
Simplify further UpdateToDbDto.commandCompletion
fabiotudone-da Aug 31, 2021
2749c75
Simplify CompletionFromTransaction.toApiCompletion
fabiotudone-da Aug 31, 2021
db74dd0
Format
fabiotudone-da Aug 31, 2021
9c305fc
Merge branch 'main' into fabiotudone-da/command-dedup/index-db-add-in…
fabiotudone-da Aug 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add DB round-trip tests for new fields in StorageBackendTestValues
  • Loading branch information
fabiotudone-da committed Aug 30, 2021
commit 5643b95f2c15ba95bde2a0e5c4edbe21b5fbc4aa
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ private[backend] object StorageBackendTestValues {
ParameterStorageBackend.IdentityParams(someLedgerId, someParticipantId)
val someParty: Ref.Party = Ref.Party.assertFromString("party")
val someApplicationId: Ref.ApplicationId = Ref.ApplicationId.assertFromString("application_id")
val someSubmissionId: String = "submission_id"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this a Ref.SubmissionId? All other some* values are properly typed as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


val someArchive: DamlLf.Archive = DamlLf.Archive.newBuilder
.setHash("00001")
Expand Down Expand Up @@ -205,44 +206,16 @@ private[backend] object StorageBackendTestValues {
)
}

sealed trait Deduplication extends Product with Serializable {
import Deduplication.{Offset => DedupOffset, _}

def startOffset: Option[String] = this match {
case DedupOffset(offset) => Some(offset)
case _ => None
}
def duration: Option[SecondsAndNanos] = this match {
case Span(span) => Some(span)
case _ => None
}
def startInstant: Option[Instant] = this match {
case Start(instant) => Some(instant)
case _ => None
}
}
object Deduplication {
final case class SecondsAndNanos(seconds: Long, nanos: Int) {
def allComponentsAreNonNegative: Boolean = seconds >= 0 && nanos >= 0
}

final case class Offset(offset: String) extends Deduplication
final case class Span(span: SecondsAndNanos) extends Deduplication {
require(
span.allComponentsAreNonNegative,
s"All the deduplication window components must not be negative: $span",
)
}
final case class Start(instant: Instant) extends Deduplication
}

def dtoCompletion(
fabiotudone-da marked this conversation as resolved.
Show resolved Hide resolved
offset: Offset,
submitter: String = "signatory",
commandId: String = UUID.randomUUID().toString,
applicationId: String = someApplicationId,
submissionId: Option[String] = Some(UUID.randomUUID().toString),
deduplication: Option[Deduplication] = None,
deduplicationOffset: Option[String] = None,
deduplicationTimeSeconds: Option[Long] = None,
deduplicationTimeNanos: Option[Int] = None,
deduplicationStart: Option[Instant] = None,
): DbDto.CommandCompletion =
DbDto.CommandCompletion(
completion_offset = offset.toHexString,
Expand All @@ -255,10 +228,10 @@ private[backend] object StorageBackendTestValues {
rejection_status_message = None,
rejection_status_details = None,
submission_id = submissionId,
deduplication_offset = deduplication.flatMap(_.startOffset),
deduplication_time_seconds = deduplication.flatMap(_.duration.map(_.seconds)),
deduplication_time_nanos = deduplication.flatMap(_.duration.map(_.nanos)),
deduplication_start = deduplication.flatMap(_.startInstant),
deduplication_offset = deduplicationOffset,
deduplication_time_seconds = deduplicationTimeSeconds,
deduplication_time_nanos = deduplicationTimeNanos,
deduplication_start = deduplicationStart,
)

def dtoTransactionId(dto: DbDto): Ref.TransactionId = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.daml.platform.store.backend

import java.time.Duration

import com.daml.ledger.offset.Offset
import org.scalatest.Inside
import org.scalatest.flatspec.AsyncFlatSpec
Expand Down Expand Up @@ -53,4 +55,183 @@ private[backend] trait StorageBackendTestsCompletions
}
}

it should "correctly persist and retrieve application IDs" in {
val party = someParty
val applicationId = someApplicationId

val dtos = Vector(
dtoConfiguration(offset(1)),
dtoCompletion(offset(2), submitter = party),
)

for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
_ <- executeSql(ingest(dtos, _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)))
completions <- executeSql(
backend.commandCompletions(offset(1), offset(2), applicationId, Set(party))
)
} yield {
completions should have length 1
completions.head.completions should have length 1
completions.head.completions.head.applicationId should be(applicationId)
}
}

it should "correctly persist and retrieve submission IDs" in {
val party = someParty
val submissionId = Some(someSubmissionId)

val dtos = Vector(
dtoConfiguration(offset(1)),
dtoCompletion(offset(2), submitter = party, submissionId = submissionId),
dtoCompletion(offset(3), submitter = party, submissionId = None),
)

for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
_ <- executeSql(ingest(dtos, _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)))
completions <- executeSql(
backend.commandCompletions(offset(1), offset(3), someApplicationId, Set(party))
)
} yield {
completions should have length 2
val List(completionWithSubmissionId, completionWithoutSubmissionId) = completions
completionWithSubmissionId.completions should have length 1
completionWithSubmissionId.completions.head.submissionId should be(submissionId)
completionWithoutSubmissionId.completions should have length 1
completionWithoutSubmissionId.completions.head.submissionId should be("")
}
}

it should "correctly persist and retrieve command deduplication offsets" in {
val party = someParty
val anOffset = "someOffset"

val dtos = Vector(
dtoConfiguration(offset(1)),
dtoCompletion(
offset(2),
submitter = party,
deduplicationOffset = Some(anOffset),
),
dtoCompletion(offset(3), submitter = party, deduplicationOffset = None),
)

for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
_ <- executeSql(ingest(dtos, _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)))
completions <- executeSql(
backend.commandCompletions(offset(1), offset(3), someApplicationId, Set(party))
)
} yield {
completions should have length 2
val List(completionWithDeduplicationOffset, completionWithoutDeduplicationOffset) =
completions
completionWithDeduplicationOffset.completions should have length 1
completionWithDeduplicationOffset.completions.head.deduplicationPeriod.deduplicationOffset should be(
Some(anOffset)
)
completionWithoutDeduplicationOffset.completions should have length 1
completionWithoutDeduplicationOffset.completions.head.deduplicationPeriod.deduplicationOffset should not be defined
}
}

it should "correctly persist and retrieve command deduplication times" in {
val party = someParty
val seconds = 100L
val nanos = 10
val expectedDuration = Duration.ofSeconds(seconds).plusNanos(nanos.toLong)

val dtos = Vector(
dtoConfiguration(offset(1)),
dtoCompletion(
offset(2),
submitter = party,
deduplicationTimeSeconds = Some(seconds),
deduplicationTimeNanos = Some(nanos),
),
dtoCompletion(
offset(3),
submitter = party,
deduplicationTimeSeconds = None,
deduplicationTimeNanos = None,
),
)

for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
_ <- executeSql(ingest(dtos, _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)))
completions <- executeSql(
backend.commandCompletions(offset(1), offset(3), someApplicationId, Set(party))
)
} yield {
completions should have length 2
val List(completionWithDeduplicationOffset, completionWithoutDeduplicationOffset) =
completions
completionWithDeduplicationOffset.completions should have length 1
completionWithDeduplicationOffset.completions.head.deduplicationPeriod.deduplicationOffset should be(
expectedDuration
)
completionWithoutDeduplicationOffset.completions should have length 1
completionWithoutDeduplicationOffset.completions.head.deduplicationPeriod.deduplicationTime should not be defined
}
}

it should "fail on broken command deduplication times in DB" in {
val party = someParty
val seconds = 100L
val nanos = 10

val dtos1 = Vector(
dtoConfiguration(offset(1)),
dtoCompletion(
offset(2),
submitter = party,
deduplicationTimeSeconds = Some(seconds),
deduplicationTimeNanos = None,
),
)

for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
_ <- executeSql(ingest(dtos1, _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)))
result <- executeSql(
backend.commandCompletions(offset(1), offset(2), someApplicationId, Set(party))
).failed
} yield {
result.getCause shouldBe an[IllegalArgumentException]
result.getCause.getMessage should be(
"One of deduplication time seconds and nanos has been provided " +
"but they must be either both provided or both absent"
)
}

val dtos2 = Vector(
dtoCompletion(
offset(3),
submitter = party,
deduplicationTimeSeconds = None,
deduplicationTimeNanos = Some(nanos),
)
)

for {
_ <- executeSql(ingest(dtos2, _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)))
result <- executeSql(
backend.commandCompletions(offset(2), offset(3), someApplicationId, Set(party))
).failed
} yield {
result.getCause shouldBe an[IllegalArgumentException]
result.getCause.getMessage should be(
"One of deduplication time seconds and nanos has been provided " +
"but they must be either both provided or both absent"
)
}
}
}