Skip to content

Commit

Permalink
Upgrade akka and akka-http (digital-asset#8048)
Browse files Browse the repository at this point in the history
* Upgrade akka and akka-http

Was chasing an issue somewhere and thought this might affect it in
some way. It didn’t but I might as well turn the upgrade into a PR.

changelog_begin
changelog_end

* Fix trigger service tests

changelog_begin
changelog_end

* Downgrade akka-http again

changelog_begin
changelog_end

* Upgrade akka-http again and fix tests

changelog_begin
changelog_end

* Cleanup trigger service

changelog_begin
changelog_end
  • Loading branch information
cocreature authored Nov 25, 2020
1 parent 6664794 commit d83cbdb
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 420 deletions.
18 changes: 9 additions & 9 deletions bazel-java-deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ def install_java_deps():
"com.squareup:javapoet:1.11.1",
"com.storm-enroute:scalameter_2.12:0.10.1",
"com.storm-enroute:scalameter-core_2.12:0.10.1",
"com.typesafe.akka:akka-actor_2.12:2.6.1",
"com.typesafe.akka:akka-actor-typed_2.12:2.6.1",
"com.typesafe.akka:akka-http_2.12:10.1.11",
"com.typesafe.akka:akka-http-spray-json_2.12:10.1.11",
"com.typesafe.akka:akka-http-testkit_2.12:10.1.11",
"com.typesafe.akka:akka-slf4j_2.12:2.6.1",
"com.typesafe.akka:akka-stream_2.12:2.6.1",
"com.typesafe.akka:akka-stream-testkit_2.12:2.6.1",
"com.typesafe.akka:akka-testkit_2.12:2.6.1",
"com.typesafe.akka:akka-actor_2.12:2.6.10",
"com.typesafe.akka:akka-actor-typed_2.12:2.6.10",
"com.typesafe.akka:akka-http_2.12:10.1.13",
"com.typesafe.akka:akka-http-spray-json_2.12:10.1.13",
"com.typesafe.akka:akka-http-testkit_2.12:10.1.13",
"com.typesafe.akka:akka-slf4j_2.12:2.6.10",
"com.typesafe.akka:akka-stream_2.12:2.6.10",
"com.typesafe.akka:akka-stream-testkit_2.12:2.6.10",
"com.typesafe.akka:akka-testkit_2.12:2.6.10",
"com.typesafe.play:anorm_2.12:2.5.3",
"com.typesafe.play:anorm-akka_2.12:2.5.3",
"com.typesafe.scala-logging:scala-logging_2.12:3.9.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package com.daml.extractor

import akka.actor.ActorSystem
import akka.stream.scaladsl.{RestartSource, Sink}
import akka.stream.{KillSwitches, Materializer}
import akka.stream.{KillSwitches, Materializer, RestartSettings}
import com.daml.auth.TokenHolder
import com.daml.extractor.Types._
import com.daml.extractor.config.{ExtractorConfig, SnapshotEndSetting}
Expand Down Expand Up @@ -168,10 +168,11 @@ class Extractor[T](config: ExtractorConfig, target: T)(

RestartSource
.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
) { () =>
RestartSettings(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
)) { () =>
tokenHolder.foreach(_.refresh())
logger.info(s"Starting streaming transactions from ${startOffSet}...")
client.transactionClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,24 @@ private[http] object WebsocketTestFixture extends StrictLogging with Assertions
jwt: Jwt,
serviceUri: Uri,
query: String,
offset: Option[domain.Offset] = None,
keepOpen: Boolean = false)(implicit asys: ActorSystem): Source[Message, NotUsed] =
singleClientWSStream(jwt, "query", serviceUri, query, offset, keepOpen)
offset: Option[domain.Offset] = None)(implicit asys: ActorSystem): Source[Message, NotUsed] =
singleClientWSStream(jwt, "query", serviceUri, query, offset)

def singleClientFetchStream(
jwt: Jwt,
serviceUri: Uri,
request: String,
offset: Option[domain.Offset] = None,
keepOpen: Boolean = false)(implicit asys: ActorSystem): Source[Message, NotUsed] =
singleClientWSStream(jwt, "fetch", serviceUri, request, offset, keepOpen)
offset: Option[domain.Offset] = None
)(implicit asys: ActorSystem): Source[Message, NotUsed] =
singleClientWSStream(jwt, "fetch", serviceUri, request, offset)

def singleClientWSStream(
jwt: Jwt,
path: String,
serviceUri: Uri,
query: String,
offset: Option[domain.Offset],
keepOpen: Boolean = false)(implicit asys: ActorSystem): Source[Message, NotUsed] = {
offset: Option[domain.Offset]
)(implicit asys: ActorSystem): Source[Message, NotUsed] = {

import spray.json._, json.JsonProtocol._
val uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path(s"/v1/stream/$path"))
Expand All @@ -209,7 +208,9 @@ private[http] object WebsocketTestFixture extends StrictLogging with Assertions
Seq(Map("offset" -> off.unwrap).toJson.compactPrint, query).iterator),
Source single query)
.map(TextMessage(_))
.concatMat(if (keepOpen) Source.maybe[Message] else Source.empty)(Keep.left)
// akka-http will cancel the whole stream once the input ends so we use
// Source.maybe to keep the input open.
.concatMat(Source.maybe[Message])(Keep.left)
.via(webSocketFlow)
}

Expand Down
19 changes: 11 additions & 8 deletions ledger-service/http-json/src/failure/scala/http/FailureTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ package com.daml.http

import akka.http.javadsl.model.ws.PeerClosedConnectionException
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.stream.scaladsl.Sink
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink}

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -102,7 +103,6 @@ final class FailureTests
body.compactPrint,
headersWithParties(List(p.unwrap)))
_ = status shouldBe StatusCodes.ServiceUnavailable
_ = println(output.toString)
_ = output shouldBe "The server was not able to produce a timely response to your request.\r\nPlease try again in a short while!"
_ = proxy.toxics().get("timeout").remove()
(status, _) <- postCreateCommand(
Expand All @@ -118,7 +118,6 @@ final class FailureTests
body.compactPrint,
headersWithParties(List(p.unwrap)))
_ = status shouldBe StatusCodes.ServiceUnavailable
_ = println(output.toString)
_ = output shouldBe "The server was not able to produce a timely response to your request.\r\nPlease try again in a short while!"
} yield succeed
}
Expand Down Expand Up @@ -298,14 +297,16 @@ final class FailureTests

def respAfter(
offset: domain.Offset,
accountCid: domain.ContractId): Sink[JsValue, Future[Unit]] = {
accountCid: domain.ContractId,
stop: UniqueKillSwitch): Sink[JsValue, Future[Unit]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume.interpret(
for {
ContractDelta(Vector((ctId, _)), Vector(), Some(newOffset)) <- readOne
_ = ctId shouldBe accountCid.unwrap
_ = newOffset.unwrap should be > offset.unwrap
_ = stop.shutdown
_ <- drain
} yield ()
)
Expand All @@ -323,8 +324,8 @@ final class FailureTests
r <- (singleClientQueryStream(
jwtForParties(List(p.unwrap), List(), ledgerId().unwrap),
uri,
query,
keepOpen = true) via parseResp runWith respBefore(cid)).transform(x => Success(x))
query
) via parseResp runWith respBefore(cid)).transform(x => Success(x))
_ = inside(r) {
case Failure(e: PeerClosedConnectionException) =>
e.closeCode shouldBe 1011
Expand All @@ -339,11 +340,13 @@ final class FailureTests
headers = headersWithParties(List(p.unwrap)))
cid = getContractId(getResult(r))
_ = status shouldBe 'success
_ <- singleClientQueryStream(
(stop, source) = singleClientQueryStream(
jwtForParties(List(p.unwrap), List(), ledgerId().unwrap),
uri,
query,
Some(offset)) via parseResp runWith respAfter(offset, cid)
Some(offset)
).viaMat(KillSwitches.single)(Keep.right).preMaterialize()
_ <- source via parseResp runWith respAfter(offset, cid, stop)
} yield succeed

}
Expand Down
Loading

0 comments on commit d83cbdb

Please sign in to comment.