Skip to content

Commit

Permalink
Retry with more memory [BA-5933] (#5180)
Browse files Browse the repository at this point in the history
  • Loading branch information
salonishah11 authored Sep 27, 2019
1 parent 8fedfa6 commit e956df7
Show file tree
Hide file tree
Showing 36 changed files with 501 additions and 55 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Cromwell Change Log

## 47 Release Notes

### Retry with more memory on Papiv2 [(#5180)](https://github.com/broadinstitute/cromwell/pull/5180)

Cromwell now allows user defined retries. With `memory-retry` config you can specify an array of strings which when encountered in the `stderr`
file by Cromwell, allows the task to be retried with multiplier factor mentioned in the config. More information [here](https://cromwell.readthedocs.io/en/stable/backends/Google/).

## 46 Release Notes

### Nvidia GPU Driver Update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package cromwell.backend

import akka.actor.ActorLogging
import akka.event.LoggingReceive
import common.validation.Validation.{GreaterEqualOne, GreaterEqualRefined}
import cromwell.backend.BackendJobExecutionActor._
import cromwell.backend.BackendLifecycleActor._
import cromwell.backend.OutputEvaluator.EvaluatedJobOutputs
import cromwell.core.path.Path
import cromwell.core._
import cromwell.core.path.Path
import eu.timepit.refined.refineMV
import wom.expression.IoFunctionSet
import wom.values.WomValue

Expand Down Expand Up @@ -44,7 +46,10 @@ object BackendJobExecutionActor {

sealed trait BackendJobFailedResponse extends BackendJobExecutionResponse { def throwable: Throwable; def returnCode: Option[Int] }
case class JobFailedNonRetryableResponse(jobKey: JobKey, throwable: Throwable, returnCode: Option[Int]) extends BackendJobFailedResponse
case class JobFailedRetryableResponse(jobKey: BackendJobDescriptorKey, throwable: Throwable, returnCode: Option[Int]) extends BackendJobFailedResponse
case class JobFailedRetryableResponse(jobKey: BackendJobDescriptorKey,
throwable: Throwable,
returnCode: Option[Int],
memoryMultiplier: GreaterEqualRefined = refineMV[GreaterEqualOne](1.0)) extends BackendJobFailedResponse

// Reconnection Exceptions
case class JobReconnectionNotSupportedException(jobKey: BackendJobDescriptorKey) extends Exception(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging with SlowJob
case Finish(FailedNonRetryableExecutionHandle(throwable, returnCode)) =>
completionPromise.success(JobFailedNonRetryableResponse(jobDescriptor.key, throwable, returnCode))
context.stop(self)
case Finish(FailedRetryableExecutionHandle(throwable, returnCode)) =>
completionPromise.success(JobFailedRetryableResponse(jobDescriptor.key, throwable, returnCode))
case Finish(FailedRetryableExecutionHandle(throwable, returnCode, memoryMultiplier)) =>
completionPromise.success(JobFailedRetryableResponse(jobDescriptor.key, throwable, returnCode, memoryMultiplier))
context.stop(self)
case Finish(cromwell.backend.async.AbortedExecutionHandle) =>
completionPromise.success(JobAbortedResponse(jobDescriptor.key))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cromwell.backend.async

import common.validation.Validation.{GreaterEqualOne, GreaterEqualRefined}
import cromwell.backend.BackendJobDescriptor
import cromwell.backend.async.AsyncBackendJobExecutionActor.JobId
import cromwell.core.path.Path
import cromwell.core.{CallOutputs, ExecutionEvent}
import eu.timepit.refined.refineMV

/**
* Trait to encapsulate whether an execution is complete and if so provide a result. Useful in conjunction
Expand Down Expand Up @@ -35,7 +37,9 @@ final case class FailedNonRetryableExecutionHandle(throwable: Throwable, returnC
override val result = NonRetryableExecution(throwable, returnCode)
}

final case class FailedRetryableExecutionHandle(throwable: Throwable, returnCode: Option[Int] = None) extends ExecutionHandle {
final case class FailedRetryableExecutionHandle(throwable: Throwable,
returnCode: Option[Int] = None,
memoryMultiplier: GreaterEqualRefined = refineMV[GreaterEqualOne](1.0)) extends ExecutionHandle {
override val isDone = true
override val result = RetryableExecution(throwable, returnCode)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package cromwell.backend.async

import cromwell.core.path.Path
import common.exception.ThrowableAggregation
import common.validation.Validation.GreaterEqualRefined
import cromwell.core.path.Path
import wom.expression.{NoIoFunctionSet, WomExpression}

abstract class KnownJobFailureException extends Exception {
Expand All @@ -25,6 +26,18 @@ final case class StderrNonEmpty(jobTag: String, stderrLength: Long, stderrPath:
override def getMessage = s"stderr for job $jobTag has length $stderrLength and 'failOnStderr' runtime attribute was true."
}

final case class RetryWithMoreMemory(jobTag: String, stderrPath: Option[Path]) extends KnownJobFailureException {
override def getMessage = s"stderr for job $jobTag contained one of the `memory-retry` error-keys specified in the config. " +
"Job might have run out of memory."
}

final case class MemoryMultiplierNotPositive(jobTag: String,
stderrPath: Option[Path],
currentMultiplier: GreaterEqualRefined,
memoryRetryFactor: GreaterEqualRefined) extends KnownJobFailureException {
override def getMessage = s"The result of multiplying current memory multiplier $currentMultiplier with memory retry factor $memoryRetryFactor was not positive."
}


object RuntimeAttributeValidationFailure {
def apply(jobTag: String,
Expand Down
6 changes: 5 additions & 1 deletion backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilderFactory}
import cromwell.core.{CallKey, HogGroup, WorkflowId, WorkflowOptions}
import cromwell.docker.DockerInfoActor.DockerSize
import cromwell.services.keyvalue.KeyValueServiceActor.KvResponse
import eu.timepit.refined.refineMV
import net.ceedubs.ficus.Ficus._
import wom.callable.{ExecutableCallable, MetaValueElement}
import wom.graph.CommandCallNode
Expand All @@ -26,7 +27,10 @@ import scala.util.Try
/**
* For uniquely identifying a job which has been or will be sent to the backend.
*/
case class BackendJobDescriptorKey(call: CommandCallNode, index: Option[Int], attempt: Int) extends CallKey {
case class BackendJobDescriptorKey(call: CommandCallNode,
index: Option[Int],
attempt: Int,
memoryMultiplier: GreaterEqualRefined = refineMV[GreaterEqualOne](1.0)) extends CallKey {
def node = call
private val indexString = index map { _.toString } getOrElse "NA"
lazy val tag = s"${call.fullyQualifiedName}:$indexString:$attempt"
Expand Down
2 changes: 2 additions & 0 deletions backend/src/main/scala/cromwell/backend/io/JobPaths.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ trait JobPaths {

def workflowPaths: WorkflowPaths
def returnCodeFilename: String = "rc"
def memoryRetryRCFilename: String = "memory_retry_rc"
def defaultStdoutFilename = "stdout"
def defaultStderrFilename = "stderr"
def isDocker: Boolean = false
Expand Down Expand Up @@ -72,6 +73,7 @@ trait JobPaths {
lazy val script = callExecutionRoot.resolve(scriptFilename)
lazy val dockerCid = callExecutionRoot.resolve(dockerCidFilename)
lazy val returnCode = callExecutionRoot.resolve(returnCodeFilename)
lazy val memoryRetryRC = callExecutionRoot.resolve(memoryRetryRCFilename)

// This is a `def` because `standardPaths` is a `var` that may be reassigned during the calculation of
// standard output and error file names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import cromwell.core.{CromwellAggregatedException, CromwellFatalExceptionMarker,
import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import cromwell.services.metadata.CallMetadataKeys
import eu.timepit.refined.refineV
import mouse.all._
import net.ceedubs.ficus.Ficus._
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import shapeless.Coproduct
import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpression, RuntimeEnvironment}
import wom.expression.WomExpression
Expand Down Expand Up @@ -84,6 +86,9 @@ trait StandardAsyncExecutionActor
val SIGINT = 130
val SIGKILL = 137

// `CheckingForMemoryRetry` action exits with code 0 if the stderr file contains keys mentioned in `memory-retry` config.
val StderrContainsRetryKeysCode = 0

/** The type of the run info when a job is started. */
type StandardAsyncRunInfo

Expand Down Expand Up @@ -180,6 +185,8 @@ trait StandardAsyncExecutionActor
*/
lazy val commandDirectory: Path = jobPaths.callExecutionRoot

lazy val memoryRetryFactor: Option[GreaterEqualRefined] = None

/**
* Returns the shell scripting for finding all files listed within a directory.
*
Expand Down Expand Up @@ -895,14 +902,22 @@ trait StandardAsyncExecutionActor
* @return The execution handle.
*/
def retryElseFail(runStatus: StandardAsyncRunState,
backendExecutionStatus: Future[ExecutionHandle]): Future[ExecutionHandle] = {
backendExecutionStatus: Future[ExecutionHandle],
retryWithMoreMemory: Boolean = false): Future[ExecutionHandle] = {

val retryable = previousFailedRetries < maxRetries

backendExecutionStatus flatMap {
case failed: FailedNonRetryableExecutionHandle if retryable =>
incrementFailedRetryCount map { _ =>
FailedRetryableExecutionHandle(failed.throwable, failed.returnCode)
val currentMemoryMultiplier = jobDescriptor.key.memoryMultiplier
(retryWithMoreMemory, memoryRetryFactor) match {
case (true, Some(multiplier)) => refineV[GreaterEqualOne](currentMemoryMultiplier.value * multiplier.value) match {
case Left(_) => FailedNonRetryableExecutionHandle(MemoryMultiplierNotPositive(jobDescriptor.key.tag, None, currentMemoryMultiplier, multiplier), failed.returnCode)
case Right(newMultiplier) => FailedRetryableExecutionHandle(failed.throwable, failed.returnCode, newMultiplier)
}
case (_, _) => FailedRetryableExecutionHandle(failed.throwable, failed.returnCode, currentMemoryMultiplier)
}
}
case _ => backendExecutionStatus
}
Expand Down Expand Up @@ -1074,18 +1089,51 @@ trait StandardAsyncExecutionActor
def handleExecutionResult(status: StandardAsyncRunState,
oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = {

def memoryRetryRC: Future[Boolean] = {
def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
codeAsOption match {
case Some(codeAsString) =>
Try(codeAsString.trim.toInt) match {
case Success(code) => code match {
case StderrContainsRetryKeysCode => true
case _ => false
}
case Failure(e) =>
log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
s"converted to an Integer. Task will not be retried with double memory. Error: ${ExceptionUtils.getMessage(e)}")
false
}
case None => false
}
}

def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
else
Future.successful(None)
}

for {
fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
} yield retryWithMoreMemory
}

val stderr = jobPaths.standardPaths.error
lazy val stderrAsOption: Option[Path] = Option(stderr)

val stderrSizeAndReturnCode = for {
val stderrSizeAndReturnCodeAndMemoryRetry = for {
returnCodeAsString <- asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
// may fail due to race conditions on quickly-executing jobs.
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
} yield (stderrSize, returnCodeAsString)
retryWithMoreMemory <- memoryRetryRC
} yield (stderrSize, returnCodeAsString, retryWithMoreMemory)

stderrSizeAndReturnCode flatMap {
case (stderrSize, returnCodeAsString) =>
stderrSizeAndReturnCodeAndMemoryRetry flatMap {
case (stderrSize, returnCodeAsString, retryWithMoreMemory) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

if (isDone(status)) {
Expand All @@ -1098,6 +1146,9 @@ trait StandardAsyncExecutionActor
case Success(returnCodeAsInt) if !continueOnReturnCode.continueFor(returnCodeAsInt) =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt)))
retryElseFail(status, executionHandle)
case Success(returnCodeAsInt) if retryWithMoreMemory =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption), Option(returnCodeAsInt)))
retryElseFail(status, executionHandle, retryWithMoreMemory)
case Success(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
case Failure(_) =>
Expand Down
2 changes: 1 addition & 1 deletion backend/src/test/scala/cromwell/backend/BackendSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito with ScaledTim
case (JobFailedNonRetryableResponse(_, failure, _), JobFailedNonRetryableResponse(_, expectedFailure, _)) =>
failure.getClass shouldBe expectedFailure.getClass
concatenateCauseMessages(failure) should include(expectedFailure.getMessage)
case (JobFailedRetryableResponse(_, failure, _), JobFailedRetryableResponse(_, expectedFailure, _)) =>
case (JobFailedRetryableResponse(_, failure, _, _), JobFailedRetryableResponse(_, expectedFailure, _, _)) =>
failure.getClass shouldBe expectedFailure.getClass
case (response, expectation) =>
fail(s"Execution response $response wasn't conform to expectation $expectation")
Expand Down
2 changes: 2 additions & 0 deletions backend/src/test/scala/cromwell/backend/io/JobPathsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class JobPathsSpec extends FlatSpec with Matchers with BackendSpec {
fullPath(s"/cromwell-executions/wf_hello/$id/call-hello/execution/stdout")
jobPaths.toDockerPath(DefaultPathBuilder.get("/cromwell-executions/dock/path")).pathAsString shouldBe
fullPath("/cromwell-executions/dock/path")
jobPaths.memoryRetryRC.pathAsString shouldBe
fullPath(s"local-cromwell-executions/wf_hello/$id/call-hello/execution/memory_retry_rc")

val jobKeySharded: BackendJobDescriptorKey = BackendJobDescriptorKey(call, Option(0), 1)
val jobPathsSharded = new JobPathsWithDocker(workflowPaths, jobKeySharded)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: retry_with_more_memory
testFormat: workflowfailure
backends: [Papiv2-Retry-With-More-Memory]

files {
workflow: retry_with_more_memory/retry_with_more_memory.wdl
}

metadata {
workflowName: retry_with_more_memory
status: Failed
"failures.0.message": "Workflow failed"
"failures.0.causedBy.0.message": "stderr for job retry_with_more_memory.imitate_oom_error:NA:2 contained one of the `memory-retry` error-keys specified in the config. Job might have run out of memory."
"retry_with_more_memory.imitate_oom_error.-1.attempt": 2
"retry_with_more_memory.imitate_oom_error.-1.executionStatus": "Failed"
"retry_with_more_memory.imitate_oom_error.-1.runtimeAttributes.memory": "1.1 GB"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version 1.0

task imitate_oom_error {
command {
printf "Exception in thread "main" java.lang.OutOfMemoryError: testing\n\tat Test.main(Test.java:1)\n" >&2 && (exit 1)
}
runtime {
docker: "python:latest"
memory: "1 GB"
continueOnReturnCode: true
maxRetries: 1
backend: "Papiv2-Retry-With-More-Memory"
}
}

workflow retry_with_more_memory {
call imitate_oom_error
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ files {
metadata {
workflowName: workbench_health_monitor_check
status: Succeeded
"outputs.workbench_health_monitor_check.out": """{"DockerHub":{"ok":true},"Engine Database":{"ok":true},"GCS":{"ok":true},"Papi":{"ok":true},"Papi-Caching-No-Copy":{"ok":true},"Papiv2":{"ok":true},"Papiv2-Virtual-Private-Cloud":{"ok":true},"Papiv2NoDockerHubConfig":{"ok":true},"Papiv2RequesterPays":{"ok":true},"Papiv2USADockerhub":{"ok":true}}"""
"outputs.workbench_health_monitor_check.out": """{"DockerHub":{"ok":true},"Engine Database":{"ok":true},"GCS":{"ok":true},"Papi":{"ok":true},"Papi-Caching-No-Copy":{"ok":true},"Papiv2":{"ok":true},"Papiv2-Retry-With-More-Memory":{"ok":true},"Papiv2-Virtual-Private-Cloud":{"ok":true},"Papiv2NoDockerHubConfig":{"ok":true},"Papiv2RequesterPays":{"ok":true},"Papiv2USADockerhub":{"ok":true}}"""
}
6 changes: 6 additions & 0 deletions common/src/main/scala/common/validation/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@ import cats.syntax.validated._
import common.Checked
import common.exception.AggregatedMessageException
import common.validation.ErrorOr.ErrorOr
import eu.timepit.refined.numeric.GreaterEqual
import eu.timepit.refined._
import eu.timepit.refined.api.Refined
import org.slf4j.Logger

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object Validation {

type GreaterEqualOne = GreaterEqual[W.`1.0`.T]
type GreaterEqualRefined = Refined[Double, GreaterEqualOne]

private type ThrowableToStringFunction = Throwable => String
private def defaultThrowableToString: ThrowableToStringFunction = {
case ite: InvocationTargetException => ite.getTargetException.getMessage
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/scala/cromwell/core/MockIoActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ class MockIoActor(returnCode: String, stderrSize: Long) extends Actor {
case command: IoDeleteCommand => sender() ! IoSuccess(command, ())
case command: IoSizeCommand => sender() ! IoSuccess(command, 0L)
case command: IoContentAsStringCommand => sender() ! IoSuccess(command, "0")
case command: IoExistsCommand => sender() ! IoSuccess(command, false)

// With context
case (requestContext: Any, command: IoCopyCommand) => sender() ! (requestContext -> IoSuccess(command, ()))
case (requestContext: Any, command: IoWriteCommand) => sender() ! (requestContext -> IoSuccess(command, ()))
case (requestContext: Any, command: IoDeleteCommand) => sender() ! (requestContext -> IoSuccess(command, ()))
case (requestContext: Any, command: IoSizeCommand) => sender() ! (requestContext -> IoSuccess(command, stderrSize))
case (requestContext: Any, command: IoContentAsStringCommand) => sender() ! (requestContext -> IoSuccess(command, returnCode))
case (requestContext: Any, command: IoExistsCommand) => sender() ! (requestContext -> IoSuccess(command, false))

case withPromise: IoCommandWithPromise[_] => self ! ((withPromise.promise, withPromise.ioCommand))

Expand Down
Loading

0 comments on commit e956df7

Please sign in to comment.