Skip to content

Commit

Permalink
Refactor startTrigger (digital-asset#6219)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
shayne-fletcher authored Jun 4, 2020
1 parent 09254a0 commit 2c3efc6
Showing 1 changed file with 28 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Scheduler}
import akka.actor.typed.PostStop
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
Expand Down Expand Up @@ -196,40 +195,33 @@ object Server {
new AkkaExecutionSequencerPool("TriggerService")(untypedSystem)
implicit val dateTimeFormat: RootJsonFormat[LocalDateTime] = LocalDateTimeJsonFormat

def startTrigger(
ctx: ActorContext[Server.Message],
token: (Jwt, JwtPayload),
trigger: Trigger,
triggerName: Identifier,
ledgerConfig: LedgerConfig,
maxInboundMessageSize: Int,
maxFailureNumberOfRetries: Int,
failureRetryTimeRange: Duration,
): JsValue = {
val jwt: Jwt = token._1
val jwtPayload: JwtPayload = token._2
val party: Party = Party(jwtPayload.party);
val triggerInstance = UUID.randomUUID
val ref = ctx.spawn(
TriggerRunner(
new TriggerRunner.Config(
ctx.self,
triggerInstance,
triggerName,
jwt,
server.compiledPackages,
trigger,
ledgerConfig,
maxInboundMessageSize,
maxFailureNumberOfRetries,
failureRetryTimeRange,
party
def startTrigger(token: (Jwt, JwtPayload), triggerName: Identifier): Either[String, JsValue] = {
for {
trigger <- Trigger.fromIdentifier(server.compiledPackages, triggerName).right
jwt = token._1
jwtPayload = token._2
party = Party(jwtPayload.party);
triggerInstance = UUID.randomUUID
_ = ctx.spawn(
TriggerRunner(
new TriggerRunner.Config(
ctx.self,
triggerInstance,
triggerName,
jwt,
server.compiledPackages,
trigger,
ledgerConfig,
maxInboundMessageSize,
maxFailureNumberOfRetries,
failureRetryTimeRange,
party
),
triggerInstance.toString
),
triggerInstance.toString
),
triggerInstance.toString + "-monitor"
)
JsObject(("triggerId", triggerInstance.toString.toJson))
triggerInstance.toString + "-monitor"
)
} yield (JsObject(("triggerId", triggerInstance.toString.toJson)))
}

def stopTrigger(uuid: UUID, token: (Jwt, JwtPayload)): Option[JsValue] = {
Expand Down Expand Up @@ -278,21 +270,10 @@ object Server {
complete(
errorResponse(StatusCodes.UnprocessableEntity, unauthorized.message)),
token =>
Trigger
.fromIdentifier(server.compiledPackages, params.triggerName) match {
startTrigger(token, params.triggerName) match {
case Left(err) =>
complete(errorResponse(StatusCodes.UnprocessableEntity, err))
case Right(trigger) =>
val triggerInstance = startTrigger(
ctx,
token,
trigger,
params.triggerName,
ledgerConfig,
maxInboundMessageSize,
maxFailureNumberOfRetries,
failureRetryTimeRange
)
case Right(triggerInstance) =>
complete(successResponse(triggerInstance))
}
)
Expand Down

0 comments on commit 2c3efc6

Please sign in to comment.