Skip to content

Commit

Permalink
participant-integration-api: Store a status gRPC protobuf.
Browse files Browse the repository at this point in the history
Instead of storing the status code and message, we store a serialized
`google.rpc.Status` protocol buffers message. This allows us to pass
through any additional information reported by the driver `ReadService`.

The migration is done in Scala because constructing protobuf messages
in SQL turns out to be very, very unpleasant.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar committed Aug 17, 2021
1 parent a3ddc8c commit cb1edb4
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ CREATE TABLE participant_command_completions (
submitters ARRAY NOT NULL,
command_id VARCHAR NOT NULL,
transaction_id VARCHAR,
status_code INTEGER,
status_message VARCHAR
status BLOB
);

CREATE INDEX participant_command_completion_offset_application_idx ON participant_command_completions (completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`.

-- NOTE(review): this migration drops the legacy columns WITHOUT converting
-- their contents, so any pre-existing status_code/status_message values are
-- lost. Presumably acceptable for this schema (the Postgres path converts the
-- data via a Scala migration instead) -- TODO confirm.

ALTER TABLE participant_command_completions
DROP COLUMN status_code;
ALTER TABLE participant_command_completions
DROP COLUMN status_message;
-- Nullable: successful commands and checkpoints have no rejection status.
ALTER TABLE participant_command_completions
ADD COLUMN status BLOB;
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ CREATE TABLE participant_command_completions
command_id NVARCHAR2(1000) not null,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message CLOB -- null for successful command and checkpoints
status BLOB -- null for successful command and checkpoints
);

CREATE INDEX participant_command_completions_idx ON participant_command_completions(completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ CREATE TABLE participant_command_completions
command_id NVARCHAR2(1000) not null,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message CLOB -- null for successful command and checkpoints
status BLOB -- null for successful command and checkpoints
);

create index participant_command_completions_idx on participant_command_completions(completion_offset, application_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.db.migration.postgres

import java.sql.{Connection, Statement}

import com.google.rpc.status.{Status => StatusProto}
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}

/** Replaces the `status_code` and `status_message` columns of
  * `participant_command_completions` with a single `status` column containing
  * a serialized `google.rpc.Status` protobuf message, converting existing
  * rejected-completion rows in place.
  *
  * Written as a Java-style (Scala) Flyway migration because constructing
  * protobuf messages in SQL is impractical (see commit message).
  */
private[migration] final class V46__Convert_status_to_proto extends BaseJavaMigration {

This comment has been minimized.

Copy link
@rautenrieth-da

rautenrieth-da Aug 17, 2021

Contributor

This will not work for users that have already upgraded to the append-only schema (Flyway won't apply this migration because it's already past version 100). AFAIK we have no customers that have upgraded (the flags are still hidden), but it would be good to double-check.

This comment has been minimized.

Copy link
@rautenrieth-da

rautenrieth-da Aug 18, 2021

Contributor

we cannot support the append-only schema

Why is that? The old (mutating) schema will be removed in the near future, after that all migrations will be added at the end again.

Btw, there is a flag that allows Flyway to ignore the above error, but it's asking for trouble.


@throws[Exception]
override def migrate(context: Context): Unit = {
  implicit val connection: Connection = context.getConnection

  // Add the new column first; it is populated below, and the legacy columns
  // are dropped at the end of the migration.
  executeDdl("ALTER TABLE participant_command_completions ADD COLUMN status BYTEA")

  // Only rejected completions carry a status; successful completions and
  // checkpoints keep a NULL `status`.
  val completionsWithStatusesResultSet = connection
    .createStatement()
    .executeQuery(
      """|SELECT completion_offset, status_code, status_message
         |FROM participant_command_completions
         |WHERE status_code IS NOT NULL
         |  AND status_message IS NOT NULL
         |""".stripMargin
    )
  val update = connection.prepareStatement(
    """|UPDATE participant_command_completions
       |SET status = ?
       |WHERE completion_offset = ?
       |""".stripMargin
  )

  // Flush the JDBC batch periodically: accumulating one batched UPDATE per
  // completion would grow without bound on a large table (review note).
  val maxBatchSize = 1000
  var pendingUpdates = 0

  // Executes all currently batched updates, failing the migration if the
  // database rejected any single row update.
  def flushBatch(): Unit = {
    if (update.executeBatch().contains(Statement.EXECUTE_FAILED)) {
      throw new IllegalStateException("Migration failed.")
    }
    pendingUpdates = 0
  }

  Iterator
    .unfold(completionsWithStatusesResultSet) { results =>
      if (results.next()) {
        val offset = results.getString("completion_offset")
        val statusCode = results.getInt("status_code")
        val statusMessage = results.getString("status_message")
        Some((offset, statusCode, statusMessage) -> results)
      } else {
        None
      }
    }
    .foreach { case (offset, statusCode, statusMessage) =>
      // Wrap the legacy (code, message) pair in a `google.rpc.Status` with no
      // details, matching what the read path now deserializes.
      val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
      update.setBinaryStream(1, status.toByteString.newInput())
      update.setString(2, offset)
      update.addBatch()
      pendingUpdates += 1
      if (pendingUpdates >= maxBatchSize) {
        flushBatch()
      }

This comment has been minimized.

Copy link
@rautenrieth-da

rautenrieth-da Aug 17, 2021

Contributor

If the database contains a large number of completions, the batch will probably not fit in memory. Could this be an issue?

}
// Flush any remaining batched updates; abort the migration if the database
// rejected any single row update.
if (update.executeBatch().contains(Statement.EXECUTE_FAILED)) {
throw new IllegalStateException("Migration failed.")
}

// Drop the legacy columns only after the new `status` column is populated.
executeDdl("ALTER TABLE participant_command_completions DROP COLUMN status_code")
executeDdl("ALTER TABLE participant_command_completions DROP COLUMN status_message")
}

/** Runs a single DDL statement on the implicit connection, discarding the
  * JDBC boolean result so the method stays `Unit`-typed.
  */
private def executeDdl(sql: String)(implicit connection: Connection): Unit = {
  val statement = connection.createStatement()
  statement.execute(sql)
  ()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ object DbDto {
submitters: Set[String],
command_id: String,
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
status: Option[Array[Byte]],
) extends DbDto

final case class CommandDeduplication(deduplication_key: String) extends DbDto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ object UpdateToDbDto {
submitters = u.completionInfo.actAs.toSet,
command_id = u.completionInfo.commandId,
transaction_id = None,
status_code = Some(u.reasonTemplate.code),
status_message = Some(u.reasonTemplate.message),
status = Some(u.reasonTemplate.status.toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -269,8 +268,7 @@ object UpdateToDbDto {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(u.transactionId),
status_code = None,
status_message = None,
status = None,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, str}
import anorm.{RowParser, ~}
import anorm.SqlParser.{int, str}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.protobuf.CodedInputStream
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
Expand All @@ -31,9 +32,9 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
sharedCompletionColumns ~ binaryStream("status") map {
case offset ~ recordTime ~ commandId ~ statusStream =>
val status = StatusProto.parseFrom(CodedInputStream.newInstance(statusStream))
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}

Expand All @@ -55,8 +56,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
record_time,
command_id,
transaction_id,
status_code,
status_message
status
FROM
participant_command_completions
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ private[backend] object AppendOnlySchema {
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"status_code" -> fieldStrategy.intOptional(_.status_code),
"status_message" -> fieldStrategy.stringOptional(_.status_message),
"status" -> fieldStrategy.byteaOptional(_.status),
)

val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ package com.daml.platform.store.dao

import java.time.Instant

import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~}
import anorm.SqlParser.{binaryStream, str}
import anorm.{Row, RowParser, SimpleSql, SqlStringInterpolation, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.SqlFunctions
import com.google.protobuf.CodedInputStream
import com.google.rpc.status.{Status => StatusProto}

private[platform] object CommandCompletionsTable {

import SqlParser.{int, str}

private val sharedColumns: RowParser[Offset ~ Instant ~ String] =
offset("completion_offset") ~ instant("record_time") ~ str("command_id")

Expand All @@ -28,9 +28,9 @@ private[platform] object CommandCompletionsTable {
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
sharedColumns ~ binaryStream("status") map {
case offset ~ recordTime ~ commandId ~ statusStream =>
val status = StatusProto.parseFrom(CodedInputStream.newInstance(statusStream))
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}

Expand All @@ -45,7 +45,7 @@ private[platform] object CommandCompletionsTable {
): SimpleSql[Row] = {
val submittersInPartiesClause =
sqlFunctions.arrayIntersectionWhereClause("submitters", parties)
SQL"select completion_offset, record_time, command_id, transaction_id, status_code, status_message from participant_command_completions where ($startExclusive is null or completion_offset > $startExclusive) and completion_offset <= $endInclusive and application_id = $applicationId and #$submittersInPartiesClause order by completion_offset asc"
SQL"select completion_offset, record_time, command_id, transaction_id, status from participant_command_completions where ($startExclusive is null or completion_offset > $startExclusive) and completion_offset <= $endInclusive and application_id = $applicationId and #$submittersInPartiesClause order by completion_offset asc"
}

def prepareCompletionsDelete(endInclusive: Offset): SimpleSql[Row] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,8 +1214,8 @@ private[platform] object JdbcLedgerDao {
recordTime: Instant,
reason: state.Update.CommandRejected.RejectionReasonTemplate,
): SimpleSql[Row] = {
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, status_code, status_message) values ($offset, $recordTime, ${completionInfo.applicationId}, ${completionInfo.actAs
.toArray[String]}, ${completionInfo.commandId}, ${reason.code}, ${reason.message})"
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, status) values ($offset, $recordTime, ${completionInfo.applicationId}, ${completionInfo.actAs
.toArray[String]}, ${completionInfo.commandId}, ${reason.status.toByteArray})"
}

protected[JdbcLedgerDao] def escapeReservedWord(word: String): String
Expand Down Expand Up @@ -1403,7 +1403,7 @@ private[platform] object JdbcLedgerDao {
reason: state.Update.CommandRejected.RejectionReasonTemplate,
): SimpleSql[Row] = {
import com.daml.platform.store.OracleArrayConversions._
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, status_code, status_message) values ($offset, $recordTime, ${completionInfo.applicationId}, ${completionInfo.actAs.toJson.compactPrint}, ${completionInfo.commandId}, ${reason.code}, ${reason.message})"
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, status) values ($offset, $recordTime, ${completionInfo.applicationId}, ${completionInfo.actAs.toJson.compactPrint}, ${completionInfo.commandId}, ${reason.status.toByteArray})"
}

// spaces which are subsequently trimmed left only for readability
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.daml.platform.store.appendonlydao.events.{
}
import com.daml.platform.store.dao.DeduplicationKeyMaker
import com.google.protobuf.ByteString
import com.google.rpc.status.{Status => RpcStatus}
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
import org.scalactic.TripleEquals._
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -258,12 +258,11 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
optDeduplicationPeriod = None,
submissionId = someSubmissionId,
)
val status = StatusProto.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
val update = state.Update.CommandRejected(
someRecordTime,
completionInfo,
new state.Update.CommandRejected.FinalReason(
RpcStatus.of(Status.Code.ABORTED.value(), "test reason", Seq.empty)
),
new state.Update.CommandRejected.FinalReason(status),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
Expand All @@ -277,8 +276,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = Set(someParty),
command_id = someCommandId,
transaction_id = None,
status_code = Some(Status.Code.ABORTED.value()),
status_message = Some("test reason"),
status = Some(status.toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
Expand Down Expand Up @@ -348,8 +346,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -415,8 +412,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -494,8 +490,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -573,8 +568,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -735,8 +729,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -831,8 +824,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -951,8 +943,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -1048,8 +1039,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down Expand Up @@ -1122,8 +1112,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
transaction_id = Some(update.transactionId),
status_code = None,
status_message = None,
status = None,
),
)
}
Expand Down

0 comments on commit cb1edb4

Please sign in to comment.