Skip to content

Commit

Permalink
The completion stream RPC defaults to the ledger end as offset (digit…
Browse files Browse the repository at this point in the history
…al-asset#1961)

* The completion stream RPC defaults to the ledger end as offset

Fixes digital-asset#1913

Relevant changes are propagated to the Java bindings (including
deprecating a method that would now return a nullable ledger end).

* Refactor completionStream method

* Address review comments

- ignore command creation results
(digital-asset#1961 (comment))
- avoid re-connecting to the client for every command
(digital-asset#1961 (comment))
- move offset field optionality to domain object
(digital-asset#1961 (comment))

* Improve tests
  • Loading branch information
stefanobaghino-da authored and mergify[bot] committed Jul 2, 2019
1 parent 6bef12f commit 4774e75
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public interface CommandCompletionClient {

Flowable<CompletionStreamResponse> completionStream(String applicationId, LedgerOffset offset, Set<String> parties);
Flowable<CompletionStreamResponse> completionStream(String applicationId, Set<String> parties);

Single<CompletionEndResponse> completionEnd();
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void close() throws Exception {
pool.close();
}

public static void main(String[] args) throws SSLException {
public static void main(String[] args) {
DamlLedgerClient ledgerClient = DamlLedgerClient.forHostWithLedgerIdDiscovery("localhost", 6865, Optional.empty());
ledgerClient.connect();
String ledgerId = ledgerClient.ledgerIdentityClient.getLedgerIdentity().blockingGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,22 @@ public CommandCompletionClientImpl(String ledgerId, Channel channel, ExecutionSe
serviceFutureStub = CommandCompletionServiceGrpc.newFutureStub(channel);
}

@Override
public Flowable<CompletionStreamResponse> completionStream(String applicationId, LedgerOffset offset, Set<String> parties) {
CommandCompletionServiceOuterClass.CompletionStreamRequest request = new CompletionStreamRequest(ledgerId, applicationId, parties, offset).toProto();
private Flowable<CompletionStreamResponse> completionStream(CompletionStreamRequest request) {
return ClientPublisherFlowable
.create(request, serviceStub::completionStream, sequencerFactory)
.create(request.toProto(), serviceStub::completionStream, sequencerFactory)
.map(CompletionStreamResponse::fromProto);
}

@Override
public Flowable<CompletionStreamResponse> completionStream(String applicationId, LedgerOffset offset, Set<String> parties) {
return completionStream(new CompletionStreamRequest(ledgerId, applicationId, parties, offset));
}

@Override
public Flowable<CompletionStreamResponse> completionStream(String applicationId, Set<String> parties) {
return completionStream(new CompletionStreamRequest(ledgerId, applicationId, parties));
}

@Override
public Single<CompletionEndResponse> completionEnd() {
CommandCompletionServiceOuterClass.CompletionEndRequest request = CommandCompletionServiceOuterClass.CompletionEndRequest.newBuilder().setLedgerId(ledgerId).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ class DummyLedgerClient(
offset: LedgerOffset,
parties: util.Set[String]): Flowable[CompletionStreamResponse] =
commandCompletions
override def completionStream(
applicationId: String,
parties: util.Set[String]): Flowable[CompletionStreamResponse] =
commandCompletions

override def completionEnd(): Single[CompletionEndResponse] = ???
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

public class CompletionStreamRequest {
Expand All @@ -17,7 +18,7 @@ public class CompletionStreamRequest {

private final Set<String> parties;

private final LedgerOffset offset;
private final Optional<LedgerOffset> offset;

public static CompletionStreamRequest fromProto(CommandCompletionServiceOuterClass.CompletionStreamRequest request) {
String ledgerId = request.getLedgerId();
Expand All @@ -28,12 +29,13 @@ public static CompletionStreamRequest fromProto(CommandCompletionServiceOuterCla
}

public CommandCompletionServiceOuterClass.CompletionStreamRequest toProto() {
return CommandCompletionServiceOuterClass.CompletionStreamRequest.newBuilder()
.setLedgerId(this.ledgerId)
.setApplicationId(this.applicationId)
.addAllParties(this.parties)
.setOffset(this.offset.toProto())
.build();
CommandCompletionServiceOuterClass.CompletionStreamRequest.Builder protoBuilder =
CommandCompletionServiceOuterClass.CompletionStreamRequest.newBuilder()
.setLedgerId(this.ledgerId)
.setApplicationId(this.applicationId)
.addAllParties(this.parties);
this.offset.ifPresent(offset -> protoBuilder.setOffset(offset.toProto()));
return protoBuilder.build();
}

@Override
Expand Down Expand Up @@ -76,15 +78,27 @@ public Set<String> getParties() {
return parties;
}

/**
* @deprecated Legacy, nullable version of {@link #getLedgerOffset()}, which should be used instead.
*/
@Deprecated
public LedgerOffset getOffset() {
return offset;
return offset.orElse(null);
}

public CompletionStreamRequest(String ledgerId, String applicationId, Set<String> parties, LedgerOffset offset) {
public Optional<LedgerOffset> getLedgerOffset() { return offset; }

public CompletionStreamRequest(String ledgerId, String applicationId, Set<String> parties) {
this.ledgerId = ledgerId;
this.applicationId = applicationId;
this.parties = parties;
this.offset = Optional.empty();
}

public CompletionStreamRequest(String ledgerId, String applicationId, Set<String> parties, LedgerOffset offset) {
this.ledgerId = ledgerId;
this.applicationId = applicationId;
this.parties = parties;
this.offset = offset;
this.offset = Optional.of(offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary
import com.digitalasset.ledger.api.v1.trace_context.TraceContext
import com.digitalasset.ledger.api.v1.transaction.{Transaction, TransactionTree, TreeEvent}
import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
import com.digitalasset.ledger.api.v1.value.{Identifier, Value}
import com.digitalasset.ledger.api.v1.value.Value.Sum.Text
import com.digitalasset.ledger.api.v1.value.{Identifier, Value}
import com.google.protobuf.timestamp.Timestamp

import scala.util.Random
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class CompletionServiceRequestValidator(ledgerId: LedgerId, partyNameChecker: Pa
.map(invalidField("application_id", _))
nonEmptyParties <- requireNonEmpty(request.parties, "parties")
knownParties <- partyValidator.requireKnownParties(nonEmptyParties)
offset <- FieldValidations.requirePresence(request.offset, "offset")
convertedOffset <- LedgerOffsetValidator.validate(offset, "offset")
convertedOffset <- LedgerOffsetValidator.validateOptional(request.offset, "offset")
} yield
CompletionStreamRequest(
ledgerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ object LedgerOffsetValidator {

private val boundary = "boundary"

def validateOptional(
ledgerOffset: Option[LedgerOffset],
fieldName: String): Either[StatusRuntimeException, Option[domain.LedgerOffset]] =
ledgerOffset
.map(validate(_, fieldName))
.fold[Either[StatusRuntimeException, Option[domain.LedgerOffset]]](Right(None))(
_.map(Some(_)))

def validate(
ledgerOffset: LedgerOffset,
fieldName: String): Either[StatusRuntimeException, domain.LedgerOffset] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ class TransactionServiceRequestValidator(
filter <- requirePresence(req.filter, "filter")
requiredBegin <- requirePresence(req.begin, "begin")
convertedBegin <- LedgerOffsetValidator.validate(requiredBegin, "begin")
convertedEnd <- req.end
.fold[Result[Option[domain.LedgerOffset]]](rightNone)(end =>
LedgerOffsetValidator.validate(end, "end").map(Some(_)))
convertedEnd <- LedgerOffsetValidator.validateOptional(req.end, "end")
knownParties <- partyValidator.requireKnownParties(req.getFilter.filtersByParty.keySet)
} yield
PartialValidation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,50 @@ import com.google.rpc.status.Status
import io.grpc.Status.Code
import scalaz.syntax.tag._

import com.digitalasset.ledger.api.messages.command.completion.{
CompletionStreamRequest => ValidatedCompletionStreamRequest
}

import scala.concurrent.Future

object GrpcCommandCompletionService {

private[this] val completionStreamDefaultOffset = Some(domain.LedgerOffset.LedgerEnd)

private def fillInWithDefaults(
request: ValidatedCompletionStreamRequest): ValidatedCompletionStreamRequest =
if (request.offset.isDefined) {
request
} else {
request.copy(offset = completionStreamDefaultOffset)
}

}

class GrpcCommandCompletionService(
ledgerId: LedgerId,
service: CommandCompletionService,
partyNameChecker: PartyNameChecker
)(implicit protected val esf: ExecutionSequencerFactory, protected val mat: Materializer)
extends CommandCompletionServiceAkkaGrpc {

import GrpcCommandCompletionService.fillInWithDefaults

private val validator = new CompletionServiceRequestValidator(ledgerId, partyNameChecker)

override def completionStreamSource(
request: CompletionStreamRequest): Source[CompletionStreamResponse, akka.NotUsed] =
request: CompletionStreamRequest): Source[CompletionStreamResponse, akka.NotUsed] = {
validator
.validateCompletionStreamRequest(request)
.fold(
Source.failed[CompletionStreamResponse],
validatedRequest =>
validatedRequest => {
service
.completionStreamSource(validatedRequest)
.completionStreamSource(fillInWithDefaults(validatedRequest))
.map(toApiCompletion)
}
)
}

override def completionEnd(request: CompletionEndRequest): Future[CompletionEndResponse] =
validator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ case class CompletionStreamRequest(
ledgerId: LedgerId,
applicationId: ApplicationId,
parties: Set[Ref.Party],
offset: LedgerOffset
offset: Option[LedgerOffset]
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,31 @@ import com.digitalasset.ledger.api.testing.utils.{
import com.digitalasset.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc.CommandCompletionService
import com.digitalasset.ledger.api.v1.command_completion_service.{
Checkpoint,
CompletionStreamRequest
CompletionStreamRequest,
CompletionStreamResponse
}
import com.digitalasset.ledger.api.v1.commands.{
Command => ProtoCommand,
CreateCommand => ProtoCreateCommand
}
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary
import com.digitalasset.ledger.api.v1.transaction_service.GetLedgerEndRequest
import com.digitalasset.ledger.api.v1.value.{Record, RecordField, Value}
import com.digitalasset.ledger.client.services.commands.CompletionStreamElement.{
CheckpointElement,
CompletionElement
}
import com.digitalasset.ledger.client.services.commands.{
CommandClient,
CommandCompletionSource,
CompletionStreamElement
}
import com.digitalasset.platform.apitesting.LedgerContextExtensions._
import com.digitalasset.platform.apitesting.MultiLedgerFixture
import com.digitalasset.platform.apitesting.{LedgerContext, MultiLedgerFixture, TestTemplateIds}
import com.digitalasset.platform.sandbox.utils.FirstElementObserver
import com.digitalasset.platform.services.time.TimeProviderType.WallClock
import com.digitalasset.util.Ctx
import org.scalatest.{AsyncWordSpec, Matchers}
Expand All @@ -46,14 +55,16 @@ class CommandCompletionServiceIT
with MultiLedgerFixture
with SuiteResourceManagementAroundAll {

private[this] val templateIds = new TestTemplateIds(config).templateIds

private def completionSource(
completionService: CommandCompletionService,
ledgerId: domain.LedgerId,
applicationId: String,
parties: Seq[String],
offset: LedgerOffset): Source[CompletionStreamElement, NotUsed] =
offset: Option[LedgerOffset]): Source[CompletionStreamElement, NotUsed] =
CommandCompletionSource(
CompletionStreamRequest(ledgerId.unwrap, applicationId, parties, Some(offset)),
CompletionStreamRequest(ledgerId.unwrap, applicationId, parties, offset),
completionService.completionStream)

"Commands Completion Service" when {
Expand All @@ -79,7 +90,7 @@ class CommandCompletionServiceIT
ctx.ledgerId,
applicationId,
Seq(party),
LedgerOffset(Boundary(LEDGER_BEGIN)))
Some(LedgerOffset(Boundary(LEDGER_BEGIN))))

val recordTimes = completionService.collect {
case CheckpointElement(Checkpoint(Some(recordTime), _)) => recordTime
Expand Down Expand Up @@ -107,7 +118,7 @@ class CommandCompletionServiceIT
ctx.ledgerId,
applicationId,
configuredParties,
offset)
Some(offset))
.sliding(2, 1)
.map(_.toList)
.collect {
Expand Down Expand Up @@ -154,6 +165,58 @@ class CommandCompletionServiceIT
commandIds3 should not contain (commandIds1(1))
}
}

"implicitly tail the stream if no offset is passed" in allFixtures { ctx =>
// A command to create a Test.Dummy contract (see //ledger/sandbox/src/main/resources/damls/Test.daml)
val createDummyCommand = ProtoCommand(
ProtoCommand.Command.Create(
ProtoCreateCommand(
templateId = Some(templateIds.dummy),
createArguments = Some(
Record(
fields = Seq(
RecordField("operator", Some(Value(Value.Sum.Party(party))))
)
))
)))

def trackDummyCreation(client: CommandClient, context: LedgerContext, id: String) =
client.trackSingleCommand(context.command(id, Seq(createDummyCommand)))

def trackDummyCreations(client: CommandClient, context: LedgerContext, ids: List[String]) =
Future.sequence(ids.map(trackDummyCreation(client, context, _)))

// Don't use the `completionSource` to ensure that the RPC lands on the server before anything else happens
def tailCompletions(
context: LedgerContext
): Future[Completion] = {
val (streamObserver, future) = FirstElementObserver[CompletionStreamResponse]
context.commandCompletionService.completionStream(
CompletionStreamRequest(
context.ledgerId.unwrap,
applicationId,
Seq(party),
offset = None),
streamObserver)
future.map(_.get).collect {
case CompletionStreamResponse(_, completion +: _) => completion
}
}

val arbitraryCommandIds = List.tabulate(10)(_.toString)
val expectedCommandId = "the-one"

for {
client <- ctx.commandClient(ctx.ledgerId)
_ <- trackDummyCreations(client, ctx, arbitraryCommandIds)
futureCompletion = tailCompletions(ctx) // this action and the following have to run concurrently
_ = trackDummyCreation(client, ctx, expectedCommandId) // concurrent to previous action
completion <- futureCompletion
} yield {
completion.commandId shouldBe expectedCommandId
}

}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ class ApiCommandCompletionService private (
subscriptionId: Any,
request)

val offset = request.offset.getOrElse(LedgerOffset.LedgerEnd)

completionsService
.getCompletions(request.offset, request.applicationId, request.parties)
.getCompletions(offset, request.applicationId, request.parties)
.via(Slf4JLog(logger, s"Serving response for completion subscription $subscriptionId"))
}

Expand Down
3 changes: 3 additions & 0 deletions unreleased.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ This page contains release notes for the SDK.
HEAD — ongoing
--------------

- [Sandbox] The completion stream method of the command completion service uses the ledger end as a default value for the offset. See `#1913 <https://github.com/digital-asset/daml/issues/1913>`__.
- [Java bindings] Added overloads to the Java bindings ``CompletionStreamRequest`` constructor and the ``CommandCompletionClient`` to accept a request without an explicit ledger offset. See `#1913 <https://github.com/digital-asset/daml/issues/1913>`__.
- [Java bindings] **DEPRECATION**: the ``CompletionStreamRequest#getOffset`` method is deprecated in favor of the non-nullable ``CompletionStreamRequest#getLedgerOffset``. See `#1913 <https://github.com/digital-asset/daml/issues/1913>`__.
- [Scala bindings] Contract keys are exposed on CreatedEvent. See `#1681 <https://github.com/digital-asset/daml/issues/1681>`__.
- [Navigator] Contract keys are show in the contract details page. See `#1681 <https://github.com/digital-asset/daml/issues/1681>`__.
- [DAML Standard Library] **BREAKING CHANGE**: Remove the deprecated modules ``DA.Map``, ``DA.Set``, ``DA.Experimental.Map`` and ``DA.Experimental.Set``. Please use ``DA.Next.Map`` and ``DA.Next.Set`` instead.
Expand Down

0 comments on commit 4774e75

Please sign in to comment.