Skip to content

Commit

Permalink
Stopping the stream after emitting the 1st error
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-da committed Mar 24, 2020
1 parent cce9bbc commit b20bd4e
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.digitalasset.http

import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
import akka.http.scaladsl.model.ws.{Message, TextMessage, BinaryMessage}
import akka.stream.scaladsl.{Flow, Source, Sink}
import akka.stream.Materializer
import com.digitalasset.http.EndpointsCompanion._
import com.digitalasset.http.domain.{JwtPayload, SearchForeverRequest}
Expand Down Expand Up @@ -268,6 +268,7 @@ class WebSocketService(
extends LazyLogging {

import WebSocketService._
import Statement.discard
import util.ErrorOps._
import com.digitalasset.http.json.JsonProtocol._

Expand Down Expand Up @@ -319,9 +320,11 @@ class WebSocketService(
msg.toStrict(config.maxDuration).map { m =>
SprayJson.parse(m.text).liftErr(InvalidUserInput)
}
case _ =>
Future successful -\/(
InvalidUserInput("Cannot process your input, Expect a single JSON message"))
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
discard { bm.dataStream.runWith(Sink.ignore) }
Future successful -\/(InvalidUserInput(
"Invalid request. Expected a single TextMessage with JSON payload, got BinaryMessage"))
}
.via(withOptPrefix(ejv => ejv.toOption flatMap readStartingOffset))
.map {
Expand All @@ -337,6 +340,7 @@ class WebSocketService(
case (offPrefix, a) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a)
}
}
.takeWhile(_.isRight, inclusive = true) // stop after emitting 1st error
.flatMapConcat {
case \/-(s) => s
case -\/(e) => Source.single(wsErrorMessage(e))
Expand Down

0 comments on commit b20bd4e

Please sign in to comment.