Skip to content

Commit

Permalink
participant-integration-api: Store a status gRPC protobuf. [KVL-1005] (
Browse files Browse the repository at this point in the history
…#10600)

* participant-integration-api: Construct completions in one place.

* sandbox-classic: Inline `CompletionFromTransaction#apply`.

It's only used here; there's no reason to keep it in the
_participant-integration-api_.

* participant-integration-api: Store a status gRPC protobuf.

Instead of storing the status code and message, we store a serialized
`google.rpc.Status` protocol buffers message. This allows us to pass
through any additional information reported by the driver `ReadService`.

The migration is only done for the append-only database, and preserves
old data in the existing columns. New data will only be written to the
new column.

CHANGELOG_BEGIN
CHANGELOG_END

* participant-integration-api: Improve comments in migrations.

Co-authored-by: Fabio Tudone <fabio.tudone@digitalasset.com>

* participant-integration-api: Further improvements to migrations.

* participant-integration-api: Store the rejection status as 3 columns.

Serializing the details but keeping the code and message columns
populated.

* participant-integration-api: Publish the indexer protobuf to Maven.

Co-authored-by: Fabio Tudone <fabio.tudone@digitalasset.com>
  • Loading branch information
SamirTalwar and fabiotudone-da authored Aug 18, 2021
1 parent 0af5b49 commit c38703e
Show file tree
Hide file tree
Showing 17 changed files with 222 additions and 127 deletions.
15 changes: 15 additions & 0 deletions ledger/participant-integration-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
load("@oracle//:index.bzl", "oracle_testing")
load("@os_info//:os_info.bzl", "is_windows")
load("@scala_version//:index.bzl", "scala_major_version", "scala_major_version_suffix")
load("//bazel_tools:proto.bzl", "proto_jars")
load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
Expand All @@ -16,7 +17,20 @@ load(
load("//bazel_tools:pom_file.bzl", "pom_file")
load("//rules_daml:daml.bzl", "daml_compile")

proto_jars(
name = "participant-integration-api-proto",
srcs = glob(["src/main/protobuf/**/*.proto"]),
maven_artifact_prefix = "participant-integration-api",
maven_group = "com.daml",
strip_import_prefix = "src/main/protobuf",
visibility = ["//visibility:public"],
deps = [
"@com_google_protobuf//:any_proto",
],
)

compile_deps = [
":participant-integration-api-proto_scala",
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/archive:daml_lf_dev_archive_proto_java",
"//daml-lf/data",
Expand Down Expand Up @@ -238,6 +252,7 @@ da_scala_test_suite(
],
deps = [
":participant-integration-api",
":participant-integration-api-proto_scala",
":participant-integration-api-tests-lib",
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/archive:daml_lf_archive_reader",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

// Serialization format for Protocol Buffers values stored in the index.
//
// WARNING:
// As all messages declared here represent values stored to the index database, we MUST ensure that
// they remain backwards-compatible forever.

syntax = "proto3";

package daml.platform.index;

option java_package = "com.daml.platform.index";

import "google/protobuf/any.proto";

// Serialized status details, conveyed from the driver `ReadService` to the ledger API client.
// To be combined with a status code and message.
message StatusDetails {
repeated google.protobuf.Any details = 1;
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823
e560dcc7af3b3b333a3d61668c351c7c1a1a0ac088bf83b59e0b4699a8073951
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,15 @@ CREATE TABLE participant_command_completions (
application_id VARCHAR NOT NULL,
submitters ARRAY NOT NULL,
command_id VARCHAR NOT NULL,
-- The transaction ID is `NULL` for rejected transactions.
transaction_id VARCHAR,
status_code INTEGER,
status_message VARCHAR
-- The three columns below are `NULL` if the completion is for an accepted transaction.
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.
rejection_status_code INTEGER,
rejection_status_message VARCHAR,
rejection_status_details BYTEA
);

CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080
16adf83956d0f884d1d8886f317b3c0929839a9b0c385b5cb40c6e42cf328ef4
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id);

CREATE TABLE participant_command_completions
(
completion_offset VARCHAR2(4000) not null,
record_time TIMESTAMP not null,
completion_offset VARCHAR2(4000) NOT NULL,
record_time TIMESTAMP NOT NULL,

application_id NVARCHAR2(1000) not null,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) not null,
application_id NVARCHAR2(1000) NOT NULL,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) NOT NULL,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message CLOB -- null for successful command and checkpoints
transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints
rejection_status_code INTEGER, -- null for accepted transactions and checkpoints
rejection_status_message CLOB, -- null for accepted transactions and checkpoints
rejection_status_details BLOB -- null for accepted transactions and checkpoints
);

CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
824b012135f72b24d8393c412535432e4b2c669075e012971220f14eeb7bdaae
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Adds a column to store extra details for the rejection status.
--
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.

ALTER TABLE participant_command_completions
RENAME COLUMN status_code TO rejection_status_code;
ALTER TABLE participant_command_completions
RENAME COLUMN status_message TO rejection_status_message;
ALTER TABLE participant_command_completions
ADD COLUMN rejection_status_details BYTEA;
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,42 @@ import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, Completion
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.store.Conversions.RejectionReasonOps
import com.daml.platform.store.entries.LedgerEntry
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status

// Turn a stream of transactions into a stream of completions for a given application and set of parties
// TODO Restrict the scope of this to com.daml.platform.store.dao when
// TODO - the in-memory sandbox is gone
private[platform] object CompletionFromTransaction {
private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty)
private val RejectionTransactionId = ""

def toApiCheckpoint(recordTime: Instant, offset: Offset): Some[Checkpoint] =
Some(
Checkpoint(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset(LedgerOffset.Value.Absolute(offset.toApiString))),
)
def acceptedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
transactionId: String,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(OkStatus), transactionId)),
)

// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
// But for an api server that is part of a distributed ledger network, we might see
// transactions that originated from some other api server. These transactions don't contain the submitter information,
// and therefore we don't emit CommandAccepted completions for those
def apply(
appId: Ref.ApplicationId,
parties: Set[Ref.Party],
): PartialFunction[(Offset, LedgerEntry), (Offset, CompletionStreamResponse)] = {
case (
offset,
LedgerEntry.Transaction(
Some(commandId),
transactionId,
Some(`appId`),
_,
actAs,
_,
_,
recordTime,
_,
_,
),
) if actAs.exists(parties) =>
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(Status()), transactionId)),
)
def rejectedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
status: StatusProto,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(status), RejectionTransactionId)),
)

case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason))
if actAs.exists(parties) =>
val status = reason.toParticipantStateRejectionReason.status
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(status))),
)
}
private def toApiCheckpoint(recordTime: Instant, offset: Offset): Checkpoint =
Checkpoint.of(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset.of(LedgerOffset.Value.Absolute(offset.toApiString))),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ object DbDto {
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
rejection_status_code: Option[Int],
rejection_status_message: Option[String],
rejection_status_details: Option[Array[Byte]],
) extends DbDto

final case class CommandDeduplication(deduplication_key: String) extends DbDto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref
import com.daml.lf.engine.Blinding
import com.daml.lf.ledger.EventId
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
import com.daml.platform.store.appendonlydao.events._
import com.daml.platform.store.dao.DeduplicationKeyMaker
Expand All @@ -34,8 +35,10 @@ object UpdateToDbDto {
submitters = u.completionInfo.actAs.toSet,
command_id = u.completionInfo.commandId,
transaction_id = None,
status_code = Some(u.reasonTemplate.code),
status_message = Some(u.reasonTemplate.message),
rejection_status_code = Some(u.reasonTemplate.code),
rejection_status_message = Some(u.reasonTemplate.message),
rejection_status_details =
Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -269,8 +272,9 @@ object UpdateToDbDto {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(u.transactionId),
status_code = None,
status_message = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,44 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, int, str}
import anorm.{RowParser, ~}
import anorm.SqlParser.{int, str}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

def queryStrategy: QueryStrategy

private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] =
private val sharedColumns: RowParser[Offset ~ Instant ~ String] =
offset("completion_offset") ~ instant("record_time") ~ str("command_id")

private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ str("transaction_id") map {
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
}
sharedColumns ~
int("rejection_status_code") ~
str("rejection_status_message") ~
binaryStream("rejection_status_details").? map {
case offset ~ recordTime ~ commandId ~
rejectionStatusCode ~ rejectionStatusMessage ~ rejectionStatusDetails =>
val details = rejectionStatusDetails
.map(stream => StatusDetails.parseFrom(stream).details)
.getOrElse(Seq.empty)
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, details)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}

private val completionParser: RowParser[CompletionStreamResponse] =
acceptedCommandParser | rejectedCommandParser
Expand All @@ -61,8 +63,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
record_time,
command_id,
transaction_id,
status_code,
status_message
rejection_status_code,
rejection_status_message,
rejection_status_details
FROM
participant_command_completions
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ private[backend] object AppendOnlySchema {
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"status_code" -> fieldStrategy.intOptional(_.status_code),
"status_message" -> fieldStrategy.stringOptional(_.status_message),
"rejection_status_code" -> fieldStrategy.intOptional(_.rejection_status_code),
"rejection_status_message" -> fieldStrategy.stringOptional(_.rejection_status_message),
"rejection_status_details" -> fieldStrategy.byteaOptional(_.rejection_status_details),
)

val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import java.time.Instant

import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.SqlFunctions
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}

private[platform] object CommandCompletionsTable {

Expand All @@ -25,19 +24,14 @@ private[platform] object CommandCompletionsTable {
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}

val parser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser
Expand Down
Loading

0 comments on commit c38703e

Please sign in to comment.