From 66e9e9366ae985a7f901274f6e7d5419a94aaf2e Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Mon, 16 Aug 2021 17:17:09 +0200 Subject: [PATCH 1/7] participant-integration-api: Construct completions in one place. --- .../store/CompletionFromTransaction.scala | 51 ++++++++++++------- .../CompletionStorageBackendTemplate.scala | 16 ++---- .../store/dao/CommandCompletionsTable.scala | 16 ++---- 3 files changed, 44 insertions(+), 39 deletions(-) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala b/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala index b2250d5ea409..b4b809f875c8 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala @@ -14,20 +14,15 @@ import com.daml.lf.data.Ref import com.daml.platform.ApiOffset.ApiOffsetConverter import com.daml.platform.store.Conversions.RejectionReasonOps import com.daml.platform.store.entries.LedgerEntry -import com.google.rpc.status.Status +import com.google.rpc.status.{Status => StatusProto} +import io.grpc.Status // Turn a stream of transactions into a stream of completions for a given application and set of parties // TODO Restrict the scope of this to com.daml.platform.store.dao when // TODO - the in-memory sandbox is gone private[platform] object CompletionFromTransaction { - - def toApiCheckpoint(recordTime: Instant, offset: Offset): Some[Checkpoint] = - Some( - Checkpoint( - recordTime = Some(fromInstant(recordTime)), - offset = Some(LedgerOffset(LedgerOffset.Value.Absolute(offset.toApiString))), - ) - ) + private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty) + private val RejectionTransactionId = "" // Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId // This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package). @@ -53,17 +48,39 @@ private[platform] object CompletionFromTransaction { _, ), ) if actAs.exists(parties) => - offset -> CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - Seq(Completion(commandId, Some(Status()), transactionId)), - ) + offset -> acceptedCompletion(recordTime, offset, commandId, transactionId) case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason)) if actAs.exists(parties) => val status = reason.toParticipantStateRejectionReason.status - offset -> CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - Seq(Completion(commandId, Some(status))), - ) + offset -> rejectedCompletion(recordTime, offset, commandId, status) } + + def acceptedCompletion( + recordTime: Instant, + offset: Offset, + commandId: String, + transactionId: String, + ): CompletionStreamResponse = + CompletionStreamResponse.of( + checkpoint = Some(toApiCheckpoint(recordTime, offset)), + completions = Seq(Completion.of(commandId, Some(OkStatus), transactionId)), + ) + + def rejectedCompletion( + recordTime: Instant, + offset: Offset, + commandId: String, + status: StatusProto, + ): CompletionStreamResponse = + CompletionStreamResponse.of( + checkpoint = Some(toApiCheckpoint(recordTime, offset)), + completions = Seq(Completion.of(commandId, Some(status), RejectionTransactionId)), + ) + + private def toApiCheckpoint(recordTime: Instant, offset: Offset): Checkpoint = + Checkpoint.of( + recordTime = Some(fromInstant(recordTime)), + offset = Some(LedgerOffset.of(LedgerOffset.Value.Absolute(offset.toApiString))), + ) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala index 0e58bdcb4220..f9be3f0ca228 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala @@ -9,14 +9,13 @@ import java.time.Instant import anorm.{RowParser, ~} import anorm.SqlParser.{int, str} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse -import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref import com.daml.lf.data.Ref.Party -import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint +import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Conversions.{instant, offset} import com.daml.platform.store.backend.CompletionStorageBackend -import com.google.rpc.status.Status +import com.google.rpc.status.{Status => StatusProto} trait CompletionStorageBackendTemplate extends CompletionStorageBackend { @@ -28,19 +27,14 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { private val acceptedCommandParser: RowParser[CompletionStreamResponse] = sharedCompletionColumns ~ str("transaction_id") map { case offset ~ recordTime ~ commandId ~ transactionId => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status()), transactionId)), - ) + CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId) } private val rejectedCommandParser: RowParser[CompletionStreamResponse] = sharedCompletionColumns ~ int("status_code") ~ str("status_message") map { case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))), - ) + val status = StatusProto.of(statusCode, statusMessage, Seq.empty) + CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) } private val completionParser: RowParser[CompletionStreamResponse] = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala b/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala index 5e106df1de80..ae1dca0a71d4 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/dao/CommandCompletionsTable.scala @@ -7,13 +7,12 @@ import java.time.Instant import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse -import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref -import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint +import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Conversions._ import com.daml.platform.store.dao.events.SqlFunctions -import com.google.rpc.status.Status +import com.google.rpc.status.{Status => StatusProto} private[platform] object CommandCompletionsTable { @@ -25,19 +24,14 @@ private[platform] object CommandCompletionsTable { private val acceptedCommandParser: RowParser[CompletionStreamResponse] = sharedColumns ~ str("transaction_id") map { case offset ~ recordTime ~ commandId ~ transactionId => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status()), transactionId)), - ) + CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId) } private val rejectedCommandParser: RowParser[CompletionStreamResponse] = sharedColumns ~ int("status_code") ~ str("status_message") map { case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage => - CompletionStreamResponse( - checkpoint = toApiCheckpoint(recordTime, offset), - completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))), - ) + val status = StatusProto.of(statusCode, statusMessage, Seq.empty) + CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) } val parser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser From 2527ec8b39cfc4c0b8081b17cb1d5cbec427f303 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Mon, 16 Aug 2021 17:51:30 +0200 Subject: [PATCH 2/7] sandbox-classic: Inline `CompletionFromTransaction#apply`. It's only used here; there's no reason to keep it in the _participant-integration-api_. --- .../store/CompletionFromTransaction.scala | 35 ------------- .../ledger/inmemory/InMemoryLedger.scala | 52 ++++++++++++++++--- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala b/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala index b4b809f875c8..e51780b71009 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/CompletionFromTransaction.scala @@ -10,10 +10,7 @@ import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, Completion import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.offset.Offset -import com.daml.lf.data.Ref import com.daml.platform.ApiOffset.ApiOffsetConverter -import com.daml.platform.store.Conversions.RejectionReasonOps -import com.daml.platform.store.entries.LedgerEntry import com.google.rpc.status.{Status => StatusProto} import io.grpc.Status @@ -24,38 +21,6 @@ private[platform] object CompletionFromTransaction { private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty) private val RejectionTransactionId = "" - // Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId - // This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package). - // But for an api server that is part of a distributed ledger network, we might see - // transactions that originated from some other api server. These transactions don't contain the submitter information, - // and therefore we don't emit CommandAccepted completions for those - def apply( - appId: Ref.ApplicationId, - parties: Set[Ref.Party], - ): PartialFunction[(Offset, LedgerEntry), (Offset, CompletionStreamResponse)] = { - case ( - offset, - LedgerEntry.Transaction( - Some(commandId), - transactionId, - Some(`appId`), - _, - actAs, - _, - _, - recordTime, - _, - _, - ), - ) if actAs.exists(parties) => - offset -> acceptedCompletion(recordTime, offset, commandId, transactionId) - - case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason)) - if actAs.exists(parties) => - val status = reason.toParticipantStateRejectionReason.status - offset -> rejectedCompletion(recordTime, offset, commandId, status) - } - def acceptedCompletion( recordTime: Instant, offset: Offset, diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala index a88abd9c8de5..269dfb2e2fbe 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala @@ -56,6 +56,7 @@ import com.daml.platform.sandbox.stores.ledger.inmemory.InMemoryLedger._ import com.daml.platform.sandbox.stores.ledger.{Ledger, Rejection} import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Contract.ActiveContract +import com.daml.platform.store.Conversions.RejectionReasonOps import com.daml.platform.store.entries.{ ConfigurationEntry, LedgerEntry, @@ -166,13 +167,50 @@ private[sandbox] final class InMemoryLedger( endInclusive: Option[Offset], applicationId: ApplicationId, parties: Set[Ref.Party], - )(implicit loggingContext: LoggingContext): Source[(Offset, CompletionStreamResponse), NotUsed] = - entries - .getSource(startExclusive, endInclusive) - .collect { case (offset, InMemoryLedgerEntry(entry)) => - (offset, entry) - } - .collect(CompletionFromTransaction(applicationId.unwrap, parties)) + )(implicit + loggingContext: LoggingContext + ): Source[(Offset, CompletionStreamResponse), NotUsed] = { + val appId = applicationId.unwrap + entries.getSource(startExclusive, endInclusive).collect { + case ( + offset, + InMemoryLedgerEntry( + LedgerEntry.Transaction( + Some(commandId), + transactionId, + Some(`appId`), + _, + actAs, + _, + _, + recordTime, + _, + _, + ) + ), + ) if actAs.exists(parties) => + offset -> CompletionFromTransaction.acceptedCompletion( + recordTime, + offset, + commandId, + transactionId, + ) + + case ( + offset, + InMemoryLedgerEntry( + LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason) + ), + ) if actAs.exists(parties) => + val status = reason.toParticipantStateRejectionReason.status + offset -> CompletionFromTransaction.rejectedCompletion( + recordTime, + offset, + commandId, + status, + ) + } + } override def ledgerEnd()(implicit loggingContext: LoggingContext): Offset = entries.ledgerEnd From 1fd2a372b03de8e03f286e795d335a93014e2dbc Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Tue, 17 Aug 2021 14:53:34 +0200 Subject: [PATCH 3/7] participant-integration-api: Store a status gRPC protobuf. Instead of storing the status code and message, we store a serialized `google.rpc.Status` protocol buffers message. This allows us to pass through any additional information reported by the driver `ReadService`. The migration is only done for the append-only database, and preserves old data in the existing columns. New data will only be written to the new column. CHANGELOG_BEGIN CHANGELOG_END --- .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 11 +++++- .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 17 +++++---- ...__add_rejection_status_proto_column.sha256 | 1 + ...106__add_rejection_status_proto_column.sql | 16 ++++++++ .../scala/platform/store/backend/DbDto.scala | 3 +- .../store/backend/UpdateToDbDto.scala | 6 +-- .../CompletionStorageBackendTemplate.scala | 33 +++++++++++------ .../store/backend/common/Schema.scala | 3 +- .../store/backend/UpdateToDbDtoSpec.scala | 37 +++++++------------ 11 files changed, 76 insertions(+), 55 deletions(-) create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index e4020ae8bed1..f531d52671f7 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823 +e64e1bc672d79af26e86ca8e495a31c6ef026ae6c6df86b3101d9d942954e745 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index b7adac9311d7..a1870fd4ba19 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -121,9 +121,16 @@ CREATE TABLE participant_command_completions ( application_id VARCHAR NOT NULL, submitters ARRAY NOT NULL, command_id VARCHAR NOT NULL, + -- The transaction ID is `NULL` for rejected transactions. transaction_id VARCHAR, - status_code INTEGER, - status_message VARCHAR + -- The rejection status is `NULL` if the completion is for an accepted transaction. + -- The `rejection_status` contains a Protocol-Buffers-serialized message of type + -- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger + -- driver). The `rejection_status_code` and `rejection_status_message` columns will always be + -- `NULL` in an H2-backed index, but we keep them for parity with old data in other databases. + rejection_status_code INTEGER, + rejection_status_message VARCHAR, + rejection_status BLOB ); CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 index 8bfc6d49dbe4..e0c575d079ff 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080 +525584affccdd5c5a6b20eac660b068ba1db392c1da7a6c95c3d3a70e46913d3 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql index 1570b88d6bbc..a1a166ae9488 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql @@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id); CREATE TABLE participant_command_completions ( - completion_offset VARCHAR2(4000) not null, - record_time TIMESTAMP not null, + completion_offset VARCHAR2(4000) NOT NULL, + record_time TIMESTAMP NOT NULL, - application_id NVARCHAR2(1000) not null, - submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON), - command_id NVARCHAR2(1000) not null, + application_id NVARCHAR2(1000) NOT NULL, + submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON), + command_id NVARCHAR2(1000) NOT NULL, - transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints - status_code INTEGER, -- null for successful command and checkpoints - status_message CLOB -- null for successful command and checkpoints + transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints + rejection_status_code INTEGER, -- null for successful command and checkpoints + rejection_status_message CLOB, -- null for successful command and checkpoints + rejection_status BLOB -- null for successful command and checkpoints ); CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 new file mode 100644 index 000000000000..30b698864f05 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 @@ -0,0 +1 @@ +454de41175e6f9d5b4a7b125ef06a57115342384b39898f85208cce572a1d4d4 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql new file mode 100644 index 000000000000..84774d65b44e --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql @@ -0,0 +1,16 @@ +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +-- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`. + +-- The `rejection_status` contains a Protocol-Buffers-serialized message of type +-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger +-- driver). We do not migrate the `rejection_status_code` and `rejection_status_message` columns, +-- and so they may contain old data, which means we need to keep them. + +ALTER TABLE participant_command_completions + RENAME COLUMN status_code TO rejection_status_code; +ALTER TABLE participant_command_completions + RENAME COLUMN status_message TO rejection_status_message; +ALTER TABLE participant_command_completions + ADD COLUMN rejection_status BYTEA; diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala index 09a5c1b4e198..5e4ff722a332 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala @@ -133,8 +133,7 @@ object DbDto { submitters: Set[String], command_id: String, transaction_id: Option[String], - status_code: Option[Int], - status_message: Option[String], + rejection_status: Option[Array[Byte]], ) extends DbDto final case class CommandDeduplication(deduplication_key: String) extends DbDto diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala index 005a7f68b5d5..c2965d04b433 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala @@ -34,8 +34,7 @@ object UpdateToDbDto { submitters = u.completionInfo.actAs.toSet, command_id = u.completionInfo.commandId, transaction_id = None, - status_code = Some(u.reasonTemplate.code), - status_message = Some(u.reasonTemplate.message), + rejection_status = Some(u.reasonTemplate.status.toByteArray), ), DbDto.CommandDeduplication( DeduplicationKeyMaker.make( @@ -269,8 +268,7 @@ object UpdateToDbDto { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(u.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala index f9be3f0ca228..d0fff569e2b7 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala @@ -6,8 +6,8 @@ package com.daml.platform.store.backend.common import java.sql.Connection import java.time.Instant +import anorm.SqlParser.{binaryStream, int, str} import anorm.{RowParser, ~} -import anorm.SqlParser.{int, str} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref @@ -15,27 +15,37 @@ import com.daml.lf.data.Ref.Party import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Conversions.{instant, offset} import com.daml.platform.store.backend.CompletionStorageBackend +import com.google.protobuf.CodedInputStream import com.google.rpc.status.{Status => StatusProto} trait CompletionStorageBackendTemplate extends CompletionStorageBackend { def queryStrategy: QueryStrategy - private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] = + private val sharedColumns: RowParser[Offset ~ Instant ~ String] = offset("completion_offset") ~ instant("record_time") ~ str("command_id") private val acceptedCommandParser: RowParser[CompletionStreamResponse] = - sharedCompletionColumns ~ str("transaction_id") map { + sharedColumns ~ str("transaction_id") map { case offset ~ recordTime ~ commandId ~ transactionId => CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId) } - private val rejectedCommandParser: RowParser[CompletionStreamResponse] = - sharedCompletionColumns ~ int("status_code") ~ str("status_message") map { - case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage => - val status = StatusProto.of(statusCode, statusMessage, Seq.empty) - CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) - } + private val rejectedCommandParser: RowParser[CompletionStreamResponse] = { + val parserWithCodeAndMessage = sharedColumns ~ + int("rejection_status_code") ~ str("rejection_status_message") map { + case offset ~ recordTime ~ commandId ~ rejectionStatusCode ~ rejectionStatusMessage => + val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, Seq.empty) + CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) + } + val parserWithProtobufStatus = sharedColumns ~ + binaryStream("rejection_status") map { + case offset ~ recordTime ~ commandId ~ rejectionStatusStream => + val status = StatusProto.parseFrom(CodedInputStream.newInstance(rejectionStatusStream)) + CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) + } + parserWithCodeAndMessage | parserWithProtobufStatus + } private val completionParser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser @@ -55,8 +65,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { record_time, command_id, transaction_id, - status_code, - status_message + rejection_status_code, + rejection_status_message, + rejection_status FROM participant_command_completions WHERE diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala index 5214c23ad3f3..e675be527c6a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala @@ -225,8 +225,7 @@ private[backend] object AppendOnlySchema { "submitters" -> fieldStrategy.stringArray(_.submitters), "command_id" -> fieldStrategy.string(_.command_id), "transaction_id" -> fieldStrategy.stringOptional(_.transaction_id), - "status_code" -> fieldStrategy.intOptional(_.status_code), - "status_message" -> fieldStrategy.stringOptional(_.status_message), + "rejection_status" -> fieldStrategy.byteaOptional(_.rejection_status), ) val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] = diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala index 9edeb5afa20f..81f6c8d2579c 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala @@ -31,7 +31,7 @@ import com.daml.platform.store.appendonlydao.events.{ } import com.daml.platform.store.dao.DeduplicationKeyMaker import com.google.protobuf.ByteString -import com.google.rpc.status.{Status => RpcStatus} +import com.google.rpc.status.{Status => StatusProto} import io.grpc.Status import org.scalactic.TripleEquals._ import org.scalatest.matchers.should.Matchers @@ -258,12 +258,11 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { optDeduplicationPeriod = None, submissionId = someSubmissionId, ) + val status = StatusProto.of(Status.Code.ABORTED.value(), "test reason", Seq.empty) val update = state.Update.CommandRejected( someRecordTime, completionInfo, - new state.Update.CommandRejected.FinalReason( - RpcStatus.of(Status.Code.ABORTED.value(), "test reason", Seq.empty) - ), + new state.Update.CommandRejected.FinalReason(status), ) val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)( someOffset @@ -277,8 +276,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = Set(someParty), command_id = someCommandId, transaction_id = None, - status_code = Some(Status.Code.ABORTED.value()), - status_message = Some("test reason"), + rejection_status = Some(status.toByteArray), ), DbDto.CommandDeduplication( DeduplicationKeyMaker.make( @@ -348,8 +346,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -415,8 +412,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -494,8 +490,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -573,8 +568,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -735,8 +729,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -831,8 +824,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -951,8 +943,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -1048,8 +1039,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } @@ -1122,8 +1112,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - status_code = None, - status_message = None, + rejection_status = None, ), ) } From 5217ed4edb5265c17f8f91b9fb0fc2c883d39546 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Tue, 17 Aug 2021 19:28:34 +0200 Subject: [PATCH 4/7] participant-integration-api: Improve comments in migrations. Co-authored-by: Fabio Tudone --- .../V1__Append_only_schema.sha256 | 2 +- .../h2database-appendonly/V1__Append_only_schema.sql | 11 ++++++++--- .../oracle-appendonly/V1__Append_only_schema.sha256 | 2 +- .../oracle-appendonly/V1__Append_only_schema.sql | 11 +++++++---- .../V106__add_rejection_status_proto_column.sha256 | 2 +- .../V106__add_rejection_status_proto_column.sql | 10 +++++++--- 6 files changed, 25 insertions(+), 13 deletions(-) diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index f531d52671f7..6416bdab7f15 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -e64e1bc672d79af26e86ca8e495a31c6ef026ae6c6df86b3101d9d942954e745 +2c67203c14e8cd9a316f8a2787be30cf13ecb9eedcafa49876b8f9b63b99bc71 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index a1870fd4ba19..1d5f740ba003 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -123,11 +123,13 @@ CREATE TABLE participant_command_completions ( command_id VARCHAR NOT NULL, -- The transaction ID is `NULL` for rejected transactions. transaction_id VARCHAR, - -- The rejection status is `NULL` if the completion is for an accepted transaction. + -- The three columns below are `NULL` if the completion is for an accepted transaction. -- The `rejection_status` contains a Protocol-Buffers-serialized message of type -- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger - -- driver). The `rejection_status_code` and `rejection_status_message` columns will always be - -- `NULL` in an H2-backed index, but we keep them for parity with old data in other databases. + -- driver). + -- The `rejection_status_code` and `rejection_status_message` columns will always be + -- `NULL` in an H2-backed index, as data is not persisted across ledger restarts. However, we + -- keep them for parity with old data in other databases. rejection_status_code INTEGER, rejection_status_message VARCHAR, rejection_status BLOB @@ -325,6 +327,9 @@ CREATE INDEX participant_events_consuming_exercise_tree_event_witnesses_idx ON p -- lookup by contract id CREATE INDEX participant_events_consuming_exercise_contract_id_idx ON participant_events_consuming_exercise (contract_id); +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + --------------------------------------------------------------------------------------------------- -- Events table: non-consuming exercise --------------------------------------------------------------------------------------------------- diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 index e0c575d079ff..4870bfab48fe 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -525584affccdd5c5a6b20eac660b068ba1db392c1da7a6c95c3d3a70e46913d3 +7cd06324d0b97f384401b77a0d0af4bf7e949427c812097bc5c565f68c8bfee6 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql index a1a166ae9488..caf706e8cfe4 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql @@ -131,10 +131,10 @@ CREATE TABLE participant_command_completions submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON), command_id NVARCHAR2(1000) NOT NULL, - transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints - rejection_status_code INTEGER, -- null for successful command and checkpoints - rejection_status_message CLOB, -- null for successful command and checkpoints - rejection_status BLOB -- null for successful command and checkpoints + transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints + rejection_status_code INTEGER, -- null for accepted transactions and checkpoints + rejection_status_message CLOB, -- null for accepted transactions and checkpoints + rejection_status BLOB -- null for accepted transactions and checkpoints ); CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id); @@ -387,6 +387,9 @@ CREATE TABLE participant_events_non_consuming_exercise ( -- offset index: used to translate to sequential_id CREATE INDEX participant_events_non_consuming_exercise_event_offset ON participant_events_non_consuming_exercise(event_offset); +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + -- sequential_id index for paging CREATE INDEX participant_events_non_consuming_exercise_event_sequential_id ON participant_events_non_consuming_exercise(event_sequential_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 index 30b698864f05..e95657792dfa 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 @@ -1 +1 @@ -454de41175e6f9d5b4a7b125ef06a57115342384b39898f85208cce572a1d4d4 +e97a66e8f6431ff0dd19d48b46bbe1b90ef7d840be9145e73a567107de06db04 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql index 84774d65b44e..90b52fbe82b5 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql @@ -2,11 +2,15 @@ -- SPDX-License-Identifier: Apache-2.0 -- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`. - +-- -- The `rejection_status` contains a Protocol-Buffers-serialized message of type -- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger --- driver). We do not migrate the `rejection_status_code` and `rejection_status_message` columns, --- and so they may contain old data, which means we need to keep them. +-- driver). +-- +-- We only rename the `rejection_status_code` and `rejection_status_message` columns, and so they +-- may contain historical data. Readers of this table will therefore need to query for all three +-- columns and either use the `google.rpc.Status` message in `rejection_status` if it exists, or +-- construct one from the `rejection_status_code` and `rejection_status_message` if it does not. ALTER TABLE participant_command_completions RENAME COLUMN status_code TO rejection_status_code; From c7a74a09fe4e756842736f12645978b471e0ddbc Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 18 Aug 2021 09:20:19 +0200 Subject: [PATCH 5/7] participant-integration-api: Further improvements to migrations. --- .../h2database-appendonly/V1__Append_only_schema.sha256 | 2 +- .../h2database-appendonly/V1__Append_only_schema.sql | 5 +---- .../migration/oracle-appendonly/V1__Append_only_schema.sql | 3 --- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index 6416bdab7f15..0003e0fb00ec 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -2c67203c14e8cd9a316f8a2787be30cf13ecb9eedcafa49876b8f9b63b99bc71 +d5d135c634d971cd4a4b67d81bb28da4a0ea01c94070675b24b990ffcf285321 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index 1d5f740ba003..867ff1808f7e 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -132,7 +132,7 @@ CREATE TABLE participant_command_completions ( -- keep them for parity with old data in other databases. rejection_status_code INTEGER, rejection_status_message VARCHAR, - rejection_status BLOB + rejection_status BYTEA ); CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id); @@ -327,9 +327,6 @@ CREATE INDEX participant_events_consuming_exercise_tree_event_witnesses_idx ON p -- lookup by contract id CREATE INDEX participant_events_consuming_exercise_contract_id_idx ON participant_events_consuming_exercise (contract_id); --- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. --- SPDX-License-Identifier: Apache-2.0 - --------------------------------------------------------------------------------------------------- -- Events table: non-consuming exercise --------------------------------------------------------------------------------------------------- diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql index caf706e8cfe4..34739b89d27e 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql @@ -387,9 +387,6 @@ CREATE TABLE participant_events_non_consuming_exercise ( -- offset index: used to translate to sequential_id CREATE INDEX participant_events_non_consuming_exercise_event_offset ON participant_events_non_consuming_exercise(event_offset); --- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. --- SPDX-License-Identifier: Apache-2.0 - -- sequential_id index for paging CREATE INDEX participant_events_non_consuming_exercise_event_sequential_id ON participant_events_non_consuming_exercise(event_sequential_id); From bacc41ff6e73b4dfb3d5a61107a8c074cf6ca324 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 18 Aug 2021 16:50:19 +0200 Subject: [PATCH 6/7] participant-integration-api: Store the rejection status as 3 columns. Serializing the details but keeping the code and message columns populated. --- .../participant-integration-api/BUILD.bazel | 15 +++++++ .../main/protobuf/daml/platform/index.proto | 22 ++++++++++ .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 11 ++--- .../V1__Append_only_schema.sha256 | 2 +- .../V1__Append_only_schema.sql | 2 +- ...__add_rejection_status_proto_column.sha256 | 2 +- ...106__add_rejection_status_proto_column.sql | 15 +++---- .../scala/platform/store/backend/DbDto.scala | 4 +- .../store/backend/UpdateToDbDto.scala | 10 ++++- .../CompletionStorageBackendTemplate.scala | 28 ++++++------- .../store/backend/common/Schema.scala | 4 +- .../store/backend/UpdateToDbDtoSpec.scala | 41 ++++++++++++++----- 13 files changed, 108 insertions(+), 50 deletions(-) create mode 100644 ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel index ed24df7206f8..e96c3c8357c3 100644 --- a/ledger/participant-integration-api/BUILD.bazel +++ b/ledger/participant-integration-api/BUILD.bazel @@ -4,6 +4,7 @@ load("@oracle//:index.bzl", "oracle_testing") load("@os_info//:os_info.bzl", "is_windows") load("@scala_version//:index.bzl", "scala_major_version", "scala_major_version_suffix") +load("//bazel_tools:proto.bzl", "proto_jars") load( "//bazel_tools:scala.bzl", "da_scala_binary", @@ -16,7 +17,20 @@ load( load("//bazel_tools:pom_file.bzl", "pom_file") load("//rules_daml:daml.bzl", "daml_compile") +proto_jars( + name = "participant-integration-api-proto", + srcs = glob(["src/main/protobuf/**/*.proto"]), + maven_artifact_prefix = "participant-integration-api", + maven_group = "com.daml", + strip_import_prefix = "src/main/protobuf", + visibility = ["//visibility:public"], + deps = [ + "@com_google_protobuf//:any_proto", + ], +) + compile_deps = [ + ":participant-integration-api-proto_scala", "//daml-lf/archive:daml_lf_archive_reader", "//daml-lf/archive:daml_lf_dev_archive_proto_java", "//daml-lf/data", @@ -238,6 +252,7 @@ da_scala_test_suite( ], deps = [ ":participant-integration-api", + ":participant-integration-api-proto_scala", ":participant-integration-api-tests-lib", "//bazel_tools/runfiles:scala_runfiles", "//daml-lf/archive:daml_lf_archive_reader", diff --git a/ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto b/ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto new file mode 100644 index 000000000000..089a9f529a54 --- /dev/null +++ b/ledger/participant-integration-api/src/main/protobuf/daml/platform/index.proto @@ -0,0 +1,22 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Serialization format for Protocol Buffers values stored in the index. +// +// WARNING: +// As all messages declared here represent values stored to the index database, we MUST ensure that +// they remain backwards-compatible forever. + +syntax = "proto3"; + +package daml.platform.index; + +option java_package = "com.daml.platform.index"; + +import "google/protobuf/any.proto"; + +// Serialized status details, conveyed from the driver `ReadService` to the ledger API client. +// To be combined with a status code and message. +message StatusDetails { + repeated google.protobuf.Any details = 1; +} diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index 0003e0fb00ec..4c3c9ff05901 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -d5d135c634d971cd4a4b67d81bb28da4a0ea01c94070675b24b990ffcf285321 +e560dcc7af3b3b333a3d61668c351c7c1a1a0ac088bf83b59e0b4699a8073951 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index 867ff1808f7e..ba9d4b126454 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -124,15 +124,12 @@ CREATE TABLE participant_command_completions ( -- The transaction ID is `NULL` for rejected transactions. transaction_id VARCHAR, -- The three columns below are `NULL` if the completion is for an accepted transaction. - -- The `rejection_status` contains a Protocol-Buffers-serialized message of type - -- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger - -- driver). - -- The `rejection_status_code` and `rejection_status_message` columns will always be - -- `NULL` in an H2-backed index, as data is not persisted across ledger restarts. However, we - -- keep them for parity with old data in other databases. + -- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type + -- `daml.platform.index.StatusDetails`, containing the code, message, and further details + -- (decided by the ledger driver), and may be `NULL` even if the other two columns are set. rejection_status_code INTEGER, rejection_status_message VARCHAR, - rejection_status BYTEA + rejection_status_details BYTEA ); CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 index 4870bfab48fe..3cb235847dc1 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -7cd06324d0b97f384401b77a0d0af4bf7e949427c812097bc5c565f68c8bfee6 +16adf83956d0f884d1d8886f317b3c0929839a9b0c385b5cb40c6e42cf328ef4 diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql index 34739b89d27e..41fd17fe1a35 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V1__Append_only_schema.sql @@ -134,7 +134,7 @@ CREATE TABLE participant_command_completions transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints rejection_status_code INTEGER, -- null for accepted transactions and checkpoints rejection_status_message CLOB, -- null for accepted transactions and checkpoints - rejection_status BLOB -- null for accepted transactions and checkpoints + rejection_status_details BLOB -- null for accepted transactions and checkpoints ); CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 index e95657792dfa..d30ded888f67 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sha256 @@ -1 +1 @@ -e97a66e8f6431ff0dd19d48b46bbe1b90ef7d840be9145e73a567107de06db04 +824b012135f72b24d8393c412535432e4b2c669075e012971220f14eeb7bdaae diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql index 90b52fbe82b5..0c4befb83785 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V106__add_rejection_status_proto_column.sql @@ -1,20 +1,15 @@ -- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -- SPDX-License-Identifier: Apache-2.0 --- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`. +-- Adds a column to store extra details for the rejection status. -- --- The `rejection_status` contains a Protocol-Buffers-serialized message of type --- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger --- driver). --- --- We only rename the `rejection_status_code` and `rejection_status_message` columns, and so they --- may contain historical data. Readers of this table will therefore need to query for all three --- columns and either use the `google.rpc.Status` message in `rejection_status` if it exists, or --- construct one from the `rejection_status_code` and `rejection_status_message` if it does not. +-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type +-- `daml.platform.index.StatusDetails`, containing the code, message, and further details +-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set. ALTER TABLE participant_command_completions RENAME COLUMN status_code TO rejection_status_code; ALTER TABLE participant_command_completions RENAME COLUMN status_message TO rejection_status_message; ALTER TABLE participant_command_completions - ADD COLUMN rejection_status BYTEA; + ADD COLUMN rejection_status_details BYTEA; diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala index 5e4ff722a332..ac25caa19101 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala @@ -133,7 +133,9 @@ object DbDto { submitters: Set[String], command_id: String, transaction_id: Option[String], - rejection_status: Option[Array[Byte]], + rejection_status_code: Option[Int], + rejection_status_message: Option[String], + rejection_status_details: Option[Array[Byte]], ) extends DbDto final case class CommandDeduplication(deduplication_key: String) extends DbDto diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala index c2965d04b433..01c35fdb2e3a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/UpdateToDbDto.scala @@ -12,6 +12,7 @@ import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref import com.daml.lf.engine.Blinding import com.daml.lf.ledger.EventId +import com.daml.platform.index.index.StatusDetails import com.daml.platform.store.appendonlydao.JdbcLedgerDao import com.daml.platform.store.appendonlydao.events._ import com.daml.platform.store.dao.DeduplicationKeyMaker @@ -34,7 +35,10 @@ object UpdateToDbDto { submitters = u.completionInfo.actAs.toSet, command_id = u.completionInfo.commandId, transaction_id = None, - rejection_status = Some(u.reasonTemplate.status.toByteArray), + rejection_status_code = Some(u.reasonTemplate.code), + rejection_status_message = Some(u.reasonTemplate.message), + rejection_status_details = + Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray), ), DbDto.CommandDeduplication( DeduplicationKeyMaker.make( @@ -268,7 +272,9 @@ object UpdateToDbDto { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(u.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala index d0fff569e2b7..659c91c218f1 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala @@ -12,10 +12,10 @@ import com.daml.ledger.api.v1.command_completion_service.CompletionStreamRespons import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref import com.daml.lf.data.Ref.Party +import com.daml.platform.index.index.StatusDetails import com.daml.platform.store.CompletionFromTransaction import com.daml.platform.store.Conversions.{instant, offset} import com.daml.platform.store.backend.CompletionStorageBackend -import com.google.protobuf.CodedInputStream import com.google.rpc.status.{Status => StatusProto} trait CompletionStorageBackendTemplate extends CompletionStorageBackend { @@ -31,21 +31,19 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId) } - private val rejectedCommandParser: RowParser[CompletionStreamResponse] = { - val parserWithCodeAndMessage = sharedColumns ~ - int("rejection_status_code") ~ str("rejection_status_message") map { - case offset ~ recordTime ~ commandId ~ rejectionStatusCode ~ rejectionStatusMessage => - val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, Seq.empty) + private val rejectedCommandParser: RowParser[CompletionStreamResponse] = + sharedColumns ~ + int("rejection_status_code") ~ + str("rejection_status_message") ~ + binaryStream("rejection_status_details").? map { + case offset ~ recordTime ~ commandId ~ + rejectionStatusCode ~ rejectionStatusMessage ~ rejectionStatusDetails => + val details = rejectionStatusDetails + .map(stream => StatusDetails.parseFrom(stream).details) + .getOrElse(Seq.empty) + val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, details) CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) } - val parserWithProtobufStatus = sharedColumns ~ - binaryStream("rejection_status") map { - case offset ~ recordTime ~ commandId ~ rejectionStatusStream => - val status = StatusProto.parseFrom(CodedInputStream.newInstance(rejectionStatusStream)) - CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status) - } - parserWithCodeAndMessage | parserWithProtobufStatus - } private val completionParser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser @@ -67,7 +65,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { transaction_id, rejection_status_code, rejection_status_message, - rejection_status + rejection_status_details FROM participant_command_completions WHERE diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala index e675be527c6a..e76a5d2ac67a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/Schema.scala @@ -225,7 +225,9 @@ private[backend] object AppendOnlySchema { "submitters" -> fieldStrategy.stringArray(_.submitters), "command_id" -> fieldStrategy.string(_.command_id), "transaction_id" -> fieldStrategy.stringOptional(_.transaction_id), - "rejection_status" -> fieldStrategy.byteaOptional(_.rejection_status), + "rejection_status_code" -> fieldStrategy.intOptional(_.rejection_status_code), + "rejection_status_message" -> fieldStrategy.stringOptional(_.rejection_status_message), + "rejection_status_details" -> fieldStrategy.byteaOptional(_.rejection_status_details), ) val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] = diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala index 81f6c8d2579c..5a69df559e3e 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/backend/UpdateToDbDtoSpec.scala @@ -18,6 +18,7 @@ import com.daml.lf.transaction.BlindingInfo import com.daml.lf.transaction.test.TransactionBuilder import com.daml.lf.value.Value import com.daml.logging.LoggingContext +import com.daml.platform.index.index.StatusDetails import com.daml.platform.store.appendonlydao.JdbcLedgerDao import com.daml.platform.store.appendonlydao.events.Raw.TreeEvent import com.daml.platform.store.appendonlydao.events.{ @@ -276,7 +277,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = Set(someParty), command_id = someCommandId, transaction_id = None, - rejection_status = Some(status.toByteArray), + rejection_status_code = Some(status.code), + rejection_status_message = Some(status.message), + rejection_status_details = Some(StatusDetails.of(status.details).toByteArray), ), DbDto.CommandDeduplication( DeduplicationKeyMaker.make( @@ -346,7 +349,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -412,7 +417,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -490,7 +497,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -568,7 +577,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -729,7 +740,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -824,7 +837,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -943,7 +958,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -1039,7 +1056,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } @@ -1112,7 +1131,9 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers { submitters = completionInfo.actAs.toSet, command_id = completionInfo.commandId, transaction_id = Some(update.transactionId), - rejection_status = None, + rejection_status_code = None, + rejection_status_message = None, + rejection_status_details = None, ), ) } From 162772bde3fc5b457778d82cb2c7f73b6a53503b Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 18 Aug 2021 18:31:26 +0200 Subject: [PATCH 7/7] participant-integration-api: Publish the indexer protobuf to Maven. --- release/artifacts.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/release/artifacts.yaml b/release/artifacts.yaml index 006ce9174bc0..ce9d6bae69ec 100644 --- a/release/artifacts.yaml +++ b/release/artifacts.yaml @@ -155,6 +155,8 @@ type: jar-scala - target: //ledger/participant-integration-api:participant-integration-api type: jar-scala +- target: //ledger/participant-integration-api:participant-integration-api-proto_scala + type: jar-scala - target: //ledger/participant-state:participant-state type: jar-scala - target: //ledger/participant-state/kvutils:daml_kvutils_proto_jar