Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket variant of query endpoint #3936

Merged
merged 69 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from 66 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
b134aea
minor improvements on websocket and add websocketIT
lima-da Dec 18, 2019
cfe29b0
merge in master
lima-da Dec 18, 2019
807c14a
add it for websocket, and support config args
lima-da Dec 19, 2019
2771f85
Merge in Master
lima-da Dec 19, 2019
51fb690
add one more test case
lima-da Dec 19, 2019
fcf5e14
make ws config optional
lima-da Dec 20, 2019
9ae56db
Merge branch 'master' of github.com:digital-asset/daml into json-webs…
lima-da Dec 20, 2019
78a8d17
avoid fromTryCatchNonFatal when derivative already exists
S11001001 Dec 20, 2019
792a55f
spelling and missing type parameter
S11001001 Dec 20, 2019
f7ef4ea
use richer Matchers in WebsocketServiceIntegrationTest
S11001001 Dec 20, 2019
a0dfab3
scalafmt
S11001001 Dec 20, 2019
b0fa48f
IDEs may love braces but we don't
S11001001 Dec 20, 2019
6f4e81b
utility for simplifying FanOutShape2s; use in ContractsService
S11001001 Dec 23, 2019
44d2392
split matSecondOut into generalization; make compile again
S11001001 Dec 24, 2019
eebcae4
match matSecondOut utility with standard utility methods
S11001001 Dec 24, 2019
5ecba29
spelling
S11001001 Dec 24, 2019
3b05dc8
getCreatesAndArchivesSince doesn't need to query the transaction boun…
S11001001 Dec 24, 2019
f2c026b
boolean newtype utility
S11001001 Dec 24, 2019
4a34b4c
split up transactionMessageHandler into components
S11001001 Dec 30, 2019
75c59af
decodeAndParsePayload passes through the Jwt
S11001001 Dec 30, 2019
b3dcb27
Merge commit 'f04ce1a3c898f5713d6fd6f1d3f56ced7b1c4646' into 3880-web…
S11001001 Dec 30, 2019
0dc9260
clean up config and default WS config
S11001001 Jan 1, 2020
aac4687
take multiple template IDs for insertDeleteStepSource
S11001001 Jan 1, 2020
7ae5b7d
replace websocket return with {errors, add, remove}, based on acsFoll…
S11001001 Jan 1, 2020
20f9566
parse ValuePredicate in websocket
S11001001 Jan 2, 2020
d14175f
remove unused lfvToJson
S11001001 Jan 2, 2020
4ee3246
nominal internal state for emitted WS steps-and-errors
S11001001 Jan 2, 2020
9433580
missing copyright headers
S11001001 Jan 2, 2020
ea75469
add filtering to convertFilterContracts
S11001001 Jan 2, 2020
9ff70dc
add step conflation to websocket output
S11001001 Jan 2, 2020
cf992ca
move conflation to static function
S11001001 Jan 2, 2020
0cb1444
rename /transactions endpoint to /contracts/searchForever
S11001001 Jan 2, 2020
eca83ba
empty requests are not allowed; numConns is per-service
S11001001 Jan 2, 2020
caf0786
option for GetCreatesAndArchiveSince to not terminate; use in Websock…
S11001001 Jan 2, 2020
1b94379
start of searchForever documentation
S11001001 Jan 2, 2020
707cea0
stub searchForever longer test
S11001001 Jan 2, 2020
386ad59
use valueOr
S11001001 Jan 2, 2020
2422271
don't run all other tests again with WebsocketServiceIntegrationTest
S11001001 Jan 3, 2020
735422e
start of websocket delta test
S11001001 Jan 8, 2020
3438c76
Merge commit 'a043b09375814a64c7e5823a1c549f5931e68c87' into 3880-web…
S11001001 Jan 9, 2020
5b52999
solve init order problem with AbstractHttpServiceIntegrationTestFuns
S11001001 Jan 9, 2020
da7dbc3
full flow test, fails for lack of create/exercise yet
S11001001 Jan 9, 2020
edf8969
passing full flow test
S11001001 Jan 9, 2020
f199e95
full documentation examples
S11001001 Jan 9, 2020
1469ea7
rename add/remove to created/archived
S11001001 Jan 9, 2020
4383b1c
cleaner NewBoolean.Named
S11001001 Jan 9, 2020
a3df010
document heartbeats
S11001001 Jan 9, 2020
a1360b5
document subprotocols for searchForever
S11001001 Jan 10, 2020
798f918
note about the tests mysteriously terminating
S11001001 Jan 10, 2020
427704d
ensure create has happened before attempting query in tests
S11001001 Jan 10, 2020
5e10fcc
reorganize multi-step WS test so its states and assertions are clearer
S11001001 Jan 10, 2020
7cb76bc
filter out heartbeats in raw string tests
S11001001 Jan 10, 2020
917cbc7
factor out ContractDelta
S11001001 Jan 10, 2020
fe89d28
make exercisePayload easier to read
S11001001 Jan 10, 2020
2d3448c
filter out heartbeats in conversation test
S11001001 Jan 10, 2020
9c9d1a2
remove type lambda
S11001001 Jan 10, 2020
131e0f7
accept chunked queries
S11001001 Jan 10, 2020
8e1b92f
add changelog
S11001001 Jan 10, 2020
b9ce77a
Merge commit '006aa9b60816e13b4587b565f3348bc438507074' into 3880-web…
S11001001 Jan 10, 2020
140ef02
adapt to #3991 template ID strings
S11001001 Jan 10, 2020
a32e105
adapt to #3971 argument -> payload
S11001001 Jan 10, 2020
d87cfcb
fix create command for test (string template ID redux)
S11001001 Jan 15, 2020
0c585da
Merge commit '536d6a3f9468c8fd1461872c73717ec2798d80c2' into 3880-web…
S11001001 Jan 15, 2020
d757518
adapt to #4014 ResolveTemplateId change
S11001001 Jan 15, 2020
af7d548
update copyright headers
S11001001 Jan 15, 2020
42dc6b9
rebuild WS example output to match latest changes
S11001001 Jan 15, 2020
094668b
SeqOps is not a safe name
S11001001 Jan 15, 2020
c6b58c0
don't need breakOut anymore
S11001001 Jan 15, 2020
c3e5537
use util library form of partitionMap
S11001001 Jan 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 99 additions & 1 deletion docs/source/json-api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ Under "Verify Signature", put ``secret`` as the secret (_not_ base64
encoded); that is the hardcoded secret for testing.

Then the "Encoded" box should have your token; set HTTP header
``Authorization: Bearer copy-paste-token-here``.
``Authorization: Bearer copy-paste-token-here`` for normal requests, and
add the subprotocols ``jwt.token.copy-paste-token-here`` and
``daml.ws.auth`` for WebSockets requests.

Here are two tokens you can use for testing:

Expand Down Expand Up @@ -251,6 +253,102 @@ Nonempty Response with Unknown Template IDs Warning
"status": 200
}

WebSocket ``/contracts/searchForever``
======================================

List currently active contracts that match a given query, with
continuous updates.

Two subprotocols must be passed, as described in `Choosing a party
<#choosing-a-party>`__.

application/json body must be sent first, formatted according to the
:doc:`search-query-language`::

{"%templates": ["Iou:Iou"]}

output a series of JSON documents, each ``argument`` formatted according
to :doc:`lf-value-specification`::

{
"created": [{
"observers": [],
"agreementText": "",
"payload": {
"observers": [],
"issuer": "Alice",
"amount": "999.99",
"currency": "USD",
"owner": "Alice"
},
"signatories": ["Alice"],
"contractId": "#1:0",
"templateId": "f95486336ffb3c982319625bed0c88f68799b780b26b558b1e119277614ed634:Iou:Iou"
}]
}

To keep the stream alive, you'll occasionally see messages like this,
which can be safely ignored::

{"heartbeat": "ping"}

After submitting an ``Iou_Split`` exercise, which creates two contracts
and archives the one above, the same stream will eventually produce::

{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This format differs from what you get from the /commands/exercise endpoint. Her you get something looking like

{
  created: [C1, C2],
  archived: [A1, A2]
}

There you'd instead get

[
  {created: C1},
  {created: C2},
  {archived: A1},
  {archived: A2}
]

I'd prefer if the encoding is the same for both endpoints. Since the /commands/exercise endpoint does not work with the first format, we need to use the second in both places. It would be nice if could keep the order of the events we don't filter out during consolidation for the /contracts/searchForever endpoint, but I don't consider it a hard requirement.

"created": [{
"observers": [],
"agreementText": "",
"payload": {
"observers": [],
"issuer": "Alice",
"amount": "42.42",
"currency": "USD",
"owner": "Alice"
},
"signatories": ["Alice"],
"contractId": "#2:1",
"templateId": "f95486336ffb3c982319625bed0c88f68799b780b26b558b1e119277614ed634:Iou:Iou"
}, {
"observers": [],
"agreementText": "",
"payload": {
"observers": [],
"issuer": "Alice",
"amount": "957.57",
"currency": "USD",
"owner": "Alice"
},
"signatories": ["Alice"],
"contractId": "#2:2",
"templateId": "f95486336ffb3c982319625bed0c88f68799b780b26b558b1e119277614ed634:Iou:Iou"
}],
"archived": ["#1:0"]
}

Some notes on behavior:

1. Each result object means "this is what would have changed if you just
polled ``/contracts/search`` iteratively." In particular, just as
polling search can "miss" contracts (as a create and archive can be
paired between polls), such contracts may or may not appear in any
result object.

2. No ``archived`` ever contains a contract ID occurring within an
``created`` in the same object. So, for example, supposing you are
keeping an internal map of active contracts, you can apply the
``created`` first or the ``archived`` first and be guaranteed to get
the same results.

3. You will almost certainly receive contract IDs in the ``archived``
set that you never received an ``created`` for. These are contracts
that query filtered out, but for which the server no longer is aware
of that. You can safely ignore these. However, such "phantom
archives" *are* guaranteed to represent an actual archival *on the
ledger*, so if you are keeping a more global dataset outside the
context of this specific search, you can use that archival
information as you wish.

POST ``/command/create``
========================

Expand Down
3 changes: 3 additions & 0 deletions ledger-service/http-json/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ hj_scalacopts = [
da_scala_library(
name = "http-json",
srcs = glob(["src/main/scala/**/*.scala"]),
plugins = [
"@maven//:org_spire_math_kind_projector_2_12",
],
scalacopts = hj_scalacopts,
tags = ["maven_coordinates=com.digitalasset.ledger-service:http-json:__VERSION__"],
visibility = ["//visibility:public"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.digitalasset.http

import java.io.File
import java.nio.file.Path
import java.util.concurrent.TimeUnit

import akka.stream.ThrottleMode
import com.digitalasset.util.ExceptionOps._
Expand All @@ -27,23 +28,22 @@ private[http] final case class Config(
jdbcConfig: Option[JdbcConfig] = None,
staticContentConfig: Option[StaticContentConfig] = None,
accessTokenFile: Option[Path] = None,
wsConfig: WebsocketConfig = Config.DefaultWsConfig
wsConfig: Option[WebsocketConfig] = None
)

private[http] object Config {
import scala.language.postfixOps
val Empty = Config(ledgerHost = "", ledgerPort = -1, httpPort = -1)
val DefaultWsConfig = WebsocketConfig(12 hours, 20, 1 second, 20, ThrottleMode.Shaping)
val DefaultWsConfig =
WebsocketConfig(
maxDuration = 120 minutes,
throttleElem = 20,
throttlePer = 1 second,
maxBurst = 20,
ThrottleMode.Shaping,
heartBeatPer = 5 second)
}

protected case class WebsocketConfig(
maxDuration: FiniteDuration,
throttleElem: Int,
throttlePer: FiniteDuration,
maxBurst: Int,
mode: ThrottleMode
)

private[http] abstract class ConfigCompanion[A](name: String) {

def create(x: Map[String, String]): Either[String, A]
Expand All @@ -62,11 +62,19 @@ private[http] abstract class ConfigCompanion[A](name: String) {
@SuppressWarnings(Array("org.wartremover.warts.Any"))
protected def optionalBooleanField(m: Map[String, String])(
k: String): Either[String, Option[Boolean]] =
m.get(k).traverseU(v => parseBoolean(k)(v)).toEither
m.get(k).traverse(v => parseBoolean(k)(v)).toEither

@SuppressWarnings(Array("org.wartremover.warts.Any"))
protected def optionalLongField(m: Map[String, String])(k: String): Either[String, Option[Long]] =
m.get(k).traverse(v => parseLong(k)(v)).toEither

import scalaz.syntax.std.string._

protected def parseBoolean(k: String)(v: String): String \/ Boolean =
\/.fromTryCatchNonFatal(v.toBoolean).leftMap(e =>
s"$k=$v must be a boolean value: ${e.description}")
v.parseBoolean.leftMap(e => s"$k=$v must be a boolean value: ${e.description}").disjunction

protected def parseLong(k: String)(v: String): String \/ Long =
v.parseLong.leftMap(e => s"$k=$v must be a int value: ${e.description}").disjunction

protected def requiredDirectoryField(m: Map[String, String])(k: String): Either[String, File] =
requiredField(m)(k).flatMap(directory)
Expand Down Expand Up @@ -139,6 +147,49 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
s"""\"driver=$driver,url=$url,user=$user,password=$password,createSchema=$createSchema\""""
}

private[http] final case class WebsocketConfig(
maxDuration: FiniteDuration,
throttleElem: Int,
throttlePer: FiniteDuration,
maxBurst: Int,
mode: ThrottleMode,
heartBeatPer: FiniteDuration
)

private[http] object WebsocketConfig extends ConfigCompanion[WebsocketConfig]("WebsocketConfig") {

implicit val showInstance: Show[WebsocketConfig] = Show.shows(c =>
s"WebsocketConfig(maxDuration=${c.maxDuration}, heartBeatPer=${c.heartBeatPer}.seconds)")

lazy val help: String =
"Contains comma-separated key-value pairs. Where:\n" +
"\tmaxDuration -- Maximum websocket session duration in minutes\n" +
"\theartBeatPer -- Server-side heartBeat interval in seconds\n" +
"\tExample: " + helpString("120", "5")

lazy val usage: String = helpString(
"<Maximum websocket session duration in minutes>",
"Server-side heartBeat interval in seconds")

override def create(x: Map[String, String]): Either[String, WebsocketConfig] =
for {
md <- optionalLongField(x)("maxDuration")
hbp <- optionalLongField(x)("heartBeatPer")
} yield
Config.DefaultWsConfig
.copy(
maxDuration = md
.map(t => FiniteDuration(t, TimeUnit.MINUTES))
.getOrElse(Config.DefaultWsConfig.maxDuration),
heartBeatPer = hbp
.map(t => FiniteDuration(t, TimeUnit.SECONDS))
.getOrElse(Config.DefaultWsConfig.heartBeatPer)
)

private def helpString(maxDuration: String, heartBeatPer: String): String =
s"""\"maxDuration=$maxDuration,heartBeatPer=$heartBeatPer\""""
}

private[http] final case class StaticContentConfig(
prefix: String,
directory: File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ import akka.stream.scaladsl.{
Concat,
Flow,
GraphDSL,
Keep,
Partition,
RunnableGraph,
Sink,
SinkQueueWithCancel,
Source
}
import akka.stream.{ClosedShape, FanOutShape2, Graph, Materializer}
import akka.stream.{ClosedShape, FanOutShape2, FlowShape, Graph, Materializer}
import com.digitalasset.http.Statement.discard
import com.digitalasset.http.dbbackend.ContractDao.StaleOffsetException
import com.digitalasset.http.dbbackend.{ContractDao, Queries}
import com.digitalasset.http.dbbackend.Queries.{DBContract, SurrogateTpId}
import com.digitalasset.http.domain.TemplateId
import com.digitalasset.http.LedgerClientJwt.Terminates
import com.digitalasset.http.util.ApiValueToLfValueConverter.apiValueToLfValue
import com.digitalasset.http.json.JsonProtocol.LfValueDatabaseCodec.{
apiValueToJsValue => lfValueToDbJsValue
Expand Down Expand Up @@ -150,13 +152,14 @@ private class ContractsFetch(
GraphDSL.create(
Sink.queue[ConnectionIO[Unit]](),
Sink.last[OffsetBookmark[String]]
)((a, b) => (a, b)) { implicit builder => (acsSink, offsetSink) =>
)(Keep.both) { implicit builder => (acsSink, offsetSink) =>
import GraphDSL.Implicits._

val txnK = getCreatesAndArchivesSince(
jwt,
transactionFilter(party, List(templateId)),
_: lav1.ledger_offset.LedgerOffset)
_: lav1.ledger_offset.LedgerOffset,
Terminates.AtLedgerEnd)

// include ACS iff starting at LedgerBegin
val (idses, lastOff) = offset match {
Expand Down Expand Up @@ -269,6 +272,25 @@ private[http] object ContractsFetch {
InsertDeleteStep(csb.result() filter (ce => !as.contains(ce.contractId)), as)
}

object GraphExtensions {
implicit final class `Graph FOS2 funs`[A, Y, Z, M](
private val g: Graph[FanOutShape2[A, Y, Z], M])
extends AnyVal {
private def divertToMat[N, O](oz: Sink[Z, N])(mat: (M, N) => O): Flow[A, Y, O] =
Flow fromGraph GraphDSL.create(g, oz)(mat) { implicit b => (gs, zOut) =>
import GraphDSL.Implicits._
gs.out1 ~> zOut
new FlowShape(gs.in, gs.out0)
}

/** Several of the graphs here have a second output guaranteed to deliver only one value.
* This turns such a graph into a flow with the value materialized.
*/
def divertToHead(implicit noM: M <~< NotUsed): Flow[A, Y, Future[Z]] =
divertToMat(Sink.head)(Keep.right[M, Future[Z]])
}
}

/** Like `acsAndBoundary`, but also include the events produced by `transactionsSince`
* after the ACS's last offset, terminating with the last offset of the last transaction,
* or the ACS's last offset if there were no transactions.
Expand Down Expand Up @@ -404,6 +426,11 @@ private[http] object ContractsFetch {
(if (o.deletes.isEmpty) inserts
else inserts.filter(c => !o.deletes.contains(cid(c)))) ++ o.inserts,
deletes union o.deletes)

def nonEmpty: Boolean = inserts.nonEmpty || deletes.nonEmpty

/** Results undefined if cid(d) != cid(c) */
def mapPreservingIds[D](f: C => D): InsertDeleteStep[D] = copy(inserts = inserts map f)
}

private def transactionFilter(
Expand Down
Loading