Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

participant-integration-api: Store a status gRPC protobuf. [KVL-1005] #10600

Merged
merged 8 commits into from
Aug 18, 2021
15 changes: 15 additions & 0 deletions ledger/participant-integration-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
load("@oracle//:index.bzl", "oracle_testing")
load("@os_info//:os_info.bzl", "is_windows")
load("@scala_version//:index.bzl", "scala_major_version", "scala_major_version_suffix")
load("//bazel_tools:proto.bzl", "proto_jars")
load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
Expand All @@ -16,7 +17,20 @@ load(
load("//bazel_tools:pom_file.bzl", "pom_file")
load("//rules_daml:daml.bzl", "daml_compile")

proto_jars(
name = "participant-integration-api-proto",
srcs = glob(["src/main/protobuf/**/*.proto"]),
maven_artifact_prefix = "participant-integration-api",
maven_group = "com.daml",
strip_import_prefix = "src/main/protobuf",
visibility = ["//visibility:public"],
deps = [
"@com_google_protobuf//:any_proto",
],
)

compile_deps = [
":participant-integration-api-proto_scala",
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/archive:daml_lf_dev_archive_proto_java",
"//daml-lf/data",
Expand Down Expand Up @@ -238,6 +252,7 @@ da_scala_test_suite(
],
deps = [
":participant-integration-api",
":participant-integration-api-proto_scala",
":participant-integration-api-tests-lib",
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/archive:daml_lf_archive_reader",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

// Serialization format for Protocol Buffers values stored in the index.
//
// WARNING:
// As all messages declared here represent values stored to the index database, we MUST ensure that
// they remain backwards-compatible forever.

syntax = "proto3";

package daml.platform.index;

option java_package = "com.daml.platform.index";

import "google/protobuf/any.proto";

// Serialized status details, conveyed from the driver `ReadService` to the ledger API client.
// To be combined with a status code and message.
message StatusDetails {
repeated google.protobuf.Any details = 1;
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823
e560dcc7af3b3b333a3d61668c351c7c1a1a0ac088bf83b59e0b4699a8073951
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,15 @@ CREATE TABLE participant_command_completions (
application_id VARCHAR NOT NULL,
submitters ARRAY NOT NULL,
command_id VARCHAR NOT NULL,
-- The transaction ID is `NULL` for rejected transactions.
transaction_id VARCHAR,
status_code INTEGER,
status_message VARCHAR
-- The three columns below are `NULL` if the completion is for an accepted transaction.
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.
rejection_status_code INTEGER,
rejection_status_message VARCHAR,
rejection_status_details BYTEA
);

CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080
16adf83956d0f884d1d8886f317b3c0929839a9b0c385b5cb40c6e42cf328ef4
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id);

CREATE TABLE participant_command_completions
(
completion_offset VARCHAR2(4000) not null,
record_time TIMESTAMP not null,
completion_offset VARCHAR2(4000) NOT NULL,
record_time TIMESTAMP NOT NULL,

application_id NVARCHAR2(1000) not null,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) not null,
application_id NVARCHAR2(1000) NOT NULL,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) NOT NULL,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message CLOB -- null for successful command and checkpoints
transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints
rejection_status_code INTEGER, -- null for accepted transactions and checkpoints
rejection_status_message CLOB, -- null for accepted transactions and checkpoints
rejection_status_details BLOB -- null for accepted transactions and checkpoints
);

CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
824b012135f72b24d8393c412535432e4b2c669075e012971220f14eeb7bdaae
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Adds a column to store extra details for the rejection status.
--
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
-- (decided by the ledger driver), and may be `NULL` even if the other two columns are set.

ALTER TABLE participant_command_completions
RENAME COLUMN status_code TO rejection_status_code;
ALTER TABLE participant_command_completions
RENAME COLUMN status_message TO rejection_status_message;
ALTER TABLE participant_command_completions
ADD COLUMN rejection_status_details BYTEA;
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,42 @@ import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, Completion
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.store.Conversions.RejectionReasonOps
import com.daml.platform.store.entries.LedgerEntry
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status

// Turn a stream of transactions into a stream of completions for a given application and set of parties
// TODO Restrict the scope of this to com.daml.platform.store.dao when
// TODO - the in-memory sandbox is gone
private[platform] object CompletionFromTransaction {
private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty)
private val RejectionTransactionId = ""

def toApiCheckpoint(recordTime: Instant, offset: Offset): Some[Checkpoint] =
Some(
Checkpoint(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset(LedgerOffset.Value.Absolute(offset.toApiString))),
)
def acceptedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
transactionId: String,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(OkStatus), transactionId)),
)

// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
// But for an api server that is part of a distributed ledger network, we might see
// transactions that originated from some other api server. These transactions don't contain the submitter information,
// and therefore we don't emit CommandAccepted completions for those
def apply(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was inlined into InMemoryLedger in Sandbox Classic, as it's only used there.

appId: Ref.ApplicationId,
parties: Set[Ref.Party],
): PartialFunction[(Offset, LedgerEntry), (Offset, CompletionStreamResponse)] = {
case (
offset,
LedgerEntry.Transaction(
Some(commandId),
transactionId,
Some(`appId`),
_,
actAs,
_,
_,
recordTime,
_,
_,
),
) if actAs.exists(parties) =>
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(Status()), transactionId)),
)
def rejectedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
status: StatusProto,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(status), RejectionTransactionId)),
)

case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason))
if actAs.exists(parties) =>
val status = reason.toParticipantStateRejectionReason.status
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(status))),
)
}
private def toApiCheckpoint(recordTime: Instant, offset: Offset): Checkpoint =
Checkpoint.of(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset.of(LedgerOffset.Value.Absolute(offset.toApiString))),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ object DbDto {
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
rejection_status_code: Option[Int],
rejection_status_message: Option[String],
rejection_status_details: Option[Array[Byte]],
) extends DbDto

final case class CommandDeduplication(deduplication_key: String) extends DbDto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref
import com.daml.lf.engine.Blinding
import com.daml.lf.ledger.EventId
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
import com.daml.platform.store.appendonlydao.events._
import com.daml.platform.store.dao.DeduplicationKeyMaker
Expand All @@ -34,8 +35,10 @@ object UpdateToDbDto {
submitters = u.completionInfo.actAs.toSet,
command_id = u.completionInfo.commandId,
transaction_id = None,
status_code = Some(u.reasonTemplate.code),
status_message = Some(u.reasonTemplate.message),
rejection_status_code = Some(u.reasonTemplate.code),
rejection_status_message = Some(u.reasonTemplate.message),
rejection_status_details =
Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -269,8 +272,9 @@ object UpdateToDbDto {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(u.transactionId),
status_code = None,
status_message = None,
rejection_status_code = None,
rejection_status_message = None,
rejection_status_details = None,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,44 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, int, str}
import anorm.{RowParser, ~}
import anorm.SqlParser.{int, str}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

def queryStrategy: QueryStrategy

private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] =
private val sharedColumns: RowParser[Offset ~ Instant ~ String] =
offset("completion_offset") ~ instant("record_time") ~ str("command_id")

private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ str("transaction_id") map {
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
}
sharedColumns ~
int("rejection_status_code") ~
str("rejection_status_message") ~
binaryStream("rejection_status_details").? map {
case offset ~ recordTime ~ commandId ~
rejectionStatusCode ~ rejectionStatusMessage ~ rejectionStatusDetails =>
val details = rejectionStatusDetails
.map(stream => StatusDetails.parseFrom(stream).details)
.getOrElse(Seq.empty)
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, details)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}

private val completionParser: RowParser[CompletionStreamResponse] =
acceptedCommandParser | rejectedCommandParser
Expand All @@ -61,8 +63,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
record_time,
command_id,
transaction_id,
status_code,
status_message
rejection_status_code,
rejection_status_message,
rejection_status_details
FROM
participant_command_completions
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ private[backend] object AppendOnlySchema {
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"status_code" -> fieldStrategy.intOptional(_.status_code),
"status_message" -> fieldStrategy.stringOptional(_.status_message),
"rejection_status_code" -> fieldStrategy.intOptional(_.rejection_status_code),
"rejection_status_message" -> fieldStrategy.stringOptional(_.rejection_status_message),
"rejection_status_details" -> fieldStrategy.byteaOptional(_.rejection_status_details),
)

val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import java.time.Instant

import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.SqlFunctions
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}

private[platform] object CommandCompletionsTable {

Expand All @@ -25,19 +24,14 @@ private[platform] object CommandCompletionsTable {
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
This conversation was marked as resolved.
Show resolved Hide resolved
}

val parser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser
Expand Down
Loading