Skip to content

Commit

Permalink
Perf test: WebSockets (pekko-http) (#3526)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski authored Feb 21, 2024
1 parent 8ef0b68 commit 2be43a1
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 41 deletions.
9 changes: 7 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ lazy val tests: ProjectMatrix = (projectMatrix in file("tests"))
)
.dependsOn(core, files, circeJson, cats)

lazy val perfServerJavaOptions = List(
"-Xms16g",
"-Xmx16g",
"-XX:+AlwaysPreTouch"
)
lazy val flightRecordingJavaOpts = "-XX:StartFlightRecording=filename=recording.jfr,dumponexit=true,duration=120s"

lazy val perfTests: ProjectMatrix = (projectMatrix in file("perf-tests"))
Expand Down Expand Up @@ -530,8 +535,8 @@ lazy val perfTests: ProjectMatrix = (projectMatrix in file("perf-tests"))
.settings(
fork := true,
connectInput := true,
Compile / run / javaOptions += flightRecordingJavaOpts,
Test / run / javaOptions -= flightRecordingJavaOpts
Compile / run / javaOptions ++= flightRecordingJavaOpts :: perfServerJavaOptions,
Test / run / javaOptions --= flightRecordingJavaOpts :: perfServerJavaOptions
)
.jvmPlatform(scalaVersions = List(scala2_13))
.dependsOn(core, pekkoHttpServer, http4sServer, nettyServer, nettyServerCats, playServer, vertxServer, vertxServerCats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ trait Endpoints {
)
}

val wsBaseEndpoint = endpoint.get.in("ws" / "ts")

def genServerEndpoints[F[_]](routeCount: Int)(reply: String => F[String]): List[ServerEndpoint[Any, F]] =
serverEndpoints[F](reply).flatMap(gen => (0 to routeCount).map(i => gen(i)))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sttp.tapir.perf.apis

import cats.effect.{ExitCode, IO, IOApp}
import sttp.tapir.perf.Common._

import scala.reflect.runtime.universe

Expand Down
7 changes: 3 additions & 4 deletions perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import sttp.tapir.integ.cats.effect.CatsMonadError
import sttp.tapir.perf.Common._
import sttp.tapir.perf.apis._
import sttp.tapir.server.http4s.{Http4sServerInterpreter, Http4sServerOptions}
import sttp.tapir.{CodecFormat, endpoint, webSocketBody}
import sttp.tapir.{CodecFormat, webSocketBody}

object Http4sCommon {
// Websocket response is returned with a lag, so that we can have more concurrent users talking to the server.
Expand Down Expand Up @@ -74,8 +74,7 @@ object Tapir extends Endpoints {

implicit val mErr: MonadError[IO] = new CatsMonadError[IO]

private val wsEndpoint = endpoint.get
.in("ts")
private val wsEndpoint = wsBaseEndpoint
.out(
webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](Fs2Streams[IO])
.concatenateFragmentedFrames(false)
Expand All @@ -95,7 +94,7 @@ object Tapir extends Endpoints {

def wsApp(withServerLog: Boolean = false): WebSocketBuilder2[IO] => HttpApp[IO] = { wsb =>
val serverOptions = buildOptions(Http4sServerOptions.customiseInterceptors[IO], withServerLog)
Router("/ws" -> {
Router("/" -> {
Http4sServerInterpreter[IO](serverOptions)
.toWebSocketRoutes(
wsEndpoint.serverLogicSuccess(_ =>
Expand Down
104 changes: 70 additions & 34 deletions perf-tests/src/main/scala/sttp/tapir/perf/pekko/PekkoHttp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,100 @@ import cats.effect.IO
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.HttpEntity
import org.apache.pekko.http.scaladsl.model.ws.{Message, TextMessage}
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.stream.scaladsl.FileIO
import org.apache.pekko.stream.scaladsl.{FileIO, Flow, Sink, Source}
import sttp.capabilities.pekko.PekkoStreams
import sttp.tapir.perf.Common._
import sttp.tapir.perf.apis._
import sttp.tapir.server.pekkohttp.{PekkoHttpServerInterpreter, PekkoHttpServerOptions}

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}

object PekkoCommon {
// Define a source that emits the current timestamp every 100 milliseconds
// We have to .take exact number of items, which will guarantee returning a proper final frame. Otherwise the stream would never end,
// causing Gatling to hang for long and then fail.
val timestampSource = Source.tick(0.seconds, 100.milliseconds, ()).take(WebSocketRequestsPerUser.toLong).map { _ =>
System.currentTimeMillis()
}
}

object Vanilla {
val wsRoute: Route = path("ws" / "ts") {
handleWebSocketMessages(timeStampWebSocketFlow.map(ts => TextMessage(ts.toString)))
}

def timeStampWebSocketFlow: Flow[Message, Long, Any] = {
// Incoming messages are ignored, but we need to define a sink for them
val sink = Flow[Message].to(Sink.ignore)
Flow.fromSinkAndSource(sink, PekkoCommon.timestampSource)
}

val router: Int => ActorSystem => Route = (nRoutes: Int) =>
(_: ActorSystem) =>
concat(
(0 to nRoutes).flatMap { (n: Int) =>
List(
get {
path(("path" + n.toString) / IntNumber) { id =>
complete((n + id).toString)
}
},
post {
path(("path" + n.toString)) {
entity(as[String]) { _ =>
complete((n).toString)
wsRoute ::
(0 to nRoutes).flatMap { (n: Int) =>
List(
get {
path(("path" + n.toString) / IntNumber) { id =>
complete((n + id).toString)
}
}
},
post {
path(("pathBytes" + n.toString)) {
entity(as[Array[Byte]]) { bytes =>
complete(s"Received ${bytes.length} bytes")
},
post {
path(("path" + n.toString)) {
entity(as[String]) { _ =>
complete((n).toString)
}
}
}
},
post {
path(("pathFile" + n.toString)) {
extractRequestContext { ctx =>
entity(as[HttpEntity]) { httpEntity =>
val path = newTempFilePath()
val sink = FileIO.toPath(path)
val finishedWriting = httpEntity.dataBytes.runWith(sink)(ctx.materializer)
onSuccess(finishedWriting) { _ =>
complete(s"File saved to $path")
},
post {
path(("pathBytes" + n.toString)) {
entity(as[Array[Byte]]) { bytes =>
complete(s"Received ${bytes.length} bytes")
}
}
},
post {
path(("pathFile" + n.toString)) {
extractRequestContext { ctx =>
entity(as[HttpEntity]) { httpEntity =>
val path = newTempFilePath()
val sink = FileIO.toPath(path)
val finishedWriting = httpEntity.dataBytes.runWith(sink)(ctx.materializer)
onSuccess(finishedWriting) { _ =>
complete(s"File saved to $path")
}
}
}
}
}
}
)
}: _*
)
}.toList: _*
)
}

object Tapir extends Endpoints {
import sttp.tapir._
val wsSink = Flow[Long].to(Sink.ignore)
val wsEndpoint = wsBaseEndpoint
.out(
webSocketBody[Long, CodecFormat.TextPlain, Long, CodecFormat.TextPlain](PekkoStreams)
.concatenateFragmentedFrames(false)
)
val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Future] { _ =>
Future.successful {
Flow.fromSinkAndSource(wsSink, PekkoCommon.timestampSource)
}
}

def router(nRoutes: Int, withServerLog: Boolean = false): ActorSystem => Route = { (actorSystem: ActorSystem) =>
val serverOptions = buildOptions(PekkoHttpServerOptions.customiseInterceptors(ExecutionContext.Implicits.global), withServerLog)
PekkoHttpServerInterpreter(serverOptions)(actorSystem.dispatcher).toRoute(
genEndpointsFuture(nRoutes)
wsServerEndpoint :: genEndpointsFuture(nRoutes)
)
}
}
Expand Down

0 comments on commit 2be43a1

Please sign in to comment.