slow down trigger on RESOURCE_EXHAUSTED failures (#7820)
* restate the submit stage as a Flow and derived Sink

* take submit out of the trigger-to-submit flow

* type for the failures produced directly by command submission

* directly connect the msgSource failure queue to the submitter output

* parens

* slow down submission as we exceed max parallel submissions

* restricting alterF so it will be usable with ConcurrentMap

* disable buffer for the delay

* split out the delay function

* drafting a retry loop

* degenerate test for retry loop, factoring the forAllFuture utility

* map input to retrying properly

* make retrying accessible to tests

* test happy path and fix off-by-one

* further tests for retrying

* reveal that elements can get lost

* more determinism in test

* let failures block further elements from being attempted

- Previously, failures went into a separate queue, where they awaited the
  expiry of their delay while further upstream elements were given their first
  tries.  However, closing the upstream could mean that queue was dropped, and
  detecting that situation is not trivial.  So, instead, we don't use a
  separate queue; failed submissions are retried in-line and block later
  elements (see the sketch after this list).

* plug retrying into the trigger submission flow

* no changelog

CHANGELOG_BEGIN
CHANGELOG_END

* remove throttle; pendingCommandIds may leak

* report random parameter on failure

* revert comment about throttling

* explanation for fail in the error queue

- suggested by @cocreature; thanks
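
At a high level, the commit restates the pipeline as three Flows joined into a
single feedback cycle, so that a trigger's own submission failures come back to
it as completions.  The following is a schematic sketch only, not code from this
commit: the object and method names are hypothetical, the stage bodies are
stubbed with `???`, and only the shapes matter.

    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, RunnableGraph}
    import scala.concurrent.Future

    object TriggerLoopSketch {
      def loop[TriggerMsg, SubmitRequest, SingleCommandFailure, SValue]
          : RunnableGraph[Future[SValue]] = {
        // ledger transactions/completions merged with fed-back submission failures
        val msgSource: Flow[SingleCommandFailure, TriggerMsg, NotUsed] = ???
        // the trigger rule: consumes messages, emits command submissions
        val evaluator: Flow[TriggerMsg, SubmitRequest, Future[SValue]] = ???
        // submission with retry on RESOURCE_EXHAUSTED; emits only terminal failures
        val submitOrFail: Flow[SubmitRequest, SingleCommandFailure, NotUsed] = ???

        evaluator
          .via(submitOrFail) // SubmitRequest ~> SingleCommandFailure
          .join(msgSource)   // close the loop: failures go back in, messages come out
      }
    }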
S11001001 authored Nov 10, 2020
1 parent 62cd53d commit b35c9fc
Showing 5 changed files with 232 additions and 95 deletions.
@@ -0,0 +1,38 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.scalatest

import org.scalacheck.Gen.listOfN
import org.scalacheck.Arbitrary
import org.scalatest.Assertion
import org.scalatest.Assertions.succeed
import org.scalatest.exceptions.TestFailedException
import scalaz.{Applicative, Monoid}
import scalaz.std.list._
import scalaz.std.scalaFuture._
import scalaz.syntax.traverse._

import scala.concurrent.{ExecutionContext, Future}

trait AsyncForAll {
final def forAllAsync[A](trials: Int)(f: A => Future[Assertion])(
implicit A: Arbitrary[A],
ec: ExecutionContext): Future[Assertion] = {
val runs = listOfN(trials, A.arbitrary).sample
.getOrElse(sys error "random Gen failed")

implicit val assertionMonoid: Monoid[Future[Assertion]] =
Monoid liftMonoid (Applicative[Future], Monoid instance ((_, result) => result, succeed))
runs foldMap { a =>
f(a) recoverWith {
case ae: TestFailedException =>
Future failed ae.modifyMessage(_ map { msg: String =>
msg +
"\n Random parameters:" +
s"\n arg0: $a"
})
}
}
}
}
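
A hedged usage sketch of the new helper (not part of this commit): a
hypothetical async ScalaTest spec mixing in AsyncForAll.  The ScalaTest 3.1
module layout (wordspec/matchers packages) and ScalaCheck's built-in Arbitrary
instances are assumed.

    import com.daml.scalatest.AsyncForAll
    import org.scalatest.matchers.should.Matchers
    import org.scalatest.wordspec.AsyncWordSpec
    import scala.concurrent.Future

    class AdditionSpec extends AsyncWordSpec with Matchers with AsyncForAll {
      "addition" should {
        "commute" in forAllAsync(trials = 50) { xy: (Int, Int) =>
          // on failure, forAllAsync appends the random argument to the message
          Future(xy._1 + xy._2 shouldBe xy._2 + xy._1)
        }
      }
    }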
@@ -19,6 +19,7 @@ import scalaz.syntax.bifunctor._
import scalaz.syntax.functor._
import scalaz.syntax.tag._
import scalaz.syntax.std.boolean._
import scalaz.syntax.std.option._

import scala.language.higherKinds

@@ -230,6 +231,15 @@ class Runner(
newMap.isDefined
}

import Runner.{
DamlFun,
SingleCommandFailure,
maxParallelSubmissionsPerTrigger,
maxTriesWhenOverloaded,
overloadedRetryDelay,
retrying,
}

@throws[RuntimeException]
private def handleCommands(commands: Seq[Command]): (UUID, SubmitRequest) = {
val commandUUID = UUID.randomUUID
@@ -246,8 +256,6 @@
(commandUUID, SubmitRequest(commands = Some(commandsArg)))
}

import Runner.{DamlFun, maxParallelSubmissionsPerTrigger}

private def freeTriggerSubmits(
clientTime: Timestamp,
v: SValue): UnfoldState[SValue, SubmitRequest] = {
@@ -289,33 +297,38 @@
UnfoldState(v)(go)
}

// This function produces a pair of a source of trigger messages
// and a function to call in the event of a command submission
// failure.
// This function produces a source of trigger messages,
// taking command submission failures as its input.
private def msgSource(
client: LedgerClient,
offset: LedgerOffset,
heartbeat: Option[FiniteDuration],
party: String,
filter: TransactionFilter)(implicit materializer: Materializer)
: (Source[TriggerMsg, NotUsed], (String, StatusRuntimeException) => Unit) = {

// A queue for command submission failures together with a source
// from which messages posted to the queue can be consumed.
val (completionQueue, completionQueueSource): (
SourceQueue[Completion],
Source[Completion, NotUsed]) =
Source
.queue[Completion](10 + maxParallelSubmissionsPerTrigger, OverflowStrategy.backpressure)
.preMaterialize()
// This function, given a command ID and a runtime exception,
// posts to the completion queue.
def postSubmitFailure(commandId: String, s: StatusRuntimeException) = {
val _ = completionQueue.offer(
Completion(
commandId,
Some(Status(s.getStatus().getCode().value(), s.getStatus().getDescription()))))
}
filter: TransactionFilter): Flow[SingleCommandFailure, TriggerMsg, NotUsed] = {

// A queue for command submission failures.
val submissionFailureQueue: Flow[SingleCommandFailure, Completion, NotUsed] =
Flow[SingleCommandFailure]
// 256 comes from the default ExecutionContext.
// Why `fail`? Consider the most obvious alternatives.
//
// `backpressure`? This feeds into the Free interpreter flow, which may produce
// many command failures for one event, hence deadlock.
//
// `drop*`? A trigger will proceed as if everything were fine, never receiving
// the notification that its reaction to one contract, its attempt to advance
// its workflow, failed.
//
// `fail`, on the other hand? It far better fits the trigger model to go
// tabula rasa and notice the still-unhandled contract in the ACS again
// on init, as we expect triggers to be able to do anyhow.
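// In concrete terms, with maxParallelSubmissionsPerTrigger = 8 the buffer
// below holds up to 264 failures before the whole stream is failed.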
.buffer(256 + maxParallelSubmissionsPerTrigger, OverflowStrategy.fail)
.map {
case SingleCommandFailure(commandId, s) =>
Completion(
commandId,
Some(Status(s.getStatus().getCode().value(), s.getStatus().getDescription())))
}

// The transaction source (ledger).
val transactionSource: Source[TriggerMsg, NotUsed] =
@@ -325,14 +338,15 @@

// Command completion source (ledger completion stream +
// synchronous submission failures).
val completionSource: Source[TriggerMsg, NotUsed] =
client.commandClient
.completionSource(List(party), offset)
.mapConcat({
case CheckpointElement(_) => List()
case CompletionElement(c) => List(c)
})
.merge(completionQueueSource)
val completionSource: Flow[SingleCommandFailure, TriggerMsg, NotUsed] =
submissionFailureQueue
.merge(
client.commandClient
.completionSource(List(party), offset)
.mapConcat {
case CheckpointElement(_) => List()
case CompletionElement(c) => List(c)
})
.map(CompletionMsg)

// Heartbeats source (we produce these repetitively on a timer with
@@ -345,9 +359,7 @@
case None => Source.empty[TriggerMsg]
}

val triggerMsgSource = transactionSource.merge(completionSource).merge(heartbeatSource)

(triggerMsgSource, postSubmitFailure)
completionSource merge transactionSource merge heartbeatSource
}

private def logReceivedMsg(tm: TriggerMsg): Unit = tm match {
@@ -399,15 +411,14 @@
case HeartbeatMsg() => converter.fromHeartbeat
}

// A sink for trigger messages representing a process for the
// A flow for trigger messages representing a process for the
// accumulated state changes resulting from application of the
// messages given the starting state represented by the ACS
// argument.
private def getTriggerSink(
private def getTriggerEvaluator(
name: String,
acs: Seq[CreatedEvent],
submit: SubmitRequest => Future[None.type],
): Sink[TriggerMsg, Future[SValue]] = {
): Flow[TriggerMsg, SubmitRequest, Future[SValue]] = {
logger.info(s"Trigger ${name} is running as ${party}")

val clientTime: Timestamp =
@@ -467,13 +478,13 @@
initialState.finalState ~> logInitialState ~> initialStateOut ~> rule.initState
initialState.elemsOut ~> submissions
msgIn ~> hideIrrelevantMsgs ~> encodeMsgs ~> rule.elemsIn
Sink.ignore <~ submitSubmissions(submit) <~ submissions <~ rule.elemsOut
submissions <~ rule.elemsOut
initialStateOut ~> finalStateIn
rule.finalStates ~> finalStateIn ~> saveLastState
// format: on
new SinkShape(msgIn.in)
new FlowShape(msgIn.in, submissions.out)
}
Sink fromGraph graph
Flow fromGraph graph
}

private[this] def hideIrrelevantMsgs: Flow[TriggerMsg, TriggerMsg, NotUsed] =
@@ -497,10 +508,6 @@
case x @ HeartbeatMsg() => List(x) // Heartbeats don't carry any information.
}

private[this] def submitSubmissions[SR, Z](submit: SR => Future[Z]): Flow[SR, Z, NotUsed] =
Flow[SR]
.mapAsync(maxParallelSubmissionsPerTrigger)(submit)

def makeApp(func: SExpr, values: Array[SValue]): SExpr = {
SEApp(func, values.map(SEValue(_)))
}
@@ -522,6 +529,34 @@
} yield (acsResponses.flatMap(x => x.activeContracts), offset)
}

private[this] def submitOrFail(
implicit ec: ExecutionContext): Flow[SubmitRequest, SingleCommandFailure, NotUsed] = {
def submit(req: SubmitRequest) = {
val f: Future[Empty] = client.commandClient
.submitSingleCommand(req)
f.map(_ => None).recover {
case s: StatusRuntimeException =>
Some(SingleCommandFailure(req.getCommands.commandId, s))
// any other error will cause the trigger's stream to fail
}
}
import io.grpc.Status.Code, Code.RESOURCE_EXHAUSTED
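// Reading retryableSubmit's nested Option below: None means RESOURCE_EXHAUSTED,
// so retry; Some(None) means the submission succeeded; Some(Some(failure)) is a
// non-retryable failure to be fed back to the trigger.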
def retryableSubmit(req: SubmitRequest) =
submit(req).map {
case Some(SingleCommandFailure(_, s))
if (s.getStatus.getCode: Code) == (RESOURCE_EXHAUSTED: Code) =>
None
case z => Some(z)
}
retrying(
initialTries = maxTriesWhenOverloaded,
backoff = overloadedRetryDelay,
parallelism = maxParallelSubmissionsPerTrigger,
retryableSubmit,
submit)
.collect { case Some(err) => err }
}

// Run the trigger given the state of the ACS. The msgFlow argument
// passed from ServiceMain is a kill switch. Other choices are
// possible e.g. 'Flow[TriggerMsg].take()' and this fact is made use
@@ -533,24 +568,13 @@
name: String = "")(
implicit materializer: Materializer,
executionContext: ExecutionContext): (T, Future[SValue]) = {
val (source, postFailure) =
val source =
msgSource(client, offset, trigger.heartbeat, party, transactionFilter)
def submit(req: SubmitRequest): Future[None.type] = {
val f: Future[Empty] = client.commandClient
.submitSingleCommand(req)
f.map(_ => None).recover {
case s: StatusRuntimeException =>
// XXX It would be better to split the stream and feed failures back
// into the msgSource directly. As it stands we fire-and-forget
// the (async) queuing done here, ignoring failure
postFailure(req.getCommands.commandId, s)
None
// any other error will cause the trigger's stream to fail
}
}
source
.viaMat(msgFlow)(Keep.right[NotUsed, T])
.toMat(getTriggerSink(name, acs, submit))(Keep.both)
Flow
.fromGraph(msgFlow)
.viaMat(getTriggerEvaluator(name, acs))(Keep.both)
.via(submitOrFail)
.join(source)
.run()
}
}
@@ -563,6 +587,10 @@ object Runner extends StrictLogging {
* commands that may be pending.
*/
val maxParallelSubmissionsPerTrigger = 8
val maxTriesWhenOverloaded = 6

private def overloadedRetryDelay(afterTries: Int): FiniteDuration =
(250 * (1 << (afterTries - 1))).milliseconds
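// For example, with maxTriesWhenOverloaded = 6 the successive delays are
// 250 ms, 500 ms, 1 s, 2 s and 4 s; the sixth try is the plain submit, so a
// persistently overloaded ledger adds roughly 7.75 s of backoff before the
// failure is reported to the trigger.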

// Return the time provider for a given time provider type.
def getTimeProvider(ty: TimeProviderType): TimeProvider = {
@@ -577,7 +605,30 @@
def unapply(v: SPAP): Some[SPAP] = Some(v)
}

private def alterF[K, V, F[_]: Functor](m: Map[K, V], k: K)(
/** Like `CommandRetryFlow` but with no notion of ledger time, and with
* delay support. Note that only the future succeeding with `None`
* indicates that a retry should be attempted; a failed future propagates
* to the stream.
*/
private[trigger] def retrying[A, B](
initialTries: Int,
backoff: Int => FiniteDuration,
parallelism: Int,
retryable: A => Future[Option[B]],
notRetryable: A => Future[B])(implicit ec: ExecutionContext): Flow[A, B, NotUsed] = {
def trial(tries: Int, value: A): Future[B] =
if (tries <= 1) notRetryable(value)
else
retryable(value).flatMap(_ cata (Future.successful, {
Future {
try Thread.sleep(backoff(initialTries - tries + 1).toMillis)
catch { case _: InterruptedException => }
}.flatMap(_ => trial(tries - 1, value))
}))
Flow[A].mapAsync(parallelism)(trial(initialTries, _))
}
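// Illustration (not in the source): retrying(3, _ => 1.second, 1,
// _ => Future(None), Future.successful) calls `retryable` twice, sleeping
// 1 second after each None, then falls back to `notRetryable`; a failed
// future at any step fails the stream instead of retrying.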

private def alterF[K, V, F[a] >: Option[a]: Functor](m: Map[K, V], k: K)(
f: Option[V] => F[Option[V]]): F[Map[K, V]] = {
val ov = m get k
f(ov) map {
@@ -589,6 +640,8 @@
}
}

private final case class SingleCommandFailure(commandId: String, s: StatusRuntimeException)

private sealed abstract class SeenMsgs {
import Runner.{SeenMsgs => S}
def see(msg: S.One): Option[SeenMsgs] =
2 changes: 2 additions & 0 deletions triggers/tests/BUILD.bazel
@@ -122,6 +122,7 @@ da_scala_library(
"FuncTestsWallClock",
"Jwt",
"Tls",
"RunnerSpec",
"UnfoldStateSpec",
]
],
@@ -155,6 +156,7 @@ da_scala_library(
"//ledger/test-common",
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/scalatest-utils",
"//triggers/runner:trigger-runner-lib",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_scalacheck_scalacheck_2_12",