Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger service: Remove data in messages from TriggerRunnerImpl to Server #6554

Merged
merged 2 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.daml.lf.engine.trigger

import java.util.UUID

import akka.actor.typed.ActorRef
import akka.http.scaladsl.Http.ServerBinding

Expand All @@ -16,16 +18,18 @@ final case class Started(binding: ServerBinding) extends Message

case object Stop extends Message

final case class TriggerStarting(runningTrigger: RunningTrigger) extends Message
// Messages passed to the server from a TriggerRunnerImpl

final case class TriggerStarting(triggerInstance: UUID) extends Message

final case class TriggerStarted(runningTrigger: RunningTrigger) extends Message
final case class TriggerStarted(triggerInstance: UUID) extends Message

final case class TriggerInitializationFailure(
runningTrigger: RunningTrigger,
triggerInstance: UUID,
cause: String
) extends Message

final case class TriggerRuntimeFailure(
runningTrigger: RunningTrigger,
triggerInstance: UUID,
cause: String
) extends Message
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ class Server(
new TriggerRunner.Config(
ctx.self,
triggerInstance,
triggerName,
credentials,
compiledPackages,
trigger,
Expand Down Expand Up @@ -358,29 +357,29 @@ object Server {
def running(binding: ServerBinding): Behavior[Message] =
Behaviors
.receiveMessage[Message] {
case TriggerStarting(runningTrigger) =>
server.logTriggerStatus(runningTrigger.triggerInstance, "starting")
case TriggerStarting(triggerInstance) =>
server.logTriggerStatus(triggerInstance, "starting")
Behaviors.same

// Running triggers are added to the store optimistically when the user makes a start
// request so we don't need to add an entry here.
case TriggerStarted(runningTrigger) =>
server.logTriggerStatus(runningTrigger.triggerInstance, "running")
case TriggerStarted(triggerInstance) =>
server.logTriggerStatus(triggerInstance, "running")
Behaviors.same

// Trigger failures are handled by the TriggerRunner actor using a restart strategy with
// exponential backoff. The trigger is never really "stopped" this way (though it could
// fail and restart indefinitely) so in particular we don't need to change the store of
// running triggers. Entries are removed from there only when the user explicitly stops
// the trigger with a request.
case TriggerInitializationFailure(runningTrigger, cause) =>
case TriggerInitializationFailure(triggerInstance, cause) =>
server
.logTriggerStatus(runningTrigger.triggerInstance, "stopped: initialization failure")
.logTriggerStatus(triggerInstance, "stopped: initialization failure")
// Don't send any messages to the runner here (it's under
// the management of a supervision strategy).
Behaviors.same
case TriggerRuntimeFailure(runningTrigger, cause) =>
server.logTriggerStatus(runningTrigger.triggerInstance, "stopped: runtime failure")
case TriggerRuntimeFailure(triggerInstance, cause) =>
server.logTriggerStatus(triggerInstance, "stopped: runtime failure")
// Don't send any messages to the runner here (it's under
// the management of a supervision strategy).
Behaviors.same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scalaz.syntax.tag._
import com.daml.lf.CompiledPackages
import com.daml.lf.data.Ref.Identifier
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.ledger.api.refinements.ApiTypes.ApplicationId
import com.daml.ledger.api.v1.event.CreatedEvent
Expand All @@ -30,10 +29,8 @@ object TriggerRunnerImpl {
case class Config(
server: ActorRef[Message],
triggerInstance: UUID,
triggerName: Identifier,
credentials: UserCredentials,
// TODO(SF, 2020-06-09): Add access token field here in the
// presence of authentication.
// TODO(SF, 2020-06-09): Add access token field here in the presence of authentication.
compiledPackages: CompiledPackages,
trigger: Trigger,
ledgerConfig: LedgerConfig,
Expand All @@ -53,10 +50,9 @@ object TriggerRunnerImpl {
Behaviors.setup { ctx =>
val name = ctx.self.path.name
implicit val ec: ExecutionContext = ctx.executionContext
val triggerInstance = config.triggerInstance
// Report to the server that this trigger is starting.
val runningTrigger =
RunningTrigger(config.triggerInstance, config.triggerName, config.credentials)
config.server ! TriggerStarting(runningTrigger)
config.server ! TriggerStarting(triggerInstance)
ctx.log.info(s"Trigger $name is starting")
val appId = ApplicationId(name)
val clientConfig = LedgerClientConfiguration(
Expand All @@ -79,13 +75,13 @@ object TriggerRunnerImpl {
// The stop endpoint can't send a message to a runner
// that isn't in the running triggers table so this is
// an odd case.
config.server ! TriggerInitializationFailure(runningTrigger, cause.toString)
config.server ! TriggerInitializationFailure(triggerInstance, cause.toString)
// However we got here though, one thing is clear. We
// don't want to restart the actor.
throw new InitializationHalted("User stopped") // Don't retry!
} else {
// Report the failure to the server.
config.server ! TriggerInitializationFailure(runningTrigger, cause.toString)
config.server ! TriggerInitializationFailure(triggerInstance, cause.toString)
// Tell our monitor there's been a failure. The
// monitor's supervision strategy will respond to this
// (including logging the exception).
Expand All @@ -96,7 +92,7 @@ object TriggerRunnerImpl {
// The stop endpoint can't send a message to a runner
// that isn't in the running triggers table so this is
// an odd case.
config.server ! TriggerInitializationFailure(runningTrigger, "User stopped")
config.server ! TriggerInitializationFailure(triggerInstance, "User stopped")
// However we got here though, one thing is clear. We
// don't want to restart the actor.
throw new InitializationHalted("User stopped") // Don't retry!
Expand Down Expand Up @@ -127,12 +123,12 @@ object TriggerRunnerImpl {
}
// Report to the server that this trigger is entering
// the running state.
config.server ! TriggerStarted(runningTrigger)
config.server ! TriggerStarted(triggerInstance)
running(killSwitch)
} catch {
case cause: Throwable =>
// Report the failure to the server.
config.server ! TriggerInitializationFailure(runningTrigger, cause.toString)
config.server ! TriggerInitializationFailure(triggerInstance, cause.toString)
// Tell our monitor there's been a failure. The
// monitor's supervisor strategy will respond to
// this by writing the exception to the log and
Expand All @@ -158,7 +154,7 @@ object TriggerRunnerImpl {
Behaviors.stopped
case Failed(cause) =>
// Report the failure to the server.
config.server ! TriggerRuntimeFailure(runningTrigger, cause.toString)
config.server ! TriggerRuntimeFailure(triggerInstance, cause.toString)
// Tell our monitor there's been a failure. The
// monitor's supervisor strategy will respond to this by
// writing the exception to the log and attempting to
Expand Down