Skip to content

Commit

Permalink
[participant-integration-api] - Use internal CommandCompletionService…
Browse files Browse the repository at this point in the history
… [KVL-1172] (#11741)

Fully decouple the usage of the gRPC CommandCompletionService vs the internally defined CommandCompletionService. This allows the two interfaces to diverge and does not force us to expose all the methods that we want to add to the internal CommandCompletionService.

changelog_begin
changelog_end
  • Loading branch information
nicu-da authored Nov 18, 2021
1 parent 39a38d3 commit f5a6302
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ class CompletionServiceRequestValidator(
import errorFactories._
import fieldValidations._

def validateCompletionStreamRequest(
request: GrpcCompletionStreamRequest,
ledgerEnd: LedgerOffset.Absolute,
def validateGrpcCompletionStreamRequest(
request: GrpcCompletionStreamRequest
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, CompletionStreamRequest] =
for {
_ <- matchLedgerId(ledgerId)(LedgerId(request.ledgerId))
validLedgerId <- matchLedgerId(ledgerId)(LedgerId(request.ledgerId))
nonEmptyAppId <- requireNonEmptyString(
request.applicationId,
"application_id",
Expand All @@ -46,21 +45,32 @@ class CompletionServiceRequestValidator(
.fromString(nonEmptyAppId)
.left
.map(invalidField("application_id", _, None))
nonEmptyParties <- requireNonEmpty(request.parties, "parties")
knownParties <- partyValidator.requireKnownParties(nonEmptyParties)
parties <- requireParties(request.parties.toSet)
convertedOffset <- ledgerOffsetValidator.validateOptional(request.offset, "offset")
_ <- ledgerOffsetValidator.offsetIsBeforeEndIfAbsolute(
"Begin",
convertedOffset,
ledgerEnd,
)
} yield CompletionStreamRequest(
ledgerId,
validLedgerId,
ApplicationId(appId),
knownParties,
parties,
convertedOffset,
)

def validateCompletionStreamRequest(
request: CompletionStreamRequest,
ledgerEnd: LedgerOffset.Absolute,
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, CompletionStreamRequest] =
for {
_ <- matchLedgerId(ledgerId)(request.ledgerId)
_ <- ledgerOffsetValidator.offsetIsBeforeEndIfAbsolute(
"Begin",
request.offset,
ledgerEnd,
)
_ <- requireNonEmpty(request.parties, "parties")
_ <- partyValidator.requireKnownParties(request.parties)
} yield request

def validateCompletionEndRequest(
req: CompletionEndRequest
)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,40 @@ package com.daml.platform.server.api.services.grpc

import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.messages.command.completion.{
CompletionStreamRequest => ValidatedCompletionStreamRequest
}
import com.daml.ledger.api.v1.command_completion_service._
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.validation.{CompletionServiceRequestValidator, PartyNameChecker}
import com.daml.ledger.api.validation.CompletionServiceRequestValidator
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.server.api.ValidationLogger
import com.daml.platform.server.api.services.domain.CommandCompletionService

import scala.concurrent.{ExecutionContext, Future}

object GrpcCommandCompletionService {

private[this] val completionStreamDefaultOffset = Some(domain.LedgerOffset.LedgerEnd)

private def fillInWithDefaults(
request: ValidatedCompletionStreamRequest
): ValidatedCompletionStreamRequest =
if (request.offset.isDefined) {
request
} else {
request.copy(offset = completionStreamDefaultOffset)
}

}

class GrpcCommandCompletionService(
ledgerId: LedgerId,
service: CommandCompletionService,
partyNameChecker: PartyNameChecker,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
validator: CompletionServiceRequestValidator,
)(implicit
protected val mat: Materializer,
protected val esf: ExecutionSequencerFactory,
executionContext: ExecutionContext,
loggingContext: LoggingContext,
) extends CommandCompletionServiceAkkaGrpc {

private val validator =
new CompletionServiceRequestValidator(ledgerId, partyNameChecker, errorCodesVersionSwitcher)
protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: DamlContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

override def completionStreamSource(
request: CompletionStreamRequest
): Source[CompletionStreamResponse, akka.NotUsed] = {
Source.future(service.getLedgerEnd(LedgerId(request.ledgerId))).flatMapConcat { ledgerEnd =>
validator
.validateCompletionStreamRequest(request, ledgerEnd)
.fold(
t => Source.failed[CompletionStreamResponse](ValidationLogger.logFailure(request, t)),
GrpcCommandCompletionService.fillInWithDefaults _ andThen service.completionStreamSource,
)
}
validator
.validateGrpcCompletionStreamRequest(request)
.fold(
t => Source.failed[CompletionStreamResponse](ValidationLogger.logFailure(request, t)),
service.completionStreamSource,
)
}

override def completionEnd(request: CompletionEndRequest): Future[CompletionEndResponse] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package com.daml.ledger.api.validation

import com.daml.error.{ContextualizedErrorLogger, ErrorCodesVersionSwitcher, NoLogging}
import com.daml.ledger.api.domain
import com.daml.ledger.api.messages.command.completion.CompletionStreamRequest
import com.daml.ledger.api.v1.command_completion_service.{
CompletionEndRequest,
CompletionStreamRequest,
CompletionStreamRequest => GrpcCompletionStreamRequest,
}
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary
import com.daml.lf.data.Ref
import io.grpc.Status.Code._
import org.mockito.MockitoSugar
import org.scalatest.wordspec.AnyWordSpec
Expand All @@ -20,12 +22,18 @@ class CompletionServiceRequestValidatorTest
with ValidatorTestUtils
with MockitoSugar {
private implicit val noLogging: ContextualizedErrorLogger = NoLogging
private val completionReq = CompletionStreamRequest(
private val grpcCompletionReq = GrpcCompletionStreamRequest(
expectedLedgerId,
expectedApplicationId,
List(party),
Some(LedgerOffset(LedgerOffset.Value.Absolute(absoluteOffset))),
)
private val completionReq = CompletionStreamRequest(
domain.LedgerId(expectedLedgerId),
domain.ApplicationId(Ref.ApplicationId.assertFromString(expectedApplicationId)),
List(party).toSet,
Some(domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(absoluteOffset))),
)

private val endReq = CompletionEndRequest(expectedLedgerId)

Expand All @@ -46,13 +54,12 @@ class CompletionServiceRequestValidatorTest

"CompletionRequestValidation" when {

"validating regular requests" should {
"validating gRPC completion requests" should {

"reject requests with empty ledger ID" in {
fixture.testRequestFailure(
testedRequest = _.validateCompletionStreamRequest(
completionReq.withLedgerId(""),
ledgerEnd,
testedRequest = _.validateGrpcCompletionStreamRequest(
grpcCompletionReq.withLedgerId("")
),
expectedCodeV1 = NOT_FOUND,
expectedDescriptionV1 = "Ledger ID '' not found. Actual Ledger ID is 'expectedLedgerId'.",
Expand All @@ -64,9 +71,8 @@ class CompletionServiceRequestValidatorTest

"return the correct error on missing application ID" in {
fixture.testRequestFailure(
testedRequest = _.validateCompletionStreamRequest(
completionReq.withApplicationId(""),
ledgerEnd,
testedRequest = _.validateGrpcCompletionStreamRequest(
grpcCompletionReq.withApplicationId("")
),
expectedCodeV1 = INVALID_ARGUMENT,
expectedDescriptionV1 = "Missing field: application_id",
Expand All @@ -76,42 +82,71 @@ class CompletionServiceRequestValidatorTest
)
}

"return the correct error on missing party" in {
"return the correct error on unknown begin boundary" in {
fixture.testRequestFailure(
testedRequest = _.validateCompletionStreamRequest(
completionReq.withParties(Seq()),
ledgerEnd,
testedRequest = _.validateGrpcCompletionStreamRequest(
grpcCompletionReq.withOffset(
LedgerOffset(LedgerOffset.Value.Boundary(LedgerBoundary.Unrecognized(7)))
)
),
expectedCodeV1 = INVALID_ARGUMENT,
expectedDescriptionV1 = "Missing field: parties",
expectedDescriptionV1 =
"Invalid argument: Unknown ledger boundary value '7' in field offset.boundary",
expectedCodeV2 = INVALID_ARGUMENT,
expectedDescriptionV2 =
"MISSING_FIELD(8,0): The submitted command is missing a mandatory field: parties",
"INVALID_ARGUMENT(8,0): The submitted command has invalid arguments: Unknown ledger boundary value '7' in field offset.boundary",
)
}

"return the correct error on unknown begin boundary" in {
"tolerate all fields filled out" in {
inside(
validator.validateGrpcCompletionStreamRequest(grpcCompletionReq)
) { case Right(req) =>
req shouldBe completionReq
verifyZeroInteractions(errorCodesVersionSwitcher_mock)
}
}
}

"validate domain completion requests" should {

"reject requests with empty ledger ID" in {
fixture.testRequestFailure(
testedRequest = _.validateCompletionStreamRequest(
completionReq.withOffset(
LedgerOffset(LedgerOffset.Value.Boundary(LedgerBoundary.Unrecognized(7)))
),
completionReq.copy(ledgerId = domain.LedgerId("")),
ledgerEnd,
),
expectedCodeV1 = NOT_FOUND,
expectedDescriptionV1 = "Ledger ID '' not found. Actual Ledger ID is 'expectedLedgerId'.",
expectedCodeV2 = NOT_FOUND,
expectedDescriptionV2 =
"LEDGER_ID_MISMATCH(11,0): Ledger ID '' not found. Actual Ledger ID is 'expectedLedgerId'.",
)
}

"return the correct error on missing party" in {
fixture.testRequestFailure(
testedRequest = _.validateCompletionStreamRequest(
completionReq.copy(parties = Set.empty),
ledgerEnd,
),
expectedCodeV1 = INVALID_ARGUMENT,
expectedDescriptionV1 =
"Invalid argument: Unknown ledger boundary value '7' in field offset.boundary",
expectedDescriptionV1 = "Missing field: parties",
expectedCodeV2 = INVALID_ARGUMENT,
expectedDescriptionV2 =
"INVALID_ARGUMENT(8,0): The submitted command has invalid arguments: Unknown ledger boundary value '7' in field offset.boundary",
"MISSING_FIELD(8,0): The submitted command is missing a mandatory field: parties",
)
}

"return the correct error when offset is after ledger end" in {
fixture.testRequestFailure(
testedRequest = _.validateCompletionStreamRequest(
completionReq.withOffset(
LedgerOffset(LedgerOffset.Value.Absolute((ledgerEnd.value.toInt + 1).toString))
completionReq.copy(offset =
Some(
domain.LedgerOffset.Absolute(
Ref.LedgerString.assertFromString((ledgerEnd.value.toInt + 1).toString)
)
)
),
ledgerEnd,
),
Expand All @@ -126,7 +161,7 @@ class CompletionServiceRequestValidatorTest
"tolerate missing end" in {
inside(
validator.validateCompletionStreamRequest(
completionReq.update(_.optionalOffset := None),
completionReq.copy(offset = None),
ledgerEnd,
)
) { case Right(req) =>
Expand All @@ -137,18 +172,6 @@ class CompletionServiceRequestValidatorTest
verifyZeroInteractions(errorCodesVersionSwitcher_mock)
}
}

"tolerate all fields filled out" in {
inside(
validator.validateCompletionStreamRequest(completionReq, ledgerEnd)
) { case Right(req) =>
req.ledgerId shouldEqual expectedLedgerId
req.applicationId shouldEqual expectedApplicationId
req.parties shouldEqual Set(party)
req.offset shouldEqual Some(domain.LedgerOffset.Absolute(absoluteOffset))
verifyZeroInteractions(errorCodesVersionSwitcher_mock)
}
}
}

"validating completions end requests" should {
Expand Down Expand Up @@ -176,21 +199,21 @@ class CompletionServiceRequestValidatorTest

"applying party name checks" should {

val knowsPartyOnlyFixture = new ValidatorFixture((enabled) => {
val knowsPartyOnlyFixture = new ValidatorFixture(enabled => {
new CompletionServiceRequestValidator(
domain.LedgerId(expectedLedgerId),
PartyNameChecker.AllowPartySet(Set(party)),
new ErrorCodesVersionSwitcher(enabled),
)
})

val unknownParties = List("party", "Alice", "Bob")
val knownParties = List("party")
val unknownParties = List("party", "Alice", "Bob").map(Ref.Party.assertFromString).toSet
val knownParties = List("party").map(Ref.Party.assertFromString)

"reject completion requests for unknown parties" in {
knowsPartyOnlyFixture.testRequestFailure(
testedRequest = _.validateCompletionStreamRequest(
completionReq.withParties(unknownParties),
completionReq.copy(parties = unknownParties),
ledgerEnd,
),
expectedCodeV1 = INVALID_ARGUMENT,
Expand All @@ -204,15 +227,13 @@ class CompletionServiceRequestValidatorTest
"accept transaction requests for known parties" in {
knowsPartyOnlyFixture
.tested(enabledSelfServiceErrorCodes = true)
.validateCompletionStreamRequest(
completionReq.withParties(knownParties),
ledgerEnd,
.validateGrpcCompletionStreamRequest(
grpcCompletionReq.withParties(knownParties)
) shouldBe a[Right[_, _]]
knowsPartyOnlyFixture
.tested(enabledSelfServiceErrorCodes = false)
.validateCompletionStreamRequest(
completionReq.withParties(knownParties),
ledgerEnd,
.validateGrpcCompletionStreamRequest(
grpcCompletionReq.withParties(knownParties)
) shouldBe a[Right[_, _]]
}
}
Expand Down
Loading

0 comments on commit f5a6302

Please sign in to comment.