diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala index e04a508ad880..f00adb880237 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/ServiceMain.scala @@ -399,11 +399,8 @@ object Server { case Right(()) => Behaviors.same } case TriggerInitializationFailure(runningTrigger, cause) => - // The trigger has failed to start. Send the runner a stop - // message. There's no point in it remaining alive since - // its child actor is stopped and won't be restarted. + // The trigger has failed to start. server.logTriggerStatus(runningTrigger, "stopped: initialization failure") - runningTrigger.runner ! TriggerRunner.Stop // No need to update the running triggers tables since // this trigger never made it there. Behaviors.same diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunner.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunner.scala index 66ed2d621ee9..ed869b023d87 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunner.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunner.scala @@ -37,19 +37,12 @@ class TriggerRunner( import TriggerRunner.{Message, Stop} - // Spawn a trigger runner impl. Supervise it. If it fails to start - // its runner, send it a stop message (the server will later send us - // a stop message in due course in this case so this actor will get - // garbage collected too). If it something bad happens when the - // trigger is running, try to restart it up to 3 times. + // Spawn a trigger runner impl. Supervise it. private val child = ctx.spawn( Behaviors - .supervise( - Behaviors - .supervise(TriggerRunnerImpl(ctx.self, config)) - .onFailure[InitializationException](stop)) - .onFailure[RuntimeException]( + .supervise(TriggerRunnerImpl(ctx.self, config)) + .onFailure( restart.withLimit(config.maxFailureNumberOfRetries, config.failureRetryTimeRange)), name ) diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala index 5ddfc2c35614..514aa03dba80 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala @@ -10,6 +10,7 @@ import com.daml.ledger.api.refinements.ApiTypes.Party import io.grpc.netty.NettyChannelBuilder import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ import scala.util.{Failure, Success} import scalaz.syntax.tag._ import com.daml.lf.CompiledPackages @@ -137,9 +138,10 @@ object TriggerRunnerImpl { // Report the failure to the server. config.server ! TriggerInitializationFailure(runningTrigger, cause.toString) // Tell our monitor there's been a failure. The - // monitor's supervisor strategy will respond to this by - // writing the exception to the log and stopping this - // actor. + // monitor's supervisor strategy will respond to + // this by writing the exception to the log and + // attempting to restart this actor up to some + // number of times. throw new InitializationException("Couldn't start: " + cause.toString) } } diff --git a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala index 02928998db28..412bf7dadfa3 100644 --- a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala +++ b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/ServiceTest.scala @@ -400,24 +400,11 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg // Confirm that the running trigger ends up stopped and that // its history matches our expectations. _ <- assertTriggerIds(uri, "Alice", _.isEmpty) - _ <- assertTriggerStatus( - uri, - aliceTrigger, - _ == - Vector( - "starting", - "running", - "stopped: runtime failure", - "starting", - "stopped: initialization failure")) + _ <- assertTriggerStatus(uri, aliceTrigger, _.last == "stopped: initialization failure") } yield succeed } - // TODO(SF, 2020-06-05): This test is temporarily disabled as too - // fragile. There is a race condition on the trigger restart and the - // network being unavailable (see - // https://dev.azure.com/digitalasset/adadc18a-d7df-446a-aacb-86042c1619c6/_apis/build/builds/45230/logs/124). - ignore should "restart a failing trigger if possible" in withHttpService(Some(dar)) { + it should "restart a trigger failing due to a dropped connection" in withHttpService(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => // Simulate the ledger briefly being unavailable due to network // connectivity loss. Our restart strategy means that the running @@ -428,33 +415,24 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg aliceTrigger <- parseTriggerId(resp) // Proceed when it is confirmed to be running. _ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger)) - // Simulate brief network connectivity loss. This will cause the - // running trigger's flow graph to complete with failure. Don't - // wait around to restore the network or the restart strategy - // will in turn lead to the stop strategy killing the trigger - // due to the lack of ability to initialize the restarted - // trigger. + // Simulate brief network connectivity loss. _ <- Future { ledgerProxy.disable() } _ <- Future { ledgerProxy.enable() } - // To conclude, check that the trigger survived the network - // outage and that its history indicates it went through a - // restart to do so. + // Check that the trigger survived the outage and that its + // history shows it went through a restart. _ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger)) _ <- assertTriggerStatus( uri, aliceTrigger, - _ == - Vector( - "starting", - "running", - "stopped: runtime failure", - "starting", - "running" - )) + triggerStatus => { + triggerStatus.count(_ == "stopped: runtime failure") == 1 && + triggerStatus.last == "running" + } + ) } yield succeed } - it should "stop a trigger when the user script fails init" in withHttpService(Some(dar)) { + it should "restart triggers with script init errors" in withHttpService(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => for { resp <- startTrigger(uri, s"$testPkgId:ErrorTrigger:trigger", "Alice") @@ -462,15 +440,17 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg _ <- assertTriggerStatus( uri, aliceTrigger, - _ == - Vector( - "starting", - "stopped: initialization failure", - )) + triggerStatus => { + triggerStatus.count(_ == "starting") == + ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 && + triggerStatus.last == "stopped: initialization failure" + } + ) + _ <- assertTriggerIds(uri, "Alice", _.isEmpty) } yield succeed } - it should "restart triggers with errors in user script" in withHttpService(Some(dar)) { + it should "restart triggers with script update errors" in withHttpService(Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => for { resp <- startTrigger(uri, s"$testPkgId:LowLevelErrorTrigger:trigger", "Alice") @@ -478,68 +458,58 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg _ <- assertTriggerStatus( uri, aliceTrigger, - _ == - Vector( - "starting", - "running", - "stopped: runtime failure", - "starting", - "running", - "stopped: runtime failure", - "starting", - "running", - "stopped: runtime failure", - "starting", - "running", - "stopped: runtime failure" - ) + triggerStatus => { + triggerStatus + .count(_ == "running") == ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 && + triggerStatus.last == "stopped: runtime failure" + } ) _ <- assertTriggerIds(uri, "Alice", _.isEmpty) } yield succeed } - it should "stopping a trigger without providing a token should be unauthorized" in withHttpService( - None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => - val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff" - val req = HttpRequest( - method = HttpMethods.DELETE, - uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")), - ) - for { - resp <- Http().singleRequest(req) - body <- responseBodyToString(resp) - JsObject(fields) = body.parseJson - _ <- fields.get("status") should equal(Some(JsNumber(422))) - _ <- fields.get("errors") should equal( - Some(JsArray(JsString("missing Authorization header with OAuth 2.0 Bearer Token")))) - } yield succeed - } - - it should "stopping a trigger that can't parse as a UUID gives a 404 response" in withHttpService( - None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => - val uuid: String = "No More Mr Nice Guy" - val req = HttpRequest( - method = HttpMethods.DELETE, - uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")), - ) - for { - resp <- Http().singleRequest(req) - _ <- assert(resp.status.isFailure() && resp.status.intValue() == 404) - } yield succeed - } - - it should "stopping an unknown trigger gives an error response" in withHttpService(None) { + it should "not act on a stop request without a token" in withHttpService(None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => - val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff") + val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff" + val req = HttpRequest( + method = HttpMethods.DELETE, + uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")), + ) for { - resp <- stopTrigger(uri, uuid, "Alice") - _ <- assert(resp.status.isFailure() && resp.status.intValue() == 404) + resp <- Http().singleRequest(req) body <- responseBodyToString(resp) JsObject(fields) = body.parseJson - _ <- fields.get("status") should equal(Some(JsNumber(404))) + _ <- fields.get("status") should equal(Some(JsNumber(422))) _ <- fields.get("errors") should equal( - Some(JsArray(JsString("Unknown trigger: '" + uuid.toString + "'")))) + Some(JsArray(JsString("missing Authorization header with OAuth 2.0 Bearer Token")))) + } yield succeed + } + + it should "give a 404 response for a stop request with unparseable UUID" in withHttpService(None) { + (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => + val uuid: String = "No More Mr Nice Guy" + val req = HttpRequest( + method = HttpMethods.DELETE, + uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")), + ) + for { + resp <- Http().singleRequest(req) + _ <- assert(resp.status.isFailure() && resp.status.intValue() == 404) } yield succeed } + it should "give a 404 error response for a stop request on an unknown UUID" in withHttpService( + None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => + val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff") + for { + resp <- stopTrigger(uri, uuid, "Alice") + _ <- assert(resp.status.isFailure() && resp.status.intValue() == 404) + body <- responseBodyToString(resp) + JsObject(fields) = body.parseJson + _ <- fields.get("status") should equal(Some(JsNumber(404))) + _ <- fields.get("errors") should equal( + Some(JsArray(JsString("Unknown trigger: '" + uuid.toString + "'")))) + } yield succeed + } + }