Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming API error handling/reporting improvements #5141

Merged
merged 7 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions docs/source/json-api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ Successful response with a warning, HTTP status: 200 OK
"warnings": <JSON object>
}

.. _error-format:

Failure, HTTP status: 400 | 401 | 404 | 500
===========================================

Expand Down Expand Up @@ -1053,6 +1055,54 @@ Streaming API
Two subprotocols must be passed with every request, as described in
`Passing token with WebSockets <#passing-token-with-websockets>`__.

JavaScript/Node.js example demonstrating how to establish Streaming API connection:

.. code-block:: javascript

const WebSocket = require("ws")

console.log("Starting")

const tokenPrefix = "jwt.token."
const wsProtocol = "daml.ws.auth"
const jwt =
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU"
const subprotocol = `${tokenPrefix}${jwt},${wsProtocol}`

const ws = new WebSocket("ws://localhost:7575/v1/stream/query", [subprotocol])

ws.on("open", function open() {
ws.send(`{"templateIds": ["Iou:Iou"]}`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you would typically write the JS object as an object and then encode it to JSON, yes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is Javascript, no types who cares :) but yeah JSON.stringify should do the trick. I will change it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    ws.on("open", function open() {
      ws.send(JSON.stringify({templateIds: ["Iou:Iou"]}))
    })

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sure :) you can at least catch dumb syntax typos like missing } or " this way.

})

ws.on("message", function incoming(data) {
console.log(data)
})

Error and Warning Reporting
===========================

Errors and warnings reported as part of the regular ``on-message`` flow.

Streaming API error messages formatted the same way as :ref:`synchronous API errors <error-format>`.

Streaming API reports only one type of warnings -- unknown template IDs, which is formatted as:

.. code-block:: none

{"warnings":{"unknownTemplateIds":<JSON Array of template ID strings>>}}

Examples:

.. code-block:: none

{"warnings": {"unknownTemplateIds": ["UnknownModule:UnknownEntity"]}}

{
"errors":["JsonReaderError. Cannot read JSON: <{\"templateIds\":[]}>. Cause: spray.json.DeserializationException: search requires at least one item in 'templateIds'"],
"status":400
}

Contracts Query Stream
======================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.digitalasset.http

import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
import akka.http.scaladsl.model.ws.{Message, TextMessage, BinaryMessage}
import akka.stream.scaladsl.{Flow, Source, Sink}
import akka.stream.Materializer
import com.digitalasset.http.EndpointsCompanion._
import com.digitalasset.http.domain.{JwtPayload, SearchForeverRequest}
Expand All @@ -21,7 +21,6 @@ import scalaz.{Liskov, NonEmptyList}
import Liskov.<~<
import com.digitalasset.http.query.ValuePredicate
import scalaz.syntax.bifunctor._
import scalaz.syntax.show._
import scalaz.syntax.std.boolean._
import scalaz.syntax.std.option._
import scalaz.syntax.traverse._
Expand All @@ -30,7 +29,7 @@ import scalaz.std.option._
import scalaz.std.set._
import scalaz.std.tuple._
import scalaz.{-\/, \/, \/-}
import spray.json.{JsArray, JsObject, JsString, JsValue}
import spray.json.{JsArray, JsObject, JsValue}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -207,8 +206,8 @@ object WebSocketService {
}

implicit val EnrichedContractKeyWithStreamQuery
: StreamQuery[List[domain.EnrichedContractKey[LfV]]] =
new StreamQuery[List[domain.EnrichedContractKey[LfV]]] {
: StreamQuery[NonEmptyList[domain.EnrichedContractKey[LfV]]] =
new StreamQuery[NonEmptyList[domain.EnrichedContractKey[LfV]]] {

type Positive = Unit

Expand All @@ -217,10 +216,10 @@ object WebSocketService {
@SuppressWarnings(Array("org.wartremover.warts.Any"))
override def parse(
decoder: DomainJsonDecoder,
jv: JsValue): Error \/ List[domain.EnrichedContractKey[LfV]] =
jv: JsValue): Error \/ NonEmptyList[domain.EnrichedContractKey[LfV]] =
for {
as <- SprayJson
.decode[List[domain.EnrichedContractKey[JsValue]]](jv)
.decode[NonEmptyList[domain.EnrichedContractKey[JsValue]]](jv)
.liftErr(InvalidUserInput)
bs = as.map(a => decodeWithFallback(decoder, a))
} yield bs
Expand All @@ -235,7 +234,7 @@ object WebSocketService {
override def allowPhantonArchives: Boolean = false

override def predicate(
request: List[domain.EnrichedContractKey[LfV]],
request: NonEmptyList[domain.EnrichedContractKey[LfV]],
resolveTemplateId: PackageService.ResolveTemplateId,
lookupType: TypeLookup): StreamPredicate[Positive] = {

Expand Down Expand Up @@ -269,6 +268,7 @@ class WebSocketService(
extends LazyLogging {

import WebSocketService._
import Statement.discard
import util.ErrorOps._
import com.digitalasset.http.json.JsonProtocol._

Expand Down Expand Up @@ -320,9 +320,11 @@ class WebSocketService(
msg.toStrict(config.maxDuration).map { m =>
SprayJson.parse(m.text).liftErr(InvalidUserInput)
}
case _ =>
Future successful -\/(
InvalidUserInput("Cannot process your input, Expect a single JSON message"))
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
discard { bm.dataStream.runWith(Sink.ignore) }
Future successful -\/(InvalidUserInput(
"Invalid request. Expected a single TextMessage with JSON payload, got BinaryMessage"))
}
.via(withOptPrefix(ejv => ejv.toOption flatMap readStartingOffset))
.map {
Expand All @@ -333,23 +335,29 @@ class WebSocketService(
a <- Q.parse(decoder, jv)
} yield (offPrefix, a)
}
.map {
_.flatMap {
case (offPrefix, a) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a)
}
}
.takeWhile(_.isRight, inclusive = true) // stop after emitting 1st error
.flatMapConcat {
case \/-((offPrefix, a)) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a)
case -\/(e) => Source.single(wsErrorMessage(e.shows))
case \/-(s) => s
case -\/(e) => Source.single(wsErrorMessage(e))
}
}

private def getTransactionSourceForParty[A: StreamQuery](
jwt: Jwt,
jwtPayload: JwtPayload,
offPrefix: Option[domain.StartingOffset],
request: A): Source[Message, NotUsed] = {
request: A): Error \/ Source[Message, NotUsed] = {
val Q = implicitly[StreamQuery[A]]

val (resolved, unresolved, fn) = Q.predicate(request, resolveTemplateId, lookupType)

if (resolved.nonEmpty) {
contractsService
val source = contractsService
.insertDeleteStepSource(jwt, jwtPayload.party, resolved.toList, offPrefix, Terminates.Never)
.via(convertFilterContracts(fn))
.filter(_.nonEmpty)
Expand All @@ -358,11 +366,10 @@ class WebSocketService(
.via(renderEventsAndEmitHeartbeats) // wrong place, see https://github.com/digital-asset/daml/issues/4955
.prepend(reportUnresolvedTemplateIds(unresolved))
.map(jsv => TextMessage(jsv.compactPrint))
\/-(source)
} else {
Source.single(
wsErrorMessage(
s"Cannot resolve any templateId from request: ${request: A}, " +
s"unresolved templateIds: ${unresolved: Set[domain.TemplateId.OptionalPkg]}"))
-\/(InvalidUserInput(
s"Cannot resolve any of the requested template IDs: ${unresolved: Set[domain.TemplateId.OptionalPkg]}"))
}
}

Expand Down Expand Up @@ -412,10 +419,9 @@ class WebSocketService(
.collect { case (_, Some(x)) => x }
}

private[http] def wsErrorMessage(errorMsg: String): TextMessage.Strict =
TextMessage(
JsObject("error" -> JsString(errorMsg)).compactPrint
)
private[http] def wsErrorMessage(error: Error): TextMessage.Strict = {
TextMessage(errorResponse(error).toJson.compactPrint)
}

@SuppressWarnings(Array("org.wartremover.warts.Any"))
private def convertFilterContracts[Pos](fn: domain.ActiveContract[LfV] => Option[Pos])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import com.digitalasset.ledger.api.refinements.{ApiTypes => lar}
import com.typesafe.scalalogging.StrictLogging
import scalaz.syntax.std.boolean._
import scalaz.syntax.std.option._
import scalaz.\/
import scalaz.{NonEmptyList, \/}

import scala.concurrent.{ExecutionContext, Future}
import EndpointsCompanion._

Expand Down Expand Up @@ -80,11 +81,10 @@ class WebsocketEndpoints(
upgradeReq <- req.header[UpgradeToWebSocket] \/> InvalidUserInput(
s"Cannot upgrade client's connection to websocket",
)
_ = logger.info(s"GOT $wsProtocol")
payload <- preconnect(decodeJwt, upgradeReq, wsProtocol)
(jwt, jwtPayload) = payload
} yield
handleWebsocketRequest[List[domain.EnrichedContractKey[domain.LfValue]]](
handleWebsocketRequest[NonEmptyList[domain.EnrichedContractKey[domain.LfValue]]](
jwt,
jwtPayload,
upgradeReq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ object SprayJson {
def decode[A: JsonReader](a: JsValue): JsonReaderError \/ A =
\/.fromTryCatchNonFatal(a.convertTo[A]).leftMap(e => JsonReaderError(a.toString, e.description))

def decode1[F[_], A](str: String)(
implicit ev1: JsonReader[F[JsValue]],
ev2: Traverse[F],
ev3: JsonReader[A]): JsonReaderError \/ F[A] =
for {
jsValue <- parse(str)
a <- decode1[F, A](jsValue)
} yield a

def decode1[F[_], A](a: JsValue)(
implicit ev1: JsonReader[F[JsValue]],
ev2: Traverse[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ class WebsocketServiceIntegrationTest
val result = Await.result(clientMsg, 10.seconds)

result should have size 1
result.head should include("error")
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}

"fetch endpoint should send error msg when receiving malformed message" in withHttpService {
Expand All @@ -228,7 +230,9 @@ class WebsocketServiceIntegrationTest
val result = Await.result(clientMsg, 10.seconds)

result should have size 1
result.head should include("""{"error":""")
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}

// NB SC #3936: the WS connection below terminates at an appropriate time for
Expand All @@ -254,6 +258,7 @@ class WebsocketServiceIntegrationTest
import spray.json._

val initialCreate = initialIouCreate(uri)

def exercisePayload(cid: String) =
baseExercisePayload.copy(
fields = baseExercisePayload.fields updated ("contractId", JsString(cid)))
Expand Down Expand Up @@ -422,34 +427,21 @@ class WebsocketServiceIntegrationTest
}
}

"fetch should receive all contracts when empty request specified" in withHttpService {
(uri, encoder, _) =>
val f1 =
postCreateCommand(accountCreateCommand(domain.Party("Alice"), "abc123"), encoder, uri)
val f2 =
postCreateCommand(accountCreateCommand(domain.Party("Alice"), "def456"), encoder, uri)

for {
r1 <- f1
_ = r1._1 shouldBe 'success
cid1 = getContractId(getResult(r1._2))

r2 <- f2
_ = r2._1 shouldBe 'success
cid2 = getContractId(getResult(r2._2))

clientMsgs <- singleClientFetchStream(uri, "[]").runWith(
collectResultsAsTextMessageSkipHeartbeats)
} yield {
inside(clientMsgs) {
case Seq(errorMsg) =>
// TODO(Leo) #4417: expected behavior is to return all active contracts (???). Make sure it is consistent with stream/query
// c1 should include(s""""contractId":"${cid1.unwrap: String}"""")
// c2 should include(s""""contractId":"${cid2.unwrap: String}"""")
errorMsg should include(
s""""error":"Cannot resolve any templateId from request: List()""")
}
}
"fetch should should return an error if empty list of (templateId, key) pairs is passed" in withHttpService {
(uri, _, _) =>
singleClientFetchStream(uri, "[]")
.runWith(collectResultsAsTextMessageSkipHeartbeats)
.map { clientMsgs =>
inside(clientMsgs) {
case Seq(errorMsg) =>
val errorResponse = decodeErrorResponse(errorMsg)
errorResponse.status shouldBe StatusCodes.BadRequest
inside(errorResponse.errors) {
case List(error) =>
error should include("must be a list with at least 1 element")
}
}
}: Future[Assertion]
}

private def wsConnectRequest[M](
Expand Down Expand Up @@ -509,6 +501,13 @@ class WebsocketServiceIntegrationTest
isEmpty && hasOffset
}
.valueOr(_ => false)

private def decodeErrorResponse(str: String): domain.ErrorResponse[List[String]] = {
import json.JsonProtocol._
inside(SprayJson.decode1[domain.ErrorResponse, List[String]](str)) {
case \/-(e) => e
}
}
}

object WebsocketServiceIntegrationTest {
Expand Down