Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

participant-integration-api: Store a status gRPC protobuf. [KVL-1005] #10600

Merged
merged 8 commits into from
Aug 18, 2021
Next Next commit
participant-integration-api: Construct completions in one place.
  • Loading branch information
SamirTalwar committed Aug 17, 2021
commit 66e9e9366ae985a7f901274f6e7d5419a94aaf2e
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@ import com.daml.lf.data.Ref
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.store.Conversions.RejectionReasonOps
import com.daml.platform.store.entries.LedgerEntry
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status

// Turn a stream of transactions into a stream of completions for a given application and set of parties
// TODO Restrict the scope of this to com.daml.platform.store.dao when
// TODO - the in-memory sandbox is gone
private[platform] object CompletionFromTransaction {

def toApiCheckpoint(recordTime: Instant, offset: Offset): Some[Checkpoint] =
Some(
Checkpoint(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset(LedgerOffset.Value.Absolute(offset.toApiString))),
)
)
private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty)
private val RejectionTransactionId = ""

// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
Expand All @@ -53,17 +48,39 @@ private[platform] object CompletionFromTransaction {
_,
),
) if actAs.exists(parties) =>
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(Status()), transactionId)),
)
offset -> acceptedCompletion(recordTime, offset, commandId, transactionId)

case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason))
if actAs.exists(parties) =>
val status = reason.toParticipantStateRejectionReason.status
offset -> CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
Seq(Completion(commandId, Some(status))),
)
offset -> rejectedCompletion(recordTime, offset, commandId, status)
}

def acceptedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
transactionId: String,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(OkStatus), transactionId)),
)

def rejectedCompletion(
recordTime: Instant,
offset: Offset,
commandId: String,
status: StatusProto,
): CompletionStreamResponse =
CompletionStreamResponse.of(
checkpoint = Some(toApiCheckpoint(recordTime, offset)),
completions = Seq(Completion.of(commandId, Some(status), RejectionTransactionId)),
)

private def toApiCheckpoint(recordTime: Instant, offset: Offset): Checkpoint =
Checkpoint.of(
recordTime = Some(fromInstant(recordTime)),
offset = Some(LedgerOffset.of(LedgerOffset.Value.Absolute(offset.toApiString))),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import java.time.Instant
import anorm.{RowParser, ~}
import anorm.SqlParser.{int, str}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}

trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

Expand All @@ -28,19 +27,14 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedCompletionColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
}

private val completionParser: RowParser[CompletionStreamResponse] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import java.time.Instant

import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.SqlFunctions
import com.google.rpc.status.Status
import com.google.rpc.status.{Status => StatusProto}

private[platform] object CommandCompletionsTable {

Expand All @@ -25,19 +24,14 @@ private[platform] object CommandCompletionsTable {
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ str("transaction_id") map {
case offset ~ recordTime ~ commandId ~ transactionId =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status()), transactionId)),
)
CompletionFromTransaction.acceptedCompletion(recordTime, offset, commandId, transactionId)
}

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ int("status_code") ~ str("status_message") map {
case offset ~ recordTime ~ commandId ~ statusCode ~ statusMessage =>
CompletionStreamResponse(
checkpoint = toApiCheckpoint(recordTime, offset),
completions = Seq(Completion(commandId, Some(Status(statusCode, statusMessage)))),
)
val status = StatusProto.of(statusCode, statusMessage, Seq.empty)
CompletionFromTransaction.rejectedCompletion(recordTime, offset, commandId, status)
This conversation was marked as resolved.
Show resolved Hide resolved
}

val parser: RowParser[CompletionStreamResponse] = acceptedCommandParser | rejectedCommandParser
Expand Down