-
Notifications
You must be signed in to change notification settings - Fork 205
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
participant-integration-api: Store a status gRPC protobuf.
Instead of storing the status code and message, we store a serialized `google.rpc.Status` protocol buffers message. This allows us to pass through any additional information reported by the driver `ReadService`. The migration is done in Scala because constructing protobuf messages in SQL turns out to be very, very unpleasant. CHANGELOG_BEGIN CHANGELOG_END
- Loading branch information
1 parent
a3ddc8c
commit cb1edb4
Showing
12 changed files
with
110 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11 changes: 11 additions & 0 deletions
11
...ticipant-integration-api/src/main/resources/db/migration/h2database/V31__status_proto.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
-- SPDX-License-Identifier: Apache-2.0 | ||
|
||
-- Store the completion status as a serialized Protocol Buffers message of type `google.rpc.Status`. | ||
|
||
ALTER TABLE participant_command_completions | ||
DROP COLUMN status_code; | ||
ALTER TABLE participant_command_completions | ||
DROP COLUMN status_message; | ||
ALTER TABLE participant_command_completions | ||
ADD COLUMN status BLOB; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
...t-integration-api/src/main/scala/db/migration/postgres/V46__Convert_status_to_proto.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.platform.db.migration.postgres | ||
|
||
import java.sql.{Connection, Statement} | ||
|
||
import com.google.rpc.status.{Status => StatusProto} | ||
import org.flywaydb.core.api.migration.{BaseJavaMigration, Context} | ||
|
||
private[migration] final class V46__Convert_status_to_proto extends BaseJavaMigration { | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
rautenrieth-da
Contributor
|
||
|
||
@throws[Exception] | ||
override def migrate(context: Context): Unit = { | ||
implicit val connection: Connection = context.getConnection | ||
|
||
executeDdl("ALTER TABLE participant_command_completions ADD COLUMN status BYTEA") | ||
|
||
val completionsWithStatusesResultSet = connection | ||
.createStatement() | ||
.executeQuery( | ||
"""|SELECT completion_offset, status_code, status_message | ||
|FROM participant_command_completions | ||
|WHERE status_code IS NOT NULL | ||
| AND status_message IS NOT NULL | ||
|""".stripMargin | ||
) | ||
val update = connection.prepareStatement( | ||
"""|UPDATE participant_command_completions | ||
|SET status = ? | ||
|WHERE completion_offset = ? | ||
|""".stripMargin | ||
) | ||
Iterator | ||
.unfold(completionsWithStatusesResultSet) { results => | ||
if (results.next()) { | ||
val offset = results.getString("completion_offset") | ||
val statusCode = results.getInt("status_code") | ||
val statusMessage = results.getString("status_message") | ||
Some((offset, statusCode, statusMessage) -> results) | ||
} else { | ||
None | ||
} | ||
} | ||
.foreach { case (offset, statusCode, statusMessage) => | ||
val status = StatusProto.of(statusCode, statusMessage, Seq.empty) | ||
update.setBinaryStream(1, status.toByteString.newInput()) | ||
update.setString(2, offset) | ||
update.addBatch() | ||
This comment has been minimized.
Sorry, something went wrong.
rautenrieth-da
Contributor
|
||
} | ||
if (update.executeBatch().contains(Statement.EXECUTE_FAILED)) { | ||
throw new IllegalStateException("Migration failed.") | ||
} | ||
|
||
executeDdl("ALTER TABLE participant_command_completions DROP COLUMN status_code") | ||
executeDdl("ALTER TABLE participant_command_completions DROP COLUMN status_message") | ||
} | ||
|
||
private def executeDdl(sql: String)(implicit connection: Connection): Unit = { | ||
connection.createStatement().execute(sql) | ||
() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This will not work for users that have already upgraded to the append-only schema (Flyway won't apply this migration because it's already past version 100). AFAIK we have no customers that have upgraded (the flags are still hidden), but it would be good to double-check.