Skip to content

Commit

Permalink
Put initialization failures through retries too (digital-asset#6230)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
shayne-fletcher authored Jun 4, 2020
1 parent ae463b6 commit e23a488
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,8 @@ object Server {
case Right(()) => Behaviors.same
}
case TriggerInitializationFailure(runningTrigger, cause) =>
// The trigger has failed to start. Send the runner a stop
// message. There's no point in it remaining alive since
// its child actor is stopped and won't be restarted.
// The trigger has failed to start.
server.logTriggerStatus(runningTrigger, "stopped: initialization failure")
runningTrigger.runner ! TriggerRunner.Stop
// No need to update the running triggers tables since
// this trigger never made it there.
Behaviors.same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,12 @@ class TriggerRunner(

import TriggerRunner.{Message, Stop}

// Spawn a trigger runner impl. Supervise it. If it fails to start
// its runner, send it a stop message (the server will later send us
// a stop message in due course in this case so this actor will get
// garbage collected too). If something bad happens while the
// trigger is running, try to restart it up to 3 times.
// Spawn a trigger runner impl. Supervise it.
private val child =
ctx.spawn(
Behaviors
.supervise(
Behaviors
.supervise(TriggerRunnerImpl(ctx.self, config))
.onFailure[InitializationException](stop))
.onFailure[RuntimeException](
.supervise(TriggerRunnerImpl(ctx.self, config))
.onFailure(
restart.withLimit(config.maxFailureNumberOfRetries, config.failureRetryTimeRange)),
name
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.daml.ledger.api.refinements.ApiTypes.Party
import io.grpc.netty.NettyChannelBuilder

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scalaz.syntax.tag._
import com.daml.lf.CompiledPackages
Expand Down Expand Up @@ -137,9 +138,10 @@ object TriggerRunnerImpl {
// Report the failure to the server.
config.server ! TriggerInitializationFailure(runningTrigger, cause.toString)
// Tell our monitor there's been a failure. The
// monitor's supervisor strategy will respond to this by
// writing the exception to the log and stopping this
// actor.
// monitor's supervisor strategy will respond to
// this by writing the exception to the log and
// attempting to restart this actor up to some
// number of times.
throw new InitializationException("Couldn't start: " + cause.toString)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,24 +400,11 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
// Confirm that the running trigger ends up stopped and that
// its history matches our expectations.
_ <- assertTriggerIds(uri, "Alice", _.isEmpty)
_ <- assertTriggerStatus(
uri,
aliceTrigger,
_ ==
Vector(
"starting",
"running",
"stopped: runtime failure",
"starting",
"stopped: initialization failure"))
_ <- assertTriggerStatus(uri, aliceTrigger, _.last == "stopped: initialization failure")
} yield succeed
}

// TODO(SF, 2020-06-05): This test is temporarily disabled because
// it is too fragile. There is a race between the trigger restart
// and the network being unavailable (see
// https://dev.azure.com/digitalasset/adadc18a-d7df-446a-aacb-86042c1619c6/_apis/build/builds/45230/logs/124).
ignore should "restart a failing trigger if possible" in withHttpService(Some(dar)) {
it should "restart a trigger failing due to a dropped connection" in withHttpService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Simulate the ledger briefly being unavailable due to network
// connectivity loss. Our restart strategy means that the running
Expand All @@ -428,118 +415,101 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
aliceTrigger <- parseTriggerId(resp)
// Proceed when it is confirmed to be running.
_ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger))
// Simulate brief network connectivity loss. This will cause the
// running trigger's flow graph to complete with failure. Restore
// the network right away: if it stays down, the restarted trigger
// cannot initialize, and the stop strategy will then kill the
// trigger.
// Simulate brief network connectivity loss.
_ <- Future { ledgerProxy.disable() }
_ <- Future { ledgerProxy.enable() }
// To conclude, check that the trigger survived the network
// outage and that its history indicates it went through a
// restart to do so.
// Check that the trigger survived the outage and that its
// history shows it went through a restart.
_ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger))
_ <- assertTriggerStatus(
uri,
aliceTrigger,
_ ==
Vector(
"starting",
"running",
"stopped: runtime failure",
"starting",
"running"
))
triggerStatus => {
triggerStatus.count(_ == "stopped: runtime failure") == 1 &&
triggerStatus.last == "running"
}
)
} yield succeed
}

it should "stop a trigger when the user script fails init" in withHttpService(Some(dar)) {
it should "restart triggers with script init errors" in withHttpService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for {
resp <- startTrigger(uri, s"$testPkgId:ErrorTrigger:trigger", "Alice")
aliceTrigger <- parseTriggerId(resp)
_ <- assertTriggerStatus(
uri,
aliceTrigger,
_ ==
Vector(
"starting",
"stopped: initialization failure",
))
triggerStatus => {
triggerStatus.count(_ == "starting") ==
ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 &&
triggerStatus.last == "stopped: initialization failure"
}
)
_ <- assertTriggerIds(uri, "Alice", _.isEmpty)
} yield succeed
}

it should "restart triggers with errors in user script" in withHttpService(Some(dar)) {
it should "restart triggers with script update errors" in withHttpService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for {
resp <- startTrigger(uri, s"$testPkgId:LowLevelErrorTrigger:trigger", "Alice")
aliceTrigger <- parseTriggerId(resp)
_ <- assertTriggerStatus(
uri,
aliceTrigger,
_ ==
Vector(
"starting",
"running",
"stopped: runtime failure",
"starting",
"running",
"stopped: runtime failure",
"starting",
"running",
"stopped: runtime failure",
"starting",
"running",
"stopped: runtime failure"
)
triggerStatus => {
triggerStatus
.count(_ == "running") == ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 &&
triggerStatus.last == "stopped: runtime failure"
}
)
_ <- assertTriggerIds(uri, "Alice", _.isEmpty)
} yield succeed
}

// Test: a stop request that carries no Authorization header is rejected.
// Builds a bare DELETE request for /v1/stop/<uuid> (only method and URI
// are set), sends it, and checks the JSON body of the response.
// NOTE(review): withHttpService(None) presumably starts the service
// without a DAR (other tests pass Some(dar)) -- confirm against the helper.
it should "stopping a trigger without providing a token should be unauthorized" in withHttpService(
None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// A syntactically valid UUID string; the request is expected to fail
// on the missing token, as asserted below.
val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- Http().singleRequest(req)
body <- responseBodyToString(resp)
// Pattern-match the parsed body as a JSON object to get at its fields.
JsObject(fields) = body.parseJson
// The body's "status" field is expected to be 422.
_ <- fields.get("status") should equal(Some(JsNumber(422)))
_ <- fields.get("errors") should equal(
Some(JsArray(JsString("missing Authorization header with OAuth 2.0 Bearer Token"))))
} yield succeed
}

// Test: a stop request whose path segment is not a UUID yields a 404.
// Sends a bare DELETE to /v1/stop/<text> and inspects only the HTTP
// status of the response (the body is not examined).
it should "stopping a trigger that can't parse as a UUID gives a 404 response" in withHttpService(
None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Deliberately not a UUID.
val uuid: String = "No More Mr Nice Guy"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- Http().singleRequest(req)
// The status must be flagged as a failure and be exactly 404.
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
} yield succeed
}

it should "stopping an unknown trigger gives an error response" in withHttpService(None) {
it should "not act on a stop request without a token" in withHttpService(None) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- stopTrigger(uri, uuid, "Alice")
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
resp <- Http().singleRequest(req)
body <- responseBodyToString(resp)
JsObject(fields) = body.parseJson
_ <- fields.get("status") should equal(Some(JsNumber(404)))
_ <- fields.get("status") should equal(Some(JsNumber(422)))
_ <- fields.get("errors") should equal(
Some(JsArray(JsString("Unknown trigger: '" + uuid.toString + "'"))))
Some(JsArray(JsString("missing Authorization header with OAuth 2.0 Bearer Token"))))
} yield succeed
}

// Test: a stop request whose path segment is not a UUID yields a 404.
// Sends a bare DELETE to /v1/stop/<text> and inspects only the HTTP
// status of the response (the body is not examined).
// NOTE(review): withHttpService(None) presumably starts the service
// without a DAR (other tests pass Some(dar)) -- confirm against the helper.
it should "give a 404 response for a stop request with unparseable UUID" in withHttpService(None) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Deliberately not a UUID.
val uuid: String = "No More Mr Nice Guy"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- Http().singleRequest(req)
// The status must be flagged as a failure and be exactly 404.
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
} yield succeed
}

// Test: stopping a well-formed UUID that names no trigger yields a 404
// with an explanatory error body. The request is issued through the
// stopTrigger helper on behalf of "Alice".
// NOTE(review): stopTrigger is defined elsewhere; presumably it attaches
// the credentials for the given party -- confirm against the helper.
it should "give a 404 error response for a stop request on an unknown UUID" in withHttpService(
None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// No trigger is started in this test, so this id is expected to be unknown.
val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
for {
resp <- stopTrigger(uri, uuid, "Alice")
// First check the HTTP status itself...
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
body <- responseBodyToString(resp)
// ...then check that the JSON body repeats the status and names the id.
JsObject(fields) = body.parseJson
_ <- fields.get("status") should equal(Some(JsNumber(404)))
_ <- fields.get("errors") should equal(
Some(JsArray(JsString("Unknown trigger: '" + uuid.toString + "'"))))
} yield succeed
}

}

0 comments on commit e23a488

Please sign in to comment.