diff --git a/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/CommandCompletionClient.java b/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/CommandCompletionClient.java index dac596e5ce28..061f141f57df 100644 --- a/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/CommandCompletionClient.java +++ b/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/CommandCompletionClient.java @@ -17,6 +17,7 @@ public interface CommandCompletionClient { Flowable completionStream(String applicationId, LedgerOffset offset, Set parties); + Flowable completionStream(String applicationId, Set parties); Single completionEnd(); } diff --git a/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/DamlLedgerClient.java b/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/DamlLedgerClient.java index af90652a693b..bb9c36b1a729 100644 --- a/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/DamlLedgerClient.java +++ b/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/DamlLedgerClient.java @@ -177,7 +177,7 @@ public void close() throws Exception { pool.close(); } - public static void main(String[] args) throws SSLException { + public static void main(String[] args) { DamlLedgerClient ledgerClient = DamlLedgerClient.forHostWithLedgerIdDiscovery("localhost", 6865, Optional.empty()); ledgerClient.connect(); String ledgerId = ledgerClient.ledgerIdentityClient.getLedgerIdentity().blockingGet(); diff --git a/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/CommandCompletionClientImpl.java b/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/CommandCompletionClientImpl.java index 7d9cf779a917..5b22f3886308 100644 --- a/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/CommandCompletionClientImpl.java +++ b/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/CommandCompletionClientImpl.java @@ -32,14 +32,22 @@ public CommandCompletionClientImpl(String ledgerId, Channel channel, ExecutionSe serviceFutureStub = CommandCompletionServiceGrpc.newFutureStub(channel); } - @Override - public Flowable completionStream(String applicationId, LedgerOffset offset, Set parties) { - CommandCompletionServiceOuterClass.CompletionStreamRequest request = new CompletionStreamRequest(ledgerId, applicationId, parties, offset).toProto(); + private Flowable completionStream(CompletionStreamRequest request) { return ClientPublisherFlowable - .create(request, serviceStub::completionStream, sequencerFactory) + .create(request.toProto(), serviceStub::completionStream, sequencerFactory) .map(CompletionStreamResponse::fromProto); } + @Override + public Flowable completionStream(String applicationId, LedgerOffset offset, Set parties) { + return completionStream(new CompletionStreamRequest(ledgerId, applicationId, parties, offset)); + } + + @Override + public Flowable completionStream(String applicationId, Set parties) { + return completionStream(new CompletionStreamRequest(ledgerId, applicationId, parties)); + } + @Override public Single completionEnd() { CommandCompletionServiceOuterClass.CompletionEndRequest request = CommandCompletionServiceOuterClass.CompletionEndRequest.newBuilder().setLedgerId(ledgerId).build(); diff --git a/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/components/tests/helpers/DummyLedgerClient.scala b/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/components/tests/helpers/DummyLedgerClient.scala index 51414ee18c68..7d06e8103774 100644 --- a/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/components/tests/helpers/DummyLedgerClient.scala +++ b/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/components/tests/helpers/DummyLedgerClient.scala @@ -96,6 +96,10 @@ class DummyLedgerClient( offset: LedgerOffset, parties: util.Set[String]): Flowable[CompletionStreamResponse] = commandCompletions + override def completionStream( + applicationId: String, + parties: util.Set[String]): Flowable[CompletionStreamResponse] = + commandCompletions override def completionEnd(): Single[CompletionEndResponse] = ??? } diff --git a/language-support/java/bindings/src/main/java/com/daml/ledger/javaapi/data/CompletionStreamRequest.java b/language-support/java/bindings/src/main/java/com/daml/ledger/javaapi/data/CompletionStreamRequest.java index dbb0c97713f1..3b9793949989 100644 --- a/language-support/java/bindings/src/main/java/com/daml/ledger/javaapi/data/CompletionStreamRequest.java +++ b/language-support/java/bindings/src/main/java/com/daml/ledger/javaapi/data/CompletionStreamRequest.java @@ -7,6 +7,7 @@ import java.util.HashSet; import java.util.Objects; +import java.util.Optional; import java.util.Set; public class CompletionStreamRequest { @@ -17,7 +18,7 @@ public class CompletionStreamRequest { private final Set parties; - private final LedgerOffset offset; + private final Optional offset; public static CompletionStreamRequest fromProto(CommandCompletionServiceOuterClass.CompletionStreamRequest request) { String ledgerId = request.getLedgerId(); @@ -28,12 +29,13 @@ public static CompletionStreamRequest fromProto(CommandCompletionServiceOuterCla } public CommandCompletionServiceOuterClass.CompletionStreamRequest toProto() { - return CommandCompletionServiceOuterClass.CompletionStreamRequest.newBuilder() - .setLedgerId(this.ledgerId) - .setApplicationId(this.applicationId) - .addAllParties(this.parties) - .setOffset(this.offset.toProto()) - .build(); + CommandCompletionServiceOuterClass.CompletionStreamRequest.Builder protoBuilder = + CommandCompletionServiceOuterClass.CompletionStreamRequest.newBuilder() + .setLedgerId(this.ledgerId) + .setApplicationId(this.applicationId) + .addAllParties(this.parties); + this.offset.ifPresent(offset -> protoBuilder.setOffset(offset.toProto())); + return protoBuilder.build(); } @Override @@ -76,15 +78,27 @@ public Set getParties() { return parties; } + /** + * @deprecated Legacy, nullable version of {@link #getLedgerOffset()}, which should be used instead. + */ + @Deprecated public LedgerOffset getOffset() { - return offset; + return offset.orElse(null); } - public CompletionStreamRequest(String ledgerId, String applicationId, Set parties, LedgerOffset offset) { + public Optional getLedgerOffset() { return offset; } + public CompletionStreamRequest(String ledgerId, String applicationId, Set parties) { + this.ledgerId = ledgerId; + this.applicationId = applicationId; + this.parties = parties; + this.offset = Optional.empty(); + } + + public CompletionStreamRequest(String ledgerId, String applicationId, Set parties, LedgerOffset offset) { this.ledgerId = ledgerId; this.applicationId = applicationId; this.parties = parties; - this.offset = offset; + this.offset = Optional.of(offset); } } diff --git a/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MockMessages.scala b/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MockMessages.scala index 69c5a4c6c45c..ab1af879d473 100644 --- a/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MockMessages.scala +++ b/ledger-api/testing-utils/src/main/scala/com/digitalasset/ledger/api/testing/utils/MockMessages.scala @@ -16,8 +16,8 @@ import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary import com.digitalasset.ledger.api.v1.trace_context.TraceContext import com.digitalasset.ledger.api.v1.transaction.{Transaction, TransactionTree, TreeEvent} import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFilter} -import com.digitalasset.ledger.api.v1.value.{Identifier, Value} import com.digitalasset.ledger.api.v1.value.Value.Sum.Text +import com.digitalasset.ledger.api.v1.value.{Identifier, Value} import com.google.protobuf.timestamp.Timestamp import scala.util.Random diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CompletionServiceRequestValidator.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CompletionServiceRequestValidator.scala index 65039e717744..f4d513cfb6a4 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CompletionServiceRequestValidator.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CompletionServiceRequestValidator.scala @@ -32,8 +32,7 @@ class CompletionServiceRequestValidator(ledgerId: LedgerId, partyNameChecker: Pa .map(invalidField("application_id", _)) nonEmptyParties <- requireNonEmpty(request.parties, "parties") knownParties <- partyValidator.requireKnownParties(nonEmptyParties) - offset <- FieldValidations.requirePresence(request.offset, "offset") - convertedOffset <- LedgerOffsetValidator.validate(offset, "offset") + convertedOffset <- LedgerOffsetValidator.validateOptional(request.offset, "offset") } yield CompletionStreamRequest( ledgerId, diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/LedgerOffsetValidator.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/LedgerOffsetValidator.scala index c75ae1ee50b6..ea2fe2d96569 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/LedgerOffsetValidator.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/LedgerOffsetValidator.scala @@ -17,6 +17,14 @@ object LedgerOffsetValidator { private val boundary = "boundary" + def validateOptional( + ledgerOffset: Option[LedgerOffset], + fieldName: String): Either[StatusRuntimeException, Option[domain.LedgerOffset]] = + ledgerOffset + .map(validate(_, fieldName)) + .fold[Either[StatusRuntimeException, Option[domain.LedgerOffset]]](Right(None))( + _.map(Some(_))) + def validate( ledgerOffset: LedgerOffset, fieldName: String): Either[StatusRuntimeException, domain.LedgerOffset] = { diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/TransactionServiceRequestValidator.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/TransactionServiceRequestValidator.scala index ed9904f45ed8..3a04e06b1f2b 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/TransactionServiceRequestValidator.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/TransactionServiceRequestValidator.scala @@ -55,9 +55,7 @@ class TransactionServiceRequestValidator( filter <- requirePresence(req.filter, "filter") requiredBegin <- requirePresence(req.begin, "begin") convertedBegin <- LedgerOffsetValidator.validate(requiredBegin, "begin") - convertedEnd <- req.end - .fold[Result[Option[domain.LedgerOffset]]](rightNone)(end => - LedgerOffsetValidator.validate(end, "end").map(Some(_))) + convertedEnd <- LedgerOffsetValidator.validateOptional(req.end, "end") knownParties <- partyValidator.requireKnownParties(req.getFilter.filtersByParty.keySet) } yield PartialValidation( diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandCompletionService.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandCompletionService.scala index ed3d93edc73c..ee7ed5ebcec9 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandCompletionService.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandCompletionService.scala @@ -19,8 +19,26 @@ import com.google.rpc.status.Status import io.grpc.Status.Code import scalaz.syntax.tag._ +import com.digitalasset.ledger.api.messages.command.completion.{ + CompletionStreamRequest => ValidatedCompletionStreamRequest +} + import scala.concurrent.Future +object GrpcCommandCompletionService { + + private[this] val completionStreamDefaultOffset = Some(domain.LedgerOffset.LedgerEnd) + + private def fillInWithDefaults( + request: ValidatedCompletionStreamRequest): ValidatedCompletionStreamRequest = + if (request.offset.isDefined) { + request + } else { + request.copy(offset = completionStreamDefaultOffset) + } + +} + class GrpcCommandCompletionService( ledgerId: LedgerId, service: CommandCompletionService, @@ -28,19 +46,23 @@ class GrpcCommandCompletionService( )(implicit protected val esf: ExecutionSequencerFactory, protected val mat: Materializer) extends CommandCompletionServiceAkkaGrpc { + import GrpcCommandCompletionService.fillInWithDefaults + private val validator = new CompletionServiceRequestValidator(ledgerId, partyNameChecker) override def completionStreamSource( - request: CompletionStreamRequest): Source[CompletionStreamResponse, akka.NotUsed] = + request: CompletionStreamRequest): Source[CompletionStreamResponse, akka.NotUsed] = { validator .validateCompletionStreamRequest(request) .fold( Source.failed[CompletionStreamResponse], - validatedRequest => + validatedRequest => { service - .completionStreamSource(validatedRequest) + .completionStreamSource(fillInWithDefaults(validatedRequest)) .map(toApiCompletion) + } ) + } override def completionEnd(request: CompletionEndRequest): Future[CompletionEndResponse] = validator diff --git a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/messages/command/completion/CompletionStreamRequest.scala b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/messages/command/completion/CompletionStreamRequest.scala index c4e3a6d90077..793c6af4fe92 100644 --- a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/messages/command/completion/CompletionStreamRequest.scala +++ b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/messages/command/completion/CompletionStreamRequest.scala @@ -10,5 +10,5 @@ case class CompletionStreamRequest( ledgerId: LedgerId, applicationId: ApplicationId, parties: Set[Ref.Party], - offset: LedgerOffset + offset: Option[LedgerOffset] ) diff --git a/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/commands/CommandCompletionServiceIT.scala b/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/commands/CommandCompletionServiceIT.scala index aabdbaf0f3bc..763219c15b15 100644 --- a/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/commands/CommandCompletionServiceIT.scala +++ b/ledger/ledger-api-integration-tests/src/test/itsuite/scala/com/digitalasset/platform/tests/integration/ledger/api/commands/CommandCompletionServiceIT.scala @@ -14,22 +14,31 @@ import com.digitalasset.ledger.api.testing.utils.{ import com.digitalasset.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc.CommandCompletionService import com.digitalasset.ledger.api.v1.command_completion_service.{ Checkpoint, - CompletionStreamRequest + CompletionStreamRequest, + CompletionStreamResponse } +import com.digitalasset.ledger.api.v1.commands.{ + Command => ProtoCommand, + CreateCommand => ProtoCreateCommand +} +import com.digitalasset.ledger.api.v1.completion.Completion import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary import com.digitalasset.ledger.api.v1.transaction_service.GetLedgerEndRequest +import com.digitalasset.ledger.api.v1.value.{Record, RecordField, Value} import com.digitalasset.ledger.client.services.commands.CompletionStreamElement.{ CheckpointElement, CompletionElement } import com.digitalasset.ledger.client.services.commands.{ + CommandClient, CommandCompletionSource, CompletionStreamElement } import com.digitalasset.platform.apitesting.LedgerContextExtensions._ -import com.digitalasset.platform.apitesting.MultiLedgerFixture +import com.digitalasset.platform.apitesting.{LedgerContext, MultiLedgerFixture, TestTemplateIds} +import com.digitalasset.platform.sandbox.utils.FirstElementObserver import com.digitalasset.platform.services.time.TimeProviderType.WallClock import com.digitalasset.util.Ctx import org.scalatest.{AsyncWordSpec, Matchers} @@ -46,14 +55,16 @@ class CommandCompletionServiceIT with MultiLedgerFixture with SuiteResourceManagementAroundAll { + private[this] val templateIds = new TestTemplateIds(config).templateIds + private def completionSource( completionService: CommandCompletionService, ledgerId: domain.LedgerId, applicationId: String, parties: Seq[String], - offset: LedgerOffset): Source[CompletionStreamElement, NotUsed] = + offset: Option[LedgerOffset]): Source[CompletionStreamElement, NotUsed] = CommandCompletionSource( - CompletionStreamRequest(ledgerId.unwrap, applicationId, parties, Some(offset)), + CompletionStreamRequest(ledgerId.unwrap, applicationId, parties, offset), completionService.completionStream) "Commands Completion Service" when { @@ -79,7 +90,7 @@ class CommandCompletionServiceIT ctx.ledgerId, applicationId, Seq(party), - LedgerOffset(Boundary(LEDGER_BEGIN))) + Some(LedgerOffset(Boundary(LEDGER_BEGIN)))) val recordTimes = completionService.collect { case CheckpointElement(Checkpoint(Some(recordTime), _)) => recordTime @@ -107,7 +118,7 @@ class CommandCompletionServiceIT ctx.ledgerId, applicationId, configuredParties, - offset) + Some(offset)) .sliding(2, 1) .map(_.toList) .collect { @@ -154,6 +165,58 @@ class CommandCompletionServiceIT commandIds3 should not contain (commandIds1(1)) } } + + "implicitly tail the stream if no offset is passed" in allFixtures { ctx => + // A command to create a Test.Dummy contract (see //ledger/sandbox/src/main/resources/damls/Test.daml) + val createDummyCommand = ProtoCommand( + ProtoCommand.Command.Create( + ProtoCreateCommand( + templateId = Some(templateIds.dummy), + createArguments = Some( + Record( + fields = Seq( + RecordField("operator", Some(Value(Value.Sum.Party(party)))) + ) + )) + ))) + + def trackDummyCreation(client: CommandClient, context: LedgerContext, id: String) = + client.trackSingleCommand(context.command(id, Seq(createDummyCommand))) + + def trackDummyCreations(client: CommandClient, context: LedgerContext, ids: List[String]) = + Future.sequence(ids.map(trackDummyCreation(client, context, _))) + + // Don't use the `completionSource` to ensure that the RPC lands on the server before anything else happens + def tailCompletions( + context: LedgerContext + ): Future[Completion] = { + val (streamObserver, future) = FirstElementObserver[CompletionStreamResponse] + context.commandCompletionService.completionStream( + CompletionStreamRequest( + context.ledgerId.unwrap, + applicationId, + Seq(party), + offset = None), + streamObserver) + future.map(_.get).collect { + case CompletionStreamResponse(_, completion +: _) => completion + } + } + + val arbitraryCommandIds = List.tabulate(10)(_.toString) + val expectedCommandId = "the-one" + + for { + client <- ctx.commandClient(ctx.ledgerId) + _ <- trackDummyCreations(client, ctx, arbitraryCommandIds) + futureCompletion = tailCompletions(ctx) // this action and the following have to run concurrently + _ = trackDummyCreation(client, ctx, expectedCommandId) // concurrent to previous action + completion <- futureCompletion + } yield { + completion.commandId shouldBe expectedCommandId + } + + } } } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/ApiCommandCompletionService.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/ApiCommandCompletionService.scala index 661493353d0a..42d426608d85 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/ApiCommandCompletionService.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/services/ApiCommandCompletionService.scala @@ -45,8 +45,10 @@ class ApiCommandCompletionService private ( subscriptionId: Any, request) + val offset = request.offset.getOrElse(LedgerOffset.LedgerEnd) + completionsService - .getCompletions(request.offset, request.applicationId, request.parties) + .getCompletions(offset, request.applicationId, request.parties) .via(Slf4JLog(logger, s"Serving response for completion subscription $subscriptionId")) } diff --git a/unreleased.rst b/unreleased.rst index 788a8bac6451..f912e775cc6b 100644 --- a/unreleased.rst +++ b/unreleased.rst @@ -9,6 +9,9 @@ This page contains release notes for the SDK. HEAD — ongoing -------------- +- [Sandbox] The completion stream method of the command completion service uses the ledger end as a default value for the offset. See `#1913 `__. +- [Java bindings] Added overloads to the Java bindings ``CompletionStreamRequest`` constructor and the ``CommandCompletionClient`` to accept a request without an explicit ledger offset. See `#1913 `__. +- [Java bindings] **DEPRECATION**: the ``CompletionStreamRequest#getOffset`` method is deprecated in favor of the non-nullable ``CompletionStreamRequest#getLedgerOffset``. See `#1913 `__. - [Scala bindings] Contract keys are exposed on CreatedEvent. See `#1681 `__. - [Navigator] Contract keys are show in the contract details page. See `#1681 `__. - [DAML Standard Library] **BREAKING CHANGE**: Remove the deprecated modules ``DA.Map``, ``DA.Set``, ``DA.Experimental.Map`` and ``DA.Experimental.Set``. Please use ``DA.Next.Map`` and ``DA.Next.Set`` instead.