Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming API error handling/reporting improvements #5141

Merged
merged 7 commits into from
Mar 24, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Stopping the stream after emitting the 1st error
  • Loading branch information
leo-da committed Mar 24, 2020
commit b20bd4ecf91edaf83b9aabe4861b17afb39348a8
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.digitalasset.http

import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
import akka.http.scaladsl.model.ws.{Message, TextMessage, BinaryMessage}
import akka.stream.scaladsl.{Flow, Source, Sink}
import akka.stream.Materializer
import com.digitalasset.http.EndpointsCompanion._
import com.digitalasset.http.domain.{JwtPayload, SearchForeverRequest}
Expand Down Expand Up @@ -268,6 +268,7 @@ class WebSocketService(
extends LazyLogging {

import WebSocketService._
import Statement.discard
import util.ErrorOps._
import com.digitalasset.http.json.JsonProtocol._

Expand Down Expand Up @@ -319,9 +320,11 @@ class WebSocketService(
msg.toStrict(config.maxDuration).map { m =>
SprayJson.parse(m.text).liftErr(InvalidUserInput)
}
case _ =>
Future successful -\/(
InvalidUserInput("Cannot process your input, Expect a single JSON message"))
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
discard { bm.dataStream.runWith(Sink.ignore) }
Future successful -\/(InvalidUserInput(
"Invalid request. Expected a single TextMessage with JSON payload, got BinaryMessage"))
}
.via(withOptPrefix(ejv => ejv.toOption flatMap readStartingOffset))
.map {
Expand All @@ -337,6 +340,7 @@ class WebSocketService(
case (offPrefix, a) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a)
}
}
.takeWhile(_.isRight, inclusive = true) // stop after emitting 1st error
.flatMapConcat {
case \/-(s) => s
case -\/(e) => Source.single(wsErrorMessage(e))
Expand Down