Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

participant-integration-api: Store a status gRPC protobuf. [KVL-1005] #10600

Merged
merged 8 commits into from
Aug 18, 2021
Prev Previous commit
Next Next commit
sandbox-classic: Inline CompletionFromTransaction#apply.
It's only used here; there's no reason to keep it in the
_participant-integration-api_.
  • Loading branch information
SamirTalwar committed Aug 17, 2021
commit 2527ec8b39cfc4c0b8081b17cb1d5cbec427f303
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, Completion
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.store.Conversions.RejectionReasonOps
import com.daml.platform.store.entries.LedgerEntry
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status

Expand All @@ -24,38 +21,6 @@ private[platform] object CompletionFromTransaction {
private val OkStatus = StatusProto.of(Status.Code.OK.value(), "", Seq.empty)
private val RejectionTransactionId = ""

// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
// But for an api server that is part of a distributed ledger network, we might see
// transactions that originated from some other api server. These transactions don't contain the submitter information,
// and therefore we don't emit CommandAccepted completions for those
def apply(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was inlined into InMemoryLedger in Sandbox Classic, as it's only used there.

appId: Ref.ApplicationId,
parties: Set[Ref.Party],
): PartialFunction[(Offset, LedgerEntry), (Offset, CompletionStreamResponse)] = {
case (
offset,
LedgerEntry.Transaction(
Some(commandId),
transactionId,
Some(`appId`),
_,
actAs,
_,
_,
recordTime,
_,
_,
),
) if actAs.exists(parties) =>
offset -> acceptedCompletion(recordTime, offset, commandId, transactionId)

case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason))
if actAs.exists(parties) =>
val status = reason.toParticipantStateRejectionReason.status
offset -> rejectedCompletion(recordTime, offset, commandId, status)
}

def acceptedCompletion(
recordTime: Instant,
offset: Offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import com.daml.platform.sandbox.stores.ledger.inmemory.InMemoryLedger._
import com.daml.platform.sandbox.stores.ledger.{Ledger, Rejection}
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Contract.ActiveContract
import com.daml.platform.store.Conversions.RejectionReasonOps
import com.daml.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
Expand Down Expand Up @@ -166,13 +167,50 @@ private[sandbox] final class InMemoryLedger(
endInclusive: Option[Offset],
applicationId: ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[(Offset, CompletionStreamResponse), NotUsed] =
entries
.getSource(startExclusive, endInclusive)
.collect { case (offset, InMemoryLedgerEntry(entry)) =>
(offset, entry)
}
.collect(CompletionFromTransaction(applicationId.unwrap, parties))
)(implicit
loggingContext: LoggingContext
): Source[(Offset, CompletionStreamResponse), NotUsed] = {
val appId = applicationId.unwrap
entries.getSource(startExclusive, endInclusive).collect {
case (
offset,
InMemoryLedgerEntry(
LedgerEntry.Transaction(
Some(commandId),
transactionId,
Some(`appId`),
_,
actAs,
_,
_,
recordTime,
_,
_,
)
),
) if actAs.exists(parties) =>
offset -> CompletionFromTransaction.acceptedCompletion(
recordTime,
offset,
commandId,
transactionId,
)

case (
offset,
InMemoryLedgerEntry(
LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason)
),
) if actAs.exists(parties) =>
val status = reason.toParticipantStateRejectionReason.status
offset -> CompletionFromTransaction.rejectedCompletion(
recordTime,
offset,
commandId,
status,
)
}
}

override def ledgerEnd()(implicit loggingContext: LoggingContext): Offset = entries.ledgerEnd

Expand Down