Skip to content

Commit

Permalink
ActiveContractsService stream ow always returns at least 1 element (d…
Browse files Browse the repository at this point in the history
…igital-asset#2799)

This removes the need for clients to handle the special case where the
stream might be empty.
Now the clients can always assume that they receive at least one
response element in the stream.
  • Loading branch information
gerolf-da authored and mergify[bot] committed Sep 18, 2019
1 parent 96642d9 commit b70e289
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ option csharp_namespace = "Com.DigitalAsset.Ledger.Api.V1";
// Allows clients to initialize themselves according to a fairly recent state of the ledger without reading through all transactions that were committed since the ledger's creation.
service ActiveContractsService {

// Returns a stream of the latest snapshot of active contracts. Getting an empty stream means that the active contracts set is empty and the client should listen to transactions using ``LEDGER_BEGIN``.
// Returns a stream of the latest snapshot of active contracts.
// If there are no active contracts, the stream returns a single GetActiveContractsResponse message with the offset at which the snapshot has been taken.
// Clients SHOULD use the offset in the last GetActiveContractsResponse message to continue streaming transactions with the transaction service.
// Clients SHOULD NOT assume that the set of active contracts they receive reflects the state at the ledger end.
rpc GetActiveContracts (GetActiveContractsRequest) returns (stream GetActiveContractsResponse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

package com.digitalasset.http

import akka.stream.scaladsl.{Keep, Source}
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.http.util.FutureUtil.toFuture
import com.digitalasset.jwt.domain.Jwt
Expand All @@ -15,7 +16,6 @@ import com.digitalasset.ledger.api.v1.command_service.{
import com.digitalasset.ledger.api.v1.transaction_filter.TransactionFilter
import com.digitalasset.ledger.client.LedgerClient
import com.digitalasset.ledger.client.configuration.LedgerClientConfiguration
import com.digitalasset.util.akkastreams.ExtractMaterializedValue
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import io.grpc.stub.MetadataUtils
import io.grpc.{Channel, ClientInterceptors, Metadata}
Expand All @@ -29,7 +29,7 @@ object LedgerClientJwt {
(Jwt, SubmitAndWaitRequest) => Future[SubmitAndWaitForTransactionResponse]

type GetActiveContracts =
(Jwt, TransactionFilter, Boolean) => Source[GetActiveContractsResponse, Future[String]]
(Jwt, TransactionFilter, Boolean) => Source[GetActiveContractsResponse, NotUsed]

def singleHostChannel(
hostIp: String,
Expand Down Expand Up @@ -84,16 +84,12 @@ object LedgerClientJwt {
(jwt, req) =>
forChannel(jwt, config, channel)
.flatMap(_.commandServiceClient.submitAndWaitForTransaction(req))

private val exMat = new ExtractMaterializedValue((_: LedgerClient) => None: Option[String])

@SuppressWarnings(Array("org.wartremover.warts.Any"))
def getActiveContracts(config: LedgerClientConfiguration, channel: io.grpc.Channel)(
implicit ec: ExecutionContext,
esf: ExecutionSequencerFactory): GetActiveContracts =
(jwt, filter, flag) =>
Source
.fromFuture(forChannel(jwt, config, channel))
.viaMat(exMat)(Keep.right)
.flatMapConcat(client => client.activeContractSetClient.getActiveContracts(filter, flag))
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ import scalaz.syntax.tag._

class ActiveContractSetClient(ledgerId: LedgerId, activeContractsService: ActiveContractsService)(
implicit esf: ExecutionSequencerFactory) {
/*
Returns a stream of GetActiveContractsResponse messages. The materialized value will
be resolved to the offset that can be used as a starting offset for streaming transactions
via the transaction service.
If the stream completes before the offset can be set, the materialized future will
be failed with an exception.
*/
def getActiveContracts(
filter: TransactionFilter,
verbose: Boolean = false): Source[GetActiveContractsResponse, Future[String]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ object ActiveContractSetSource {

ClientAdapter
.serverStreaming(request, stub)
.viaMat(new ExtractMaterializedValue(r => Some(r.offset)))(Keep.right)
.viaMat(new ExtractMaterializedValue(r => if (r.offset.nonEmpty) Some(r.offset) else None))(
Keep.right)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ExtractSingleMaterializedValueTest
}

"there are multiple valid values" should {
"extract the first" in {
"extract the first matching element" in {
val elemToExtract = -1
val otherCandidateShuffledIn = -2

Expand Down
1 change: 1 addition & 0 deletions unreleased.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ HEAD — ongoing
+ [Sandbox] Dramatically increased performance of the ActiveContractService by only loading the contracts that the parties in the transaction filter are allowed to see.
+ [DAML-LF] change signature of MUL_NUMERIC and DIV_NUMERIC
+ [DAML Integration Kit] fix contract key uniqueness check in kvutils.
+ [Ledger API] ActiveContractsService now specifies to always return at least one message with the offset. This removes a special case where clients would need to check if the stream was empty or not.

0 comments on commit b70e289

Please sign in to comment.