Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

participant-integration-api: Store a status gRPC protobuf. [KVL-1005] #10600

Merged
merged 8 commits into from
Aug 18, 2021
Prev Previous commit
Next Next commit
participant-integration-api: Store the rejection status as 3 columns.
Serializing the details but keeping the code and message columns
populated.
  • Loading branch information
SamirTalwar authored Aug 18, 2021
commit bacc41ff6e73b4dfb3d5a61107a8c074cf6ca324
15 changes: 15 additions & 0 deletions ledger/participant-integration-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
load("@oracle//:index.bzl", "oracle_testing")
load("@os_info//:os_info.bzl", "is_windows")
load("@scala_version//:index.bzl", "scala_major_version", "scala_major_version_suffix")
load("//bazel_tools:proto.bzl", "proto_jars")
load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
Expand All @@ -16,7 +17,20 @@ load(
load("//bazel_tools:pom_file.bzl", "pom_file")
load("//rules_daml:daml.bzl", "daml_compile")

proto_jars(
name = "participant-integration-api-proto",
srcs = glob(["src/main/protobuf/**/*.proto"]),
maven_artifact_prefix = "participant-integration-api",
maven_group = "com.daml",
strip_import_prefix = "src/main/protobuf",
visibility = ["//visibility:public"],
deps = [
"@com_google_protobuf//:any_proto",
],
)

compile_deps = [
":participant-integration-api-proto_scala",
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/archive:daml_lf_dev_archive_proto_java",
"//daml-lf/data",
Expand Down Expand Up @@ -238,6 +252,7 @@ da_scala_test_suite(
],
deps = [
":participant-integration-api",
":participant-integration-api-proto_scala",
":participant-integration-api-tests-lib",
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/archive:daml_lf_archive_reader",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

// Serialization format for Protocol Buffers values stored in the index.
//
// WARNING:
// As all messages declared here represent values stored to the index database, we MUST ensure that
// they remain backwards-compatible forever.

syntax = "proto3";

package daml.platform.index;

option java_package = "com.daml.platform.index";

import "google/protobuf/any.proto";

// Serialized status details, conveyed from the driver `ReadService` to the ledger API client.
// To be combined with a status code and message.
message StatusDetails {
repeated google.protobuf.Any details = 1;
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
d5d135c634d971cd4a4b67d81bb28da4a0ea01c94070675b24b990ffcf285321
e560dcc7af3b3b333a3d61668c351c7c1a1a0ac088bf83b59e0b4699a8073951
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,12 @@ CREATE TABLE participant_command_completions (
-- The transaction ID is `NULL` for rejected transactions.
transaction_id VARCHAR,
-- The three columns below are `NULL` if the completion is for an accepted transaction.
-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
-- driver).
-- The `rejection_status_code` and `rejection_status_message` columns will always be
-- `NULL` in an H2-backed index, as data is not persisted across ledger restarts. However, we
-- keep them for parity with old data in other databases.
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.
rejection_status_code INTEGER,
rejection_status_message VARCHAR,
rejection_status BYTEA
rejection_status_details BYTEA
);

CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7cd06324d0b97f384401b77a0d0af4bf7e949427c812097bc5c565f68c8bfee6
16adf83956d0f884d1d8886f317b3c0929839a9b0c385b5cb40c6e42cf328ef4
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ CREATE TABLE participant_command_completions
transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints
rejection_status_code INTEGER, -- null for accepted transactions and checkpoints
rejection_status_message CLOB, -- null for accepted transactions and checkpoints
rejection_status BLOB -- null for accepted transactions and checkpoints
rejection_status_details BLOB -- null for accepted transactions and checkpoints
);

CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e97a66e8f6431ff0dd19d48b46bbe1b90ef7d840be9145e73a567107de06db04
824b012135f72b24d8393c412535432e4b2c669075e012971220f14eeb7bdaae
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`.
-- Adds a column to store extra details for the rejection status.
--
-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
-- driver).
--
-- We only rename the `rejection_status_code` and `rejection_status_message` columns, and so they
-- may contain historical data. Readers of this table will therefore need to query for all three
-- columns and either use the `google.rpc.Status` message in `rejection_status` if it exists, or
-- construct one from the `rejection_status_code` and `rejection_status_message` if it does not.
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.

ALTER TABLE participant_command_completions
RENAME COLUMN status_code TO rejection_status_code;
ALTER TABLE participant_command_completions
RENAME COLUMN status_message TO rejection_status_message;
ALTER TABLE participant_command_completions
ADD COLUMN rejection_status BYTEA;
ADD COLUMN rejection_status_details BYTEA;
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ object DbDto {
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
rejection_status: Option[Array[Byte]],
rejection_status_code: Option[Int],
rejection_status_message: Option[String],
rejection_status_details: Option[Array[Byte]],
) extends DbDto

final case class CommandDeduplication(deduplication_key: String) extends DbDto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref
import com.daml.lf.engine.Blinding
import com.daml.lf.ledger.EventId
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
import com.daml.platform.store.appendonlydao.events._
import com.daml.platform.store.dao.DeduplicationKeyMaker
Expand All @@ -34,7 +35,10 @@ object UpdateToDbDto {
submitters = u.completionInfo.actAs.toSet,
command_id = u.completionInfo.commandId,
transaction_id = None,
rejection_status = Some(u.reasonTemplate.status.toByteArray),
rejection_status_code = Some(u.reasonTemplate.code),
rejection_status_message = Some(u.reasonTemplate.message),
rejection_status_details =
Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -268,7 +272,9 @@ object UpdateToDbDto {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(u.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import com.daml.ledger.api.v1.command_completion_service.CompletionStreamRespons
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.protobuf.CodedInputStream
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
Expand All @@ -31,21 +31,19 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] = {
val parserWithCodeAndMessage = sharedColumns ~
int("rejection_status_code") ~ str("rejection_status_message") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusCode ~ rejectionStatusMessage =>
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, Seq.empty)
private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~
int("rejection_status_code") ~
str("rejection_status_message") ~
binaryStream("rejection_status_details").? map {
case offset ~ recordTime ~ commandId ~
rejectionStatusCode ~ rejectionStatusMessage ~ rejectionStatusDetails =>
val details = rejectionStatusDetails
.map(stream => StatusDetails.parseFrom(stream).details)
.getOrElse(Seq.empty)
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, details)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
val parserWithProtobufStatus = sharedColumns ~
binaryStream("rejection_status") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusStream =>
val status = StatusProto.parseFrom(CodedInputStream.newInstance(rejectionStatusStream))
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
parserWithCodeAndMessage | parserWithProtobufStatus
}

private val completionParser: RowParser[CompletionStreamResponse] =
acceptedCommandParser | rejectedCommandParser
Expand All @@ -67,7 +65,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
transaction_id,
rejection_status_code,
rejection_status_message,
rejection_status
rejection_status_details
FROM
participant_command_completions
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ private[backend] object AppendOnlySchema {
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"rejection_status" -> fieldStrategy.byteaOptional(_.rejection_status),
"rejection_status_code" -> fieldStrategy.intOptional(_.rejection_status_code),
"rejection_status_message" -> fieldStrategy.stringOptional(_.rejection_status_message),
"rejection_status_details" -> fieldStrategy.byteaOptional(_.rejection_status_details),
)

val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.daml.lf.transaction.BlindingInfo
import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.value.Value
import com.daml.logging.LoggingContext
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
import com.daml.platform.store.appendonlydao.events.Raw.TreeEvent
import com.daml.platform.store.appendonlydao.events.{
Expand Down Expand Up @@ -276,7 +277,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = Set(someParty),
command_id = someCommandId,
transaction_id = None,
rejection_status = Some(status.toByteArray),
rejection_status_code = Some(status.code),
rejection_status_message = Some(status.message),
rejection_status_details = Some(StatusDetails.of(status.details).toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -346,7 +349,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -412,7 +417,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -490,7 +497,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -568,7 +577,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -729,7 +740,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -824,7 +837,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -943,7 +958,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -1039,7 +1056,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down Expand Up @@ -1112,7 +1131,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
rejection_status = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
),
)
}
Expand Down