Skip to content

Commit

Permalink
websocket variant of query endpoint (#3936)
Browse files Browse the repository at this point in the history
* minor improvements on websocket and add websocketIT

* add it for websocket, and support config args

* add one more test case

* make ws config optional

* avoid fromTryCatchNonFatal when derivative already exists

* spelling and missing type parameter

* use richer Matchers in WebsocketServiceIntegrationTest

* scalafmt

* IDEs may love braces but we don't

* utility for simplifying FanOutShape2s; use in ContractsService

* split matSecondOut into generalization; make compile again

* match matSecondOut utility with standard utility methods

* spelling

* getCreatesAndArchivesSince doesn't need to query the transaction boundary

* boolean newtype utility

* split up transactionMessageHandler into components

* decodeAndParsePayload passes through the Jwt

* clean up config and default WS config

* take multiple template IDs for insertDeleteStepSource

* replace websocket return with {errors, add, remove}, based on acsFollowingAndBoundary

* parse ValuePredicate in websocket

* remove unused lfvToJson

* nominal internal state for emitted WS steps-and-errors

* missing copyright headers

* add filtering to convertFilterContracts

* add step conflation to websocket output

* move conflation to static function

* rename /transactions endpoint to /contracts/searchForever

* empty requests are not allowed; numConns is per-service

* option for GetCreatesAndArchiveSince to not terminate; use in WebsocketService

* start of searchForever documentation

* stub searchForever longer test

* use valueOr

* don't run all other tests again with WebsocketServiceIntegrationTest

* start of websocket delta test

* solve init order problem with AbstractHttpServiceIntegrationTestFuns

- previous order caused test set to be cleared; mutation is intuitive
  for sure!

* full flow test, fails for lack of create/exercise yet

* passing full flow test

* full documentation examples

* rename add/remove to created/archived

* cleaner NewBoolean.Named

* document heartbeats

* document subprotocols for searchForever

* note about the tests mysteriously terminating

* ensure create has happened before attempting query in tests

* reorganize multi-step WS test so its states and assertions are clearer

* filter out heartbeats in raw string tests

* factor out ContractDelta

* make exercisePayload easier to read

* filter out heartbeats in conversation test

* remove type lambda

* accept chunked queries

- clients may not be in control of how query bodies are delivered to the
  server, so we should be agnostic in that respect

* add changelog

CHANGELOG_BEGIN

- [JSON API - Experimental] WebSocket contract search at ``/contracts/searchForever``.
  See `issue #3936 <https://github.com/digital-asset/daml/pull/3936>`_.

CHANGELOG_END

* adapt to #3991 template ID strings

* adapt to #3971 argument -> payload

* fix create command for test (string template ID redux)

* adapt to #4014 ResolveTemplateId change

* update copyright headers

* rebuild WS example output to match latest changes

- thanks @leo-da

* SeqOps is not a safe name

* don't need breakOut anymore

* use util library form of partitionMap

- thanks @leo-da for pointing it out

Co-authored-by: lima-da <54044170+lima-da@users.noreply.github.com>
  • Loading branch information
S11001001 and lima-da authored Jan 15, 2020
1 parent 45c474b commit 3ae0438
Show file tree
Hide file tree
Showing 18 changed files with 820 additions and 225 deletions.
100 changes: 99 additions & 1 deletion docs/source/json-api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ Under "Verify Signature", put ``secret`` as the secret (_not_ base64
encoded); that is the hardcoded secret for testing.

Then the "Encoded" box should have your token; set HTTP header
``Authorization: Bearer copy-paste-token-here``.
``Authorization: Bearer copy-paste-token-here`` for normal requests, and
add the subprotocols ``jwt.token.copy-paste-token-here`` and
``daml.ws.auth`` for WebSockets requests.

Here are two tokens you can use for testing:

Expand Down Expand Up @@ -251,6 +253,102 @@ Nonempty Response with Unknown Template IDs Warning
"status": 200
}
WebSocket ``/contracts/searchForever``
======================================

List currently active contracts that match a given query, with
continuous updates.

Two subprotocols must be passed, as described in `Choosing a party
<#choosing-a-party>`__.

application/json body must be sent first, formatted according to the
:doc:`search-query-language`::

{"%templates": ["Iou:Iou"]}

output a series of JSON documents, each ``argument`` formatted according
to :doc:`lf-value-specification`::

{
"created": [{
"observers": [],
"agreementText": "",
"payload": {
"observers": [],
"issuer": "Alice",
"amount": "999.99",
"currency": "USD",
"owner": "Alice"
},
"signatories": ["Alice"],
"contractId": "#1:0",
"templateId": "f95486336ffb3c982319625bed0c88f68799b780b26b558b1e119277614ed634:Iou:Iou"
}]
}

To keep the stream alive, you'll occasionally see messages like this,
which can be safely ignored::

{"heartbeat": "ping"}

After submitting an ``Iou_Split`` exercise, which creates two contracts
and archives the one above, the same stream will eventually produce::

{
"created": [{
"observers": [],
"agreementText": "",
"payload": {
"observers": [],
"issuer": "Alice",
"amount": "42.42",
"currency": "USD",
"owner": "Alice"
},
"signatories": ["Alice"],
"contractId": "#2:1",
"templateId": "f95486336ffb3c982319625bed0c88f68799b780b26b558b1e119277614ed634:Iou:Iou"
}, {
"observers": [],
"agreementText": "",
"payload": {
"observers": [],
"issuer": "Alice",
"amount": "957.57",
"currency": "USD",
"owner": "Alice"
},
"signatories": ["Alice"],
"contractId": "#2:2",
"templateId": "f95486336ffb3c982319625bed0c88f68799b780b26b558b1e119277614ed634:Iou:Iou"
}],
"archived": ["#1:0"]
}

Some notes on behavior:

1. Each result object means "this is what would have changed if you just
polled ``/contracts/search`` iteratively." In particular, just as
polling search can "miss" contracts (as a create and archive can be
paired between polls), such contracts may or may not appear in any
result object.

2. No ``archived`` ever contains a contract ID occurring within an
``created`` in the same object. So, for example, supposing you are
keeping an internal map of active contracts, you can apply the
``created`` first or the ``archived`` first and be guaranteed to get
the same results.

3. You will almost certainly receive contract IDs in the ``archived``
set that you never received an ``created`` for. These are contracts
that query filtered out, but for which the server no longer is aware
of that. You can safely ignore these. However, such "phantom
archives" *are* guaranteed to represent an actual archival *on the
ledger*, so if you are keeping a more global dataset outside the
context of this specific search, you can use that archival
information as you wish.

POST ``/command/create``
========================

Expand Down
3 changes: 3 additions & 0 deletions ledger-service/http-json/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ hj_scalacopts = [
da_scala_library(
name = "http-json",
srcs = glob(["src/main/scala/**/*.scala"]),
plugins = [
"@maven//:org_spire_math_kind_projector_2_12",
],
scalacopts = hj_scalacopts,
tags = ["maven_coordinates=com.digitalasset.ledger-service:http-json:__VERSION__"],
visibility = ["//visibility:public"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.digitalasset.http

import java.io.File
import java.nio.file.Path
import java.util.concurrent.TimeUnit

import akka.stream.ThrottleMode
import com.digitalasset.util.ExceptionOps._
Expand All @@ -27,23 +28,22 @@ private[http] final case class Config(
jdbcConfig: Option[JdbcConfig] = None,
staticContentConfig: Option[StaticContentConfig] = None,
accessTokenFile: Option[Path] = None,
wsConfig: WebsocketConfig = Config.DefaultWsConfig
wsConfig: Option[WebsocketConfig] = None
)

private[http] object Config {
import scala.language.postfixOps
val Empty = Config(ledgerHost = "", ledgerPort = -1, httpPort = -1)
val DefaultWsConfig = WebsocketConfig(12 hours, 20, 1 second, 20, ThrottleMode.Shaping)
val DefaultWsConfig =
WebsocketConfig(
maxDuration = 120 minutes,
throttleElem = 20,
throttlePer = 1 second,
maxBurst = 20,
ThrottleMode.Shaping,
heartBeatPer = 5 second)
}

protected case class WebsocketConfig(
maxDuration: FiniteDuration,
throttleElem: Int,
throttlePer: FiniteDuration,
maxBurst: Int,
mode: ThrottleMode
)

private[http] abstract class ConfigCompanion[A](name: String) {

def create(x: Map[String, String]): Either[String, A]
Expand All @@ -62,11 +62,19 @@ private[http] abstract class ConfigCompanion[A](name: String) {
@SuppressWarnings(Array("org.wartremover.warts.Any"))
protected def optionalBooleanField(m: Map[String, String])(
k: String): Either[String, Option[Boolean]] =
m.get(k).traverseU(v => parseBoolean(k)(v)).toEither
m.get(k).traverse(v => parseBoolean(k)(v)).toEither

@SuppressWarnings(Array("org.wartremover.warts.Any"))
protected def optionalLongField(m: Map[String, String])(k: String): Either[String, Option[Long]] =
m.get(k).traverse(v => parseLong(k)(v)).toEither

import scalaz.syntax.std.string._

protected def parseBoolean(k: String)(v: String): String \/ Boolean =
\/.fromTryCatchNonFatal(v.toBoolean).leftMap(e =>
s"$k=$v must be a boolean value: ${e.description}")
v.parseBoolean.leftMap(e => s"$k=$v must be a boolean value: ${e.description}").disjunction

protected def parseLong(k: String)(v: String): String \/ Long =
v.parseLong.leftMap(e => s"$k=$v must be a int value: ${e.description}").disjunction

protected def requiredDirectoryField(m: Map[String, String])(k: String): Either[String, File] =
requiredField(m)(k).flatMap(directory)
Expand Down Expand Up @@ -139,6 +147,49 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
s"""\"driver=$driver,url=$url,user=$user,password=$password,createSchema=$createSchema\""""
}

private[http] final case class WebsocketConfig(
maxDuration: FiniteDuration,
throttleElem: Int,
throttlePer: FiniteDuration,
maxBurst: Int,
mode: ThrottleMode,
heartBeatPer: FiniteDuration
)

private[http] object WebsocketConfig extends ConfigCompanion[WebsocketConfig]("WebsocketConfig") {

implicit val showInstance: Show[WebsocketConfig] = Show.shows(c =>
s"WebsocketConfig(maxDuration=${c.maxDuration}, heartBeatPer=${c.heartBeatPer}.seconds)")

lazy val help: String =
"Contains comma-separated key-value pairs. Where:\n" +
"\tmaxDuration -- Maximum websocket session duration in minutes\n" +
"\theartBeatPer -- Server-side heartBeat interval in seconds\n" +
"\tExample: " + helpString("120", "5")

lazy val usage: String = helpString(
"<Maximum websocket session duration in minutes>",
"Server-side heartBeat interval in seconds")

override def create(x: Map[String, String]): Either[String, WebsocketConfig] =
for {
md <- optionalLongField(x)("maxDuration")
hbp <- optionalLongField(x)("heartBeatPer")
} yield
Config.DefaultWsConfig
.copy(
maxDuration = md
.map(t => FiniteDuration(t, TimeUnit.MINUTES))
.getOrElse(Config.DefaultWsConfig.maxDuration),
heartBeatPer = hbp
.map(t => FiniteDuration(t, TimeUnit.SECONDS))
.getOrElse(Config.DefaultWsConfig.heartBeatPer)
)

private def helpString(maxDuration: String, heartBeatPer: String): String =
s"""\"maxDuration=$maxDuration,heartBeatPer=$heartBeatPer\""""
}

private[http] final case class StaticContentConfig(
prefix: String,
directory: File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ import akka.stream.scaladsl.{
Concat,
Flow,
GraphDSL,
Keep,
Partition,
RunnableGraph,
Sink,
SinkQueueWithCancel,
Source
}
import akka.stream.{ClosedShape, FanOutShape2, Graph, Materializer}
import akka.stream.{ClosedShape, FanOutShape2, FlowShape, Graph, Materializer}
import com.digitalasset.http.Statement.discard
import com.digitalasset.http.dbbackend.ContractDao.StaleOffsetException
import com.digitalasset.http.dbbackend.{ContractDao, Queries}
import com.digitalasset.http.dbbackend.Queries.{DBContract, SurrogateTpId}
import com.digitalasset.http.domain.TemplateId
import com.digitalasset.http.LedgerClientJwt.Terminates
import com.digitalasset.http.util.ApiValueToLfValueConverter.apiValueToLfValue
import com.digitalasset.http.json.JsonProtocol.LfValueDatabaseCodec.{
apiValueToJsValue => lfValueToDbJsValue
Expand Down Expand Up @@ -150,13 +152,14 @@ private class ContractsFetch(
GraphDSL.create(
Sink.queue[ConnectionIO[Unit]](),
Sink.last[OffsetBookmark[String]]
)((a, b) => (a, b)) { implicit builder => (acsSink, offsetSink) =>
)(Keep.both) { implicit builder => (acsSink, offsetSink) =>
import GraphDSL.Implicits._

val txnK = getCreatesAndArchivesSince(
jwt,
transactionFilter(party, List(templateId)),
_: lav1.ledger_offset.LedgerOffset)
_: lav1.ledger_offset.LedgerOffset,
Terminates.AtLedgerEnd)

// include ACS iff starting at LedgerBegin
val (idses, lastOff) = offset match {
Expand Down Expand Up @@ -269,6 +272,25 @@ private[http] object ContractsFetch {
InsertDeleteStep(csb.result() filter (ce => !as.contains(ce.contractId)), as)
}

object GraphExtensions {
implicit final class `Graph FOS2 funs`[A, Y, Z, M](
private val g: Graph[FanOutShape2[A, Y, Z], M])
extends AnyVal {
private def divertToMat[N, O](oz: Sink[Z, N])(mat: (M, N) => O): Flow[A, Y, O] =
Flow fromGraph GraphDSL.create(g, oz)(mat) { implicit b => (gs, zOut) =>
import GraphDSL.Implicits._
gs.out1 ~> zOut
new FlowShape(gs.in, gs.out0)
}

/** Several of the graphs here have a second output guaranteed to deliver only one value.
* This turns such a graph into a flow with the value materialized.
*/
def divertToHead(implicit noM: M <~< NotUsed): Flow[A, Y, Future[Z]] =
divertToMat(Sink.head)(Keep.right[M, Future[Z]])
}
}

/** Like `acsAndBoundary`, but also include the events produced by `transactionsSince`
* after the ACS's last offset, terminating with the last offset of the last transaction,
* or the ACS's last offset if there were no transactions.
Expand Down Expand Up @@ -404,6 +426,11 @@ private[http] object ContractsFetch {
(if (o.deletes.isEmpty) inserts
else inserts.filter(c => !o.deletes.contains(cid(c)))) ++ o.inserts,
deletes union o.deletes)

def nonEmpty: Boolean = inserts.nonEmpty || deletes.nonEmpty

/** Results undefined if cid(d) != cid(c) */
def mapPreservingIds[D](f: C => D): InsertDeleteStep[D] = copy(inserts = inserts map f)
}

private def transactionFilter(
Expand Down
Loading

0 comments on commit 3ae0438

Please sign in to comment.