Skip to content

Commit

Permalink
participant-integration-api: Store a status gRPC protobuf.
Browse files Browse the repository at this point in the history
Instead of storing the status code and message, we store a serialized
`google.rpc.Status` protocol buffers message. This allows us to pass
through any additional information reported by the driver `ReadService`.

The migration is only done for the append-only database, and preserves
old data in the existing columns. New data will only be written to the
new column.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar committed Aug 17, 2021
1 parent 2527ec8 commit 1fd2a37
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823
e64e1bc672d79af26e86ca8e495a31c6ef026ae6c6df86b3101d9d942954e745
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,16 @@ CREATE TABLE participant_command_completions (
application_id VARCHAR NOT NULL,
submitters ARRAY NOT NULL,
command_id VARCHAR NOT NULL,
-- The transaction ID is `NULL` for rejected transactions.
transaction_id VARCHAR,
status_code INTEGER,
status_message VARCHAR
-- The rejection status is `NULL` if the completion is for an accepted transaction.
-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
-- driver). The `rejection_status_code` and `rejection_status_message` columns will always be
-- `NULL` in an H2-backed index, but we keep them for parity with old data in other databases.
rejection_status_code INTEGER,
rejection_status_message VARCHAR,
rejection_status BLOB
);

CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080
525584affccdd5c5a6b20eac660b068ba1db392c1da7a6c95c3d3a70e46913d3
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id);

CREATE TABLE participant_command_completions
(
completion_offset VARCHAR2(4000) not null,
record_time TIMESTAMP not null,
completion_offset VARCHAR2(4000) NOT NULL,
record_time TIMESTAMP NOT NULL,

application_id NVARCHAR2(1000) not null,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) not null,
application_id NVARCHAR2(1000) NOT NULL,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) NOT NULL,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message CLOB -- null for successful command and checkpoints
transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
rejection_status_code INTEGER, -- null for successful command and checkpoints
rejection_status_message CLOB, -- null for successful command and checkpoints
rejection_status BLOB -- null for successful command and checkpoints
);

CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
454de41175e6f9d5b4a7b125ef06a57115342384b39898f85208cce572a1d4d4
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`.

-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
-- driver). We do not migrate the `rejection_status_code` and `rejection_status_message` columns,
-- and so they may contain old data, which means we need to keep them.

ALTER TABLE participant_command_completions
RENAME COLUMN status_code TO rejection_status_code;
ALTER TABLE participant_command_completions
RENAME COLUMN status_message TO rejection_status_message;
ALTER TABLE participant_command_completions
ADD COLUMN rejection_status BYTEA;
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ object DbDto {
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
rejection_status: Option[Array[Byte]],
) extends DbDto

final case class CommandDeduplication(deduplication_key: String) extends DbDto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ object UpdateToDbDto {
submitters = u.completionInfo.actAs.toSet,
command_id = u.completionInfo.commandId,
transaction_id = None,
status_code = Some(u.reasonTemplate.code),
status_message = Some(u.reasonTemplate.message),
rejection_status = Some(u.reasonTemplate.status.toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -269,8 +268,7 @@ object UpdateToDbDto {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(u.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,46 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, int, str}
import anorm.{RowParser, ~}
import anorm.SqlParser.{int, str}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.protobuf.CodedInputStream
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

def queryStrategy: QueryStrategy

private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] =
private val sharedColumns: RowParser[Offset ~ Instant ~ String] =
offset("completion_offset") ~ instant("record_time") ~ str("command_id")

private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ str("transaction_id") map {
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
private val rejectedCommandParser: RowParser[CompletionStreamResponse] = {
val parserWithCodeAndMessage = sharedColumns ~
int("rejection_status_code") ~ str("rejection_status_message") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusCode ~ rejectionStatusMessage =>
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
val parserWithProtobufStatus = sharedColumns ~
binaryStream("rejection_status") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusStream =>
val status = StatusProto.parseFrom(CodedInputStream.newInstance(rejectionStatusStream))
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
parserWithCodeAndMessage | parserWithProtobufStatus
}

private val completionParser: RowParser[CompletionStreamResponse] =
acceptedCommandParser | rejectedCommandParser
Expand All @@ -55,8 +65,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
record_time,
command_id,
transaction_id,
status_code,
status_message
rejection_status_code,
rejection_status_message,
rejection_status
FROM
participant_command_completions
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ private[backend] object AppendOnlySchema {
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"status_code" -> fieldStrategy.intOptional(_.status_code),
"status_message" -> fieldStrategy.stringOptional(_.status_message),
"rejection_status" -> fieldStrategy.byteaOptional(_.rejection_status),
)

val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.daml.platform.store.appendonlydao.events.{
}
import com.daml.platform.store.dao.DeduplicationKeyMaker
import com.google.protobuf.ByteString
import com.google.rpc.status.{Status => RpcStatus}
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
import org.scalactic.TripleEquals._
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -258,12 +258,11 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optDeduplicationPeriod = None,
submissionId = someSubmissionId,
)
val status = StatusProto.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
val update = state.Update.CommandRejected(
someRecordTime,
completionInfo,
new state.Update.CommandRejected.FinalReason(
RpcStatus.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
),
new state.Update.CommandRejected.FinalReason(status),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
Expand All @@ -277,8 +276,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = Set(someParty),
command_id = someCommandId,
transaction_id = None,
status_code = Some(Status.Code.ABORTED.value()),
status_message = Some("test reason"),
rejection_status = Some(status.toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -348,8 +346,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -415,8 +412,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -494,8 +490,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -573,8 +568,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -735,8 +729,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -831,8 +824,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -951,8 +943,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -1048,8 +1039,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down Expand Up @@ -1122,8 +1112,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
),
)
}
Expand Down

0 comments on commit 1fd2a37

Please sign in to comment.