Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming API error handling/reporting improvements #5141

Merged
merged 7 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Streaming API error handling
  • Loading branch information
leo-da committed Mar 24, 2020
commit cce9bbc953f4f891d11ed8129ee26cff4b8b9b0e
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import scalaz.{Liskov, NonEmptyList}
import Liskov.<~<
import com.digitalasset.http.query.ValuePredicate
import scalaz.syntax.bifunctor._
import scalaz.syntax.show._
import scalaz.syntax.std.boolean._
import scalaz.syntax.std.option._
import scalaz.syntax.traverse._
Expand All @@ -30,7 +29,7 @@ import scalaz.std.option._
import scalaz.std.set._
import scalaz.std.tuple._
import scalaz.{-\/, \/, \/-}
import spray.json.{JsArray, JsObject, JsString, JsValue}
import spray.json.{JsArray, JsObject, JsValue}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -333,23 +332,28 @@ class WebSocketService(
a <- Q.parse(decoder, jv)
} yield (offPrefix, a)
}
.map {
_.flatMap {
case (offPrefix, a) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a)
}
}
.flatMapConcat {
case \/-((offPrefix, a)) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a)
case -\/(e) => Source.single(wsErrorMessage(e.shows))
case \/-(s) => s
case -\/(e) => Source.single(wsErrorMessage(e))
}
}

private def getTransactionSourceForParty[A: StreamQuery](
jwt: Jwt,
jwtPayload: JwtPayload,
offPrefix: Option[domain.StartingOffset],
request: A): Source[Message, NotUsed] = {
request: A): Error \/ Source[Message, NotUsed] = {
val Q = implicitly[StreamQuery[A]]

val (resolved, unresolved, fn) = Q.predicate(request, resolveTemplateId, lookupType)

if (resolved.nonEmpty) {
contractsService
val source = contractsService
.insertDeleteStepSource(jwt, jwtPayload.party, resolved.toList, offPrefix, Terminates.Never)
.via(convertFilterContracts(fn))
.filter(_.nonEmpty)
Expand All @@ -358,11 +362,10 @@ class WebSocketService(
.via(renderEventsAndEmitHeartbeats) // wrong place, see https://github.com/digital-asset/daml/issues/4955
.prepend(reportUnresolvedTemplateIds(unresolved))
.map(jsv => TextMessage(jsv.compactPrint))
\/-(source)
} else {
Source.single(
wsErrorMessage(
s"Cannot resolve any templateId from request: ${request: A}, " +
s"unresolved templateIds: ${unresolved: Set[domain.TemplateId.OptionalPkg]}"))
-\/(InvalidUserInput(
s"Cannot resolve any of the requested template IDs: ${unresolved: Set[domain.TemplateId.OptionalPkg]}"))
}
}

Expand Down Expand Up @@ -412,10 +415,9 @@ class WebSocketService(
.collect { case (_, Some(x)) => x }
}

private[http] def wsErrorMessage(errorMsg: String): TextMessage.Strict =
TextMessage(
JsObject("error" -> JsString(errorMsg)).compactPrint
)
private[http] def wsErrorMessage(error: Error): TextMessage.Strict = {
TextMessage(errorResponse(error).toJson.compactPrint)
}

@SuppressWarnings(Array("org.wartremover.warts.Any"))
private def convertFilterContracts[Pos](fn: domain.ActiveContract[LfV] => Option[Pos])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ object SprayJson {
def decode[A: JsonReader](a: JsValue): JsonReaderError \/ A =
\/.fromTryCatchNonFatal(a.convertTo[A]).leftMap(e => JsonReaderError(a.toString, e.description))

def decode1[F[_], A](str: String)(
implicit ev1: JsonReader[F[JsValue]],
ev2: Traverse[F],
ev3: JsonReader[A]): JsonReaderError \/ F[A] =
for {
jsValue <- parse(str)
a <- decode1[F, A](jsValue)
} yield a

def decode1[F[_], A](a: JsValue)(
implicit ev1: JsonReader[F[JsValue]],
ev2: Traverse[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ class WebsocketServiceIntegrationTest
val result = Await.result(clientMsg, 10.seconds)

result should have size 1
result.head should include("error")
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}

"fetch endpoint should send error msg when receiving malformed message" in withHttpService {
Expand All @@ -228,7 +230,9 @@ class WebsocketServiceIntegrationTest
val result = Await.result(clientMsg, 10.seconds)

result should have size 1
result.head should include("""{"error":""")
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}

// NB SC #3936: the WS connection below terminates at an appropriate time for
Expand Down Expand Up @@ -446,8 +450,10 @@ class WebsocketServiceIntegrationTest
// TODO(Leo) #4417: expected behavior is to return all active contracts (???). Make sure it is consistent with stream/query
// c1 should include(s""""contractId":"${cid1.unwrap: String}"""")
// c2 should include(s""""contractId":"${cid2.unwrap: String}"""")
errorMsg should include(
s""""error":"Cannot resolve any templateId from request: List()""")
val errorResponse = decodeErrorResponse(errorMsg)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors shouldBe List(
"Cannot resolve any of the requested template IDs: Set()")
}
}
}
Expand Down Expand Up @@ -509,6 +515,13 @@ class WebsocketServiceIntegrationTest
isEmpty && hasOffset
}
.valueOr(_ => false)

private def decodeErrorResponse(str: String): domain.ErrorResponse[List[String]] = {
import json.JsonProtocol._
inside(SprayJson.decode1[domain.ErrorResponse, List[String]](str)) {
case \/-(e) => e
}
}
}

object WebsocketServiceIntegrationTest {
Expand Down