Skip to content

Commit

Permalink
2nd part of stabilizing flaky websocket integration test (digital-ass…
Browse files Browse the repository at this point in the history
…et#5079)

* 2nd part of stabilizing flaky websocket integration test,

handling empty events, which we may receive a bunch in the end of the test
when there is no activity

changelog_begin
changelog_end

* fixing `Offset.semigroup`, thanks @S11001001!
  • Loading branch information
leo-da authored Mar 18, 2020
1 parent 3cbba8a commit 29c340f
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 34 deletions.
1 change: 0 additions & 1 deletion ledger-service/http-json/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ da_scala_test(
":Account.dar",
"//docs:quickstart-model.dar",
],
flaky = True,
plugins = [
"@maven//:org_spire_math_kind_projector_2_12",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.digitalasset.daml.lf
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.iface
import com.digitalasset.http.util.ClientUtil.boxedRecord
import com.digitalasset.http.util.LedgerOffsetUtil
import com.digitalasset.ledger.api.refinements.{ApiTypes => lar}
import com.digitalasset.ledger.api.{v1 => lav1}
import scalaz.Isomorphism.{<~>, IsoFunctorTemplate}
Expand All @@ -19,7 +20,20 @@ import scalaz.std.vector._
import scalaz.syntax.show._
import scalaz.syntax.std.option._
import scalaz.syntax.traverse._
import scalaz.{-\/, @@, Applicative, Bitraverse, NonEmptyList, Show, Tag, Traverse, \/, \/-}
import scalaz.{
-\/,
@@,
Applicative,
Bitraverse,
NonEmptyList,
Semigroup,
Show,
Tag,
Tags,
Traverse,
\/,
\/-
}
import spray.json.JsValue

import scala.annotation.tailrec
Expand Down Expand Up @@ -144,6 +158,12 @@ object domain {

def toLedgerApi(o: Offset): lav1.ledger_offset.LedgerOffset =
lav1.ledger_offset.LedgerOffset(lav1.ledger_offset.LedgerOffset.Value.Absolute(unwrap(o)))

implicit val ordering: Ordering[Offset] =
Ordering.by(LedgerOffsetUtil.parseOffsetString _ compose unwrap)(
LedgerOffsetUtil.LongEitherLongLongOrdering)

implicit val semigroup: Semigroup[Offset] = Tag.unsubst(Semigroup[Offset @@ Tags.LastVal])
}

final case class StartingOffset(offset: Offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import scalaz.{-\/, \/, \/-}

object LedgerOffsetUtil {

private val LongEitherLongLongOrdering: Ordering[Long \/ (Long, Long)] = {
private[http] val LongEitherLongLongOrdering: Ordering[Long \/ (Long, Long)] = {
import scalaz.std.tuple._
import scalaz.std.anyVal._
scalaz.Order[Long \/ (Long, Long)].toScalaOrdering
Expand All @@ -17,8 +17,11 @@ object LedgerOffsetUtil {
implicit val AbsoluteOffsetOrdering: Ordering[LedgerOffset.Value.Absolute] =
Ordering.by(parseOffset)(LongEitherLongLongOrdering)

private def parseOffset(a: LedgerOffset.Value.Absolute): Long \/ (Long, Long) = {
val offset: String = a.value
private def parseOffset(offset: LedgerOffset.Value.Absolute): Long \/ (Long, Long) = {
parseOffsetString(offset.value)
}

private[http] def parseOffsetString(offset: String): Long \/ (Long, Long) = {
offset.split('-') match {
case Array(_, a2, a3) =>
\/-((a2.toLong, a3.toLong))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,25 +267,27 @@ class WebsocketServiceIntegrationTest
def resp(iouCid: domain.ContractId): Sink[JsValue, Future[StreamState]] =
Sink
.foldAsync(NothingYet: StreamState) {
case (NothingYet, ContractDelta(Vector((ctid, ct)), Vector())) =>
case (NothingYet, ContractDelta(Vector((ctid, _)), Vector(), None)) =>
(ctid: String) shouldBe (iouCid.unwrap: String)
TestUtil.postJsonRequest(
uri.withPath(Uri.Path("/v1/exercise")),
exercisePayload(ctid),
headersWithAuth) map {
case (statusCode, respBody) =>
case (statusCode, _) =>
statusCode.isSuccess shouldBe true
GotAcs(ctid)
}

case (GotAcs(ctid), Offset(JsString(off), Events(JsArray(Vector()), _))) =>
Future.successful(GotLive(domain.Offset(off), ctid))
case (GotAcs(ctid), ContractDelta(Vector(), _, Some(offset))) =>
Future.successful(GotLive(offset, ctid))

case (
GotLive(preOffset, consumedCtid),
evtsWrapper @ ContractDelta(
Vector((fstId, fst), (sndId, snd)),
Vector(observeConsumed))) =>
Vector(observeConsumed),
Some(lastSeenOffset)
)) =>
Future {
observeConsumed.contractId should ===(consumedCtid)
Set(fstId, sndId, consumedCtid) should have size 3
Expand All @@ -305,7 +307,20 @@ class WebsocketServiceIntegrationTest
))
}
}
ShouldHaveEnded(preOffset, 2)
ShouldHaveEnded(preOffset, 2, lastSeenOffset)
}

case (
ShouldHaveEnded(liveStartOffset, msgCount, lastSeenOffset),
ContractDelta(Vector(), Vector(), Some(currentOffset))
) =>
Future {
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = if (lastSeenOffset == currentOffset) msgCount else msgCount + 1,
lastSeenOffset = currentOffset
)
}
}

Expand All @@ -315,20 +330,22 @@ class WebsocketServiceIntegrationTest
iouCid = getContractId(getResult(creation._2))
lastState <- singleClientQueryStream(uri, query) via parseResp runWith resp(iouCid)
liveOffset = inside(lastState) {
case ShouldHaveEnded(off, 2) => off
case ShouldHaveEnded(liveStart, 2, lastSeen) =>
import domain.Offset.ordering
lastSeen should be > liveStart
liveStart
}
rescan <- (singleClientQueryStream(uri, query, Some(liveOffset))
via parseResp runWith remainingDeltas)
} yield
inside(rescan) {
case (Vector((fstId, fst), (sndId, snd)), Vector(observeConsumed)) =>
case (Vector((fstId, fst), (sndId, snd)), Vector(observeConsumed), Some(_)) =>
Set(fstId, sndId, observeConsumed.contractId) should have size 3
}
}

"fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService {
(uri, encoder, _) =>
import spray.json.{JsArray, JsString}
val templateId = domain.TemplateId(None, "Account", "Account")
val fetchRequest = """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]"""
val f1 =
Expand All @@ -343,7 +360,7 @@ class WebsocketServiceIntegrationTest

case (
NothingYet,
ContractDelta(Vector((cid, c)), Vector())
ContractDelta(Vector((cid, c)), Vector(), None)
) =>
(cid: String) shouldBe (cid1.unwrap: String)
postArchiveCommand(templateId, cid2, encoder, uri).flatMap {
Expand All @@ -356,17 +373,30 @@ class WebsocketServiceIntegrationTest
}
}: Future[StreamState]

case (GotAcs(ctid), Offset(JsString(off), Events(JsArray(Vector()), _))) =>
Future.successful(GotLive(domain.Offset(off), ctid))
case (GotAcs(ctid), ContractDelta(Vector(), _, Some(offset))) =>
Future.successful(GotLive(offset, ctid))

case (
GotLive(off, archivedCid),
ContractDelta(Vector(), Vector(observeArchivedCid))
ContractDelta(Vector(), Vector(observeArchivedCid), Some(lastSeenOffset))
) =>
Future {
(observeArchivedCid.contractId.unwrap: String) shouldBe (archivedCid: String)
(observeArchivedCid.contractId: domain.ContractId) shouldBe (cid1: domain.ContractId)
ShouldHaveEnded(off, 0)
ShouldHaveEnded(off, 0, lastSeenOffset)
}

case (
ShouldHaveEnded(liveStartOffset, msgCount, lastSeenOffset),
ContractDelta(Vector(), Vector(), Some(currentOffset))
) =>
Future {
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = if (lastSeenOffset == currentOffset) msgCount else msgCount + 1,
lastSeenOffset = currentOffset
)
}
}

Expand All @@ -383,7 +413,12 @@ class WebsocketServiceIntegrationTest
cid1,
cid2)

} yield inside(lastState) { case ShouldHaveEnded(_, 0) => 1 shouldBe 1 }
} yield
inside(lastState) {
case ShouldHaveEnded(liveStart, 0, lastSeen) =>
import domain.Offset.ordering
lastSeen should be > liveStart
}
}

"fetch should receive all contracts when empty request specified" in withHttpService {
Expand Down Expand Up @@ -436,12 +471,14 @@ class WebsocketServiceIntegrationTest
}

private val remainingDeltas: Sink[JsValue, Future[ContractDelta.T]] =
Sink.fold[ContractDelta.T, JsValue]((Vector.empty, Vector.empty)) { (acc, jsv) =>
import scalaz.std.tuple._, scalaz.std.vector._, scalaz.syntax.semigroup._
jsv match {
case ContractDelta(c, a) => acc |+| ((c, a))
case _ => acc
}
Sink.fold[ContractDelta.T, JsValue]((Vector.empty, Vector.empty, Option.empty[domain.Offset])) {
(acc, jsv) =>
import scalaz.std.tuple._, scalaz.std.vector._, scalaz.syntax.semigroup._
import domain.Offset.semigroup
jsv match {
case ContractDelta(c, a, o) => acc |+| ((c, a, o))
case _ => acc
}
}

private def assertHeartbeat(str: String): Assertion =
Expand Down Expand Up @@ -497,12 +534,15 @@ object WebsocketServiceIntegrationTest {
private final case class GotAcs(firstCid: String) extends StreamState
private final case class GotLive(liveStartOff: domain.Offset, firstCid: String)
extends StreamState
private final case class ShouldHaveEnded(liveStartOff: domain.Offset, msgCount: Int)
private final case class ShouldHaveEnded(
liveStartOffset: domain.Offset,
msgCount: Int,
lastSeenOffset: domain.Offset)
extends StreamState

private object ContractDelta {
private val tagKeys = Set("created", "archived", "error")
type T = (Vector[(String, JsValue)], Vector[domain.ArchivedContract])
type T = (Vector[(String, JsValue)], Vector[domain.ArchivedContract], Option[domain.Offset])

def unapply(
jsv: JsValue
Expand All @@ -516,14 +556,22 @@ object WebsocketServiceIntegrationTest {
creates = sets.getOrElse("created", Vector()) collect {
case (_, JsObject(fields)) => fields
}
} yield
(creates collect (Function unlift { add =>

createPairs = creates collect (Function unlift { add =>
(add get "contractId" collect { case JsString(v) => v }) tuple (add get "payload")
}), sets.getOrElse("archived", Vector()) collect {
}): Vector[(String, JsValue)]

archives = sets.getOrElse("archived", Vector()) collect {
case (_, adata) =>
import json.JsonProtocol.ArchivedContractFormat
adata.convertTo[domain.ArchivedContract]
})
}: Vector[domain.ArchivedContract]

offset = eventsWrapper
.get("offset")
.collect { case JsString(str) => domain.Offset(str) }: Option[domain.Offset]

} yield (createPairs, archives, offset)
}

private object IouAmount {
Expand All @@ -548,8 +596,6 @@ object WebsocketServiceIntegrationTest {
jsv.fields get label map ((_, JsObject(jsv.fields - label)))
}

private object Events extends JsoField("events")
private object Offset extends JsoField("offset")
private object Created extends JsoField("created")
private object Archived extends JsoField("archived")
private object MatchedQueries extends JsoField("matchedQueries")
Expand Down

0 comments on commit 29c340f

Please sign in to comment.