Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

participant-integration-api: Store a status gRPC protobuf. [KVL-1005] #10600

Merged
merged 8 commits into from
Aug 18, 2021
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0d8451829143e22581afc5b31930bb0ff8c5efab89a3eaca0ed2e57454fc6823
2c67203c14e8cd9a316f8a2787be30cf13ecb9eedcafa49876b8f9b63b99bc71
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,18 @@ CREATE TABLE participant_command_completions (
application_id VARCHAR NOT NULL,
submitters ARRAY NOT NULL,
command_id VARCHAR NOT NULL,
-- The transaction ID is `NULL` for rejected transactions.
transaction_id VARCHAR,
status_code INTEGER,
status_message VARCHAR
-- The three columns below are `NULL` if the completion is for an accepted transaction.
-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
This conversation was marked as resolved.
Show resolved Hide resolved
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
This conversation was marked as resolved.
Show resolved Hide resolved
-- driver).
-- The `rejection_status_code` and `rejection_status_message` columns will always be
-- `NULL` in an H2-backed index, as data is not persisted across ledger restarts. However, we
This conversation was marked as resolved.
Show resolved Hide resolved
-- keep them for parity with old data in other databases.
rejection_status_code INTEGER,
rejection_status_message VARCHAR,
rejection_status BLOB
This conversation was marked as resolved.
Show resolved Hide resolved
);

CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
Expand Down Expand Up @@ -318,6 +327,9 @@ CREATE INDEX participant_events_consuming_exercise_tree_event_witnesses_idx ON p
-- lookup by contract id
CREATE INDEX participant_events_consuming_exercise_contract_id_idx ON participant_events_consuming_exercise (contract_id);

-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
This conversation was marked as resolved.
Show resolved Hide resolved
-- SPDX-License-Identifier: Apache-2.0

---------------------------------------------------------------------------------------------------
-- Events table: non-consuming exercise
---------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
c6e4c74b1c854a51d9cd79d3c614b60a372ca60e88b8d69cbd90a7a674389080
7cd06324d0b97f384401b77a0d0af4bf7e949427c812097bc5c565f68c8bfee6
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@ CREATE INDEX idx_party_entries ON party_entries(submission_id);

CREATE TABLE participant_command_completions
(
completion_offset VARCHAR2(4000) not null,
record_time TIMESTAMP not null,
completion_offset VARCHAR2(4000) NOT NULL,
record_time TIMESTAMP NOT NULL,

application_id NVARCHAR2(1000) not null,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) not null,
application_id NVARCHAR2(1000) NOT NULL,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) NOT NULL,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message CLOB -- null for successful command and checkpoints
transaction_id NVARCHAR2(1000), -- null for rejected transactions and checkpoints
rejection_status_code INTEGER, -- null for accepted transactions and checkpoints
rejection_status_message CLOB, -- null for accepted transactions and checkpoints
rejection_status BLOB -- null for accepted transactions and checkpoints
);

CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
Expand Down Expand Up @@ -386,6 +387,9 @@ CREATE TABLE participant_events_non_consuming_exercise (
-- offset index: used to translate to sequential_id
CREATE INDEX participant_events_non_consuming_exercise_event_offset ON participant_events_non_consuming_exercise(event_offset);

-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
This conversation was marked as resolved.
Show resolved Hide resolved
-- SPDX-License-Identifier: Apache-2.0

-- sequential_id index for paging
CREATE INDEX participant_events_non_consuming_exercise_event_sequential_id ON participant_events_non_consuming_exercise(event_sequential_id);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e97a66e8f6431ff0dd19d48b46bbe1b90ef7d840be9145e73a567107de06db04
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`.
--
-- The `rejection_status` contains a Protocol-Buffers-serialized message of type
-- `google.rpc.Status`, containing the code, message, and further details (decided by the ledger
-- driver).
--
-- We only rename the `rejection_status_code` and `rejection_status_message` columns, and so they
-- may contain historical data. Readers of this table will therefore need to query for all three
-- columns and either use the `google.rpc.Status` message in `rejection_status` if it exists, or
-- construct one from the `rejection_status_code` and `rejection_status_message` if it does not.

ALTER TABLE participant_command_completions
RENAME COLUMN status_code TO rejection_status_code;
ALTER TABLE participant_command_completions
RENAME COLUMN status_message TO rejection_status_message;
ALTER TABLE participant_command_completions
ADD COLUMN rejection_status BYTEA;
nicu-da marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,42 @@ import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, Completion
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.store.Conversions.RejectionReasonOps
import com.daml.platform.store.entries.LedgerEntry
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status

// Turn a stream of transactions into a stream of completions for a given application and set of parties
// TODO Restrict the scope of this to com.daml.platform.store.dao when
// TODO - the in-memory sandbox is gone
private[platform] object CompletionFromTransaction {
private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty)
private val RejectionTransactionId = ""

def toApiCheckpoint(recordTime: Instant, offset: Offset): Some[Checkpoint] =
Some(
Checkpoint(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset(LedgerOffset.Value.Absolute(offset.toApiString))),
)
def acceptedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
transactionId: String,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(OkStatus), transactionId)),
)

// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
// But for an api server that is part of a distributed ledger network, we might see
// transactions that originated from some other api server. These transactions don't contain the submitter information,
// and therefore we don't emit CommandAccepted completions for those
def apply(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was inlined into InMemoryLedger in Sandbox Classic, as it's only used there.

appId: Ref.ApplicationId,
parties: Set[Ref.Party],
): PartialFunction[(Offset, LedgerEntry), (Offset, CompletionStreamResponse)] = {
case (
offset,
LedgerEntry.Transaction(
Some(commandId),
transactionId,
Some(`appId`),
_,
actAs,
_,
_,
recordTime,
_,
_,
),
) if actAs.exists(parties) =>
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(Status()), transactionId)),
)
def rejectedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
status: StatusProto,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(status), RejectionTransactionId)),
)

case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason))
if actAs.exists(parties) =>
val status = reason.toParticipantStateRejectionReason.status
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(status))),
)
}
private def toApiCheckpoint(recordTime: Instant, offset: Offset): Checkpoint =
Checkpoint.of(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset.of(LedgerOffset.Value.Absolute(offset.toApiString))),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ object DbDto {
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
rejection_status: Option[Array[Byte]],
) extends DbDto

final case class CommandDeduplication(deduplication_key: String) extends DbDto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ object UpdateToDbDto {
submitters = u.completionInfo.actAs.toSet,
command_id = u.completionInfo.commandId,
transaction_id = None,
status_code = Some(u.reasonTemplate.code),
status_message = Some(u.reasonTemplate.message),
rejection_status = Some(u.reasonTemplate.status.toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -269,8 +268,7 @@ object UpdateToDbDto {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(u.transactionId),
status_code = None,
status_message = None,
rejection_status = None,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,46 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, int, str}
import anorm.{RowParser, ~}
import anorm.SqlParser.{int, str}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.rpc.status.Status
import com.google.protobuf.CodedInputStream
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

def queryStrategy: QueryStrategy

private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] =
private val sharedColumns: RowParser[Offset ~ Instant ~ String] =
offset("completion_offset") ~ instant("record_time") ~ str("command_id")

private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ str("transaction_id") map {
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
}
private val rejectedCommandParser: RowParser[CompletionStreamResponse] = {
val parserWithCodeAndMessage = sharedColumns ~
int("rejection_status_code") ~ str("rejection_status_message") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusCode ~ rejectionStatusMessage =>
val status = StatusProto.of(rejectionStatusCode, rejectionStatusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we have tests somewhere to test this part?
normally it would not be tested because on the HEAD we would always populate the rejection_status

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rautenrieth-da did you maybe encountered a test suit which would test this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are very bad at testing migrations. I have no idea how we would go about testing this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compatibility package tests migrations in a way. It does:

  1. install an old SDK version
  2. push some data into a sandbox ledger, using the ledger API
  3. upgrade to the next SDK version (up to and including the head version)
  4. use the ledger API to check that the ledger content still looks the same
  5. go to step 2

The application that uploads and checks data is a custom client application written in Haskell. There are checks for comparing the ACS and contract visibility due to divulgence. It could probably be extended to compare completions as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but do we also test rejections this way? Never mind me, it will probably work...although we would not necessarily would have here dead code if we follow @miklos-da useful suggestion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we test rejections, but we could extend the compatibility tests to cover them.

val parserWithProtobufStatus = sharedColumns ~
binaryStream("rejection_status") map {
case offset ~ recordTime ~ commandId ~ rejectionStatusStream =>
val status = StatusProto.parseFrom(CodedInputStream.newInstance(rejectionStatusStream))
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}
parserWithCodeAndMessage | parserWithProtobufStatus
}

private val completionParser: RowParser[CompletionStreamResponse] =
acceptedCommandParser | rejectedCommandParser
Expand All @@ -61,8 +65,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
record_time,
command_id,
transaction_id,
status_code,
status_message
rejection_status_code,
rejection_status_message,
rejection_status
FROM
participant_command_completions
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ private[backend] object AppendOnlySchema {
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"status_code" -> fieldStrategy.intOptional(_.status_code),
"status_message" -> fieldStrategy.stringOptional(_.status_message),
"rejection_status" -> fieldStrategy.byteaOptional(_.rejection_status),
)

val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import java.time.Instant

import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.SqlFunctions
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}

private[platform] object CommandCompletionsTable {

Expand All @@ -25,19 +24,14 @@ private[platform] object CommandCompletionsTable {
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
This conversation was marked as resolved.
Show resolved Hide resolved
}

val parser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser
Expand Down
Loading