-
Notifications
You must be signed in to change notification settings - Fork 205
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
slow down trigger on RESOURCE_EXHAUSTED failures (#7820)
* restate the submit stage as a Flow and derived Sink * take submit out of the trigger-to-submit flow * type for the failures produced directly by command submission * directly connect the msgSource failure queue to the submitter output * parens * slow down submission as we exceed max parallel submissions * restricting alterF so it will be usable with ConcurrentMap * disable buffer for the delay * split out the delay function * drafting a retry loop * degenerate test for retry loop, factoring the forAllFuture utility * map input to retrying properly * make retrying accessible to tests * test happy path and fix off-by-one * further tests for retrying * reveal that elements can get lost * more determinism in test * let failures block further elements from being attempted - Previously failures would go into a separate queue, where they awaited expiry of their delay and further initial upstream elements were given their first tries. However, closing the upstream could mean that queue was dropped, and detecting that situation is not trivial. So, instead, we don't use a separate queue. * plug retrying into the trigger submission flow * no changelog CHANGELOG_BEGIN CHANGELOG_END * remove throttle; pendingCommandIds may leak * report random parameter on failure * revert comment about throttling * explanation for fail in the error queue - suggested by @cocreature; thanks
- Loading branch information
Showing
5 changed files
with
232 additions
and
95 deletions.
There are no files selected for viewing
38 changes: 38 additions & 0 deletions
38
libs-scala/scalatest-utils/src/main/scala/scalatest/AsyncForAll.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package com.daml.scalatest | ||
|
||
import org.scalacheck.Gen.listOfN | ||
import org.scalacheck.Arbitrary | ||
import org.scalatest.Assertion | ||
import org.scalatest.Assertions.succeed | ||
import org.scalatest.exceptions.TestFailedException | ||
import scalaz.{Applicative, Monoid} | ||
import scalaz.std.list._ | ||
import scalaz.std.scalaFuture._ | ||
import scalaz.syntax.traverse._ | ||
|
||
import scala.concurrent.{ExecutionContext, Future} | ||
|
||
trait AsyncForAll { | ||
final def forAllAsync[A](trials: Int)(f: A => Future[Assertion])( | ||
implicit A: Arbitrary[A], | ||
ec: ExecutionContext): Future[Assertion] = { | ||
val runs = listOfN(trials, A.arbitrary).sample | ||
.getOrElse(sys error "random Gen failed") | ||
|
||
implicit val assertionMonoid: Monoid[Future[Assertion]] = | ||
Monoid liftMonoid (Applicative[Future], Monoid instance ((_, result) => result, succeed)) | ||
runs foldMap { a => | ||
f(a) recoverWith { | ||
case ae: TestFailedException => | ||
Future failed ae.modifyMessage(_ map { msg: String => | ||
msg + | ||
"\n Random parameters:" + | ||
s"\n arg0: $a" | ||
}) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.