Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

participant-integration-api: Store a status gRPC protobuf. [KVL-1005] #10600

Merged
merged 8 commits into from
Aug 18, 2021
Prev Previous commit
Next Next commit
participant-integration-api: Store a status gRPC protobuf.
Instead of storing the status code and message, we store a serialized
`google.rpc.Status` protocol buffers message. This allows us to pass
through any additional information reported by the driver `ReadService`.

The migration is only done for the append-only database, and preserves
old data in the existing columns. New data will only be written to the
new column.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar committed Aug 17, 2021
commit 1fd2a372b03de8e03f286e795d335a93014e2dbc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823
e64e1bc672d79af26e86ca8e495a31c6ef026ae6c6df86b3101d9d942954e745
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,16 @@ CREATE TABLE participant_command_completions (
application_id VARCHAR NOT NULL,
submitters ARRAY NOT NULL,
command_id VARCHAR NOT NULL,
-- The transaction ID is `NULL` for rejected transactions.
transaction_id VARCHAR,
status_code INTEGER,
status_message VARCHAR
-- The rejection status is `NULL` if the completion is for an accepted transaction.
This conversation was marked as resolved.
Show resolved Hide resolved
-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
This conversation was marked as resolved.
Show resolved Hide resolved
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
This conversation was marked as resolved.
Show resolved Hide resolved
-- driver). The `rejection_status_code` and `rejection_status_message` columns will always be
-- `NULL` in an H2-backed index, but we keep them for parity with old data in other databases.
fabiotudone-da marked this conversation as resolved.
Show resolved Hide resolved
rejection_status_code INTEGER,
rejection_status_message VARCHAR,
rejection_status BLOB
This conversation was marked as resolved.
Show resolved Hide resolved
);

CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080
525584affccdd5c5a6b20eac660b068ba1db392c1da7a6c95c3d3a70e46913d3
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id);

CREATE TABLE participant_command_completions
(
completion_offset VARCHAR2(4000) not null,
record_time TIMESTAMP not null,
completion_offset VARCHAR2(4000) NOT NULL,
record_time TIMESTAMP NOT NULL,

application_id NVARCHAR2(1000) not null,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) not null,
application_id NVARCHAR2(1000) NOT NULL,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) NOT NULL,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message CLOB -- null for successful command and checkpoints
transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
This conversation was marked as resolved.
Show resolved Hide resolved
rejection_status_code INTEGER, -- null for successful command and checkpoints
rejection_status_message CLOB, -- null for successful command and checkpoints
rejection_status BLOB -- null for successful command and checkpoints
This conversation was marked as resolved.
Show resolved Hide resolved
This conversation was marked as resolved.
Show resolved Hide resolved
);

CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
454de41175e6f9d5b4a7b125ef06a57115342384b39898f85208cce572a1d4d4
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`.

-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
-- driver). We do not migrate the `rejection_status_code` and `rejection_status_message` columns,
-- and so they may contain old data, which means we need to keep them.
This conversation was marked as resolved.
Show resolved Hide resolved

ALTER TABLE participant_command_completions
RENAME COLUMN status_code TO rejection_status_code;
ALTER TABLE participant_command_completions
RENAME COLUMN status_message TO rejection_status_message;
ALTER TABLE participant_command_completions
ADD COLUMN rejection_status BYTEA;
nicu-da marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ object DbDto {
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
rejection_status: Option[Array[Byte]],
) extends DbDto

final case class CommandDeduplication(deduplication_key: String) extends DbDto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ object UpdateToDbDto {
submitters = u.completionInfo.actAs.toSet,
command_id = u.completionInfo.commandId,
transaction_id = None,
status_code = Some(u.reasonTemplate.code),
status_message = Some(u.reasonTemplate.message),
rejection_status = Some(u.reasonTemplate.status.toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -269,8 +268,7 @@ object UpdateToDbDto {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(u.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,46 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, int, str}
import anorm.{RowParser, ~}
import anorm.SqlParser.{int, str}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.protobuf.CodedInputStream
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

def queryStrategy: QueryStrategy

private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] =
private val sharedColumns: RowParser[Offset ~ Instant ~ String] =
offset("completion_offset") ~ instant("record_time") ~ str("command_id")

private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ str("transaction_id") map {
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
private val rejectedCommandParser: RowParser[CompletionStreamResponse] = {
val parserWithCodeAndMessage = sharedColumns ~
int("rejection_status_code") ~ str("rejection_status_message") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusCode ~ rejectionStatusMessage =>
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we have tests somewhere to test this part?
normally it would not be tested because on the HEAD we would always populate the rejection_status

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rautenrieth-da did you maybe encountered a test suit which would test this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are very bad at testing migrations. I have no idea how we would go about testing this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compatibility package tests migrations in a way. It does:

  1. install an old SDK version
  2. push some data into a sandbox ledger, using the ledger API
  3. upgrade to the next SDK version (up to and including the head version)
  4. use the ledger API to check that the ledger content still looks the same
  5. go to step 2

The application that uploads and checks data is a custom client application written in Haskell. There are checks for comparing the ACS and contract visibility due to divulgence. It could probably be extended to compare completions as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but do we also test rejections this way? Never mind me, it will probably work...although we would not necessarily would have here dead code if we follow @miklos-da useful suggestion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we test rejections, but we could extend the compatibility tests to cover them.

val parserWithProtobufStatus = sharedColumns ~
binaryStream("rejection_status") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusStream =>
val status = StatusProto.parseFrom(CodedInputStream.newInstance(rejectionStatusStream))
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
parserWithCodeAndMessage | parserWithProtobufStatus
}

private val completionParser: RowParser[CompletionStreamResponse] =
acceptedCommandParser | rejectedCommandParser
Expand All @@ -55,8 +65,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
record_time,
command_id,
transaction_id,
status_code,
status_message
rejection_status_code,
rejection_status_message,
rejection_status
FROM
participant_command_completions
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ private[backend] object AppendOnlySchema {
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"status_code" -> fieldStrategy.intOptional(_.status_code),
"status_message" -> fieldStrategy.stringOptional(_.status_message),
"rejection_status" -> fieldStrategy.byteaOptional(_.rejection_status),
)

val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.daml.platform.store.appendonlydao.events.{
}
import com.daml.platform.store.dao.DeduplicationKeyMaker
import com.google.protobuf.ByteString
import com.google.rpc.status.{Status => RpcStatus}
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
import org.scalactic.TripleEquals._
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -258,12 +258,11 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optDeduplicationPeriod = None,
submissionId = someSubmissionId,
)
val status = StatusProto.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
val update = state.Update.CommandRejected(
someRecordTime,
completionInfo,
new state.Update.CommandRejected.FinalReason(
RpcStatus.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
),
new state.Update.CommandRejected.FinalReason(status),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
Expand All @@ -277,8 +276,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = Set(someParty),
command_id = someCommandId,
transaction_id = None,
status_code = Some(Status.Code.ABORTED.value()),
status_message = Some("test reason"),
rejection_status = Some(status.toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -348,8 +346,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -415,8 +412,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -494,8 +490,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -573,8 +568,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -735,8 +729,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -831,8 +824,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -951,8 +943,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -1048,8 +1039,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -1122,8 +1112,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down