Skip to content

Commit

Permalink
Add workflow option that allows SSH into Google Genomics workers [BA-…
Browse files Browse the repository at this point in the history
  • Loading branch information
acoffman authored Feb 11, 2020
1 parent 343c9f2 commit 551bbda
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 4 deletions.
2 changes: 2 additions & 0 deletions docs/wf_options/Google.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ These workflow options provide Google-specific information for workflows running
| `monitoring_script` | `string` | Specifies a GCS URL to a script that will be invoked prior to the user command being run. For example, if the value for monitoring_script is `"gs://bucket/script.sh"`, it will be invoked as `./script.sh > monitoring.log &`. The value `monitoring.log` file will be automatically de-localized. |
| `monitoring_image` | `string` | Specifies a Docker image to monitor the task. This image will run concurrently with the task container, and provides an alternative mechanism to `monitoring_script` (the latter runs *inside* the task container). For example, one can use `quay.io/broadinstitute/cromwell-monitor`, which reports cpu/memory/disk utilization metrics to [Stackdriver](https://cloud.google.com/monitoring/). |
| `google_labels` | `object` | An object containing only string values. Represent custom labels to send with PAPI job requests. Per the PAPI specification, each key and value must conform to the regex `[a-z]([-a-z0-9]*[a-z0-9])?`. |
| `enable_ssh_access` | `boolean` | If set to true, will enable SSH access to the Google Genomics worker machines. Please note that this is a community contribution and is not officially supported by the Cromwell development team.
| `delete_intermediate_output_files` | `boolean` | **Experimental:** Any `File` variables referenced in call `output` sections that are not found in the workflow `output` section will be considered an intermediate `File`. When the workflow finishes and this option is set to `true`, all intermediate `File` objects will be deleted from GCS. Cromwell must be run with the configuration value `system.delete-workflow-files` set to `true`. The default for both values is `false`. NOTE: The behavior of this option on other backends is unspecified. |

<!-- Pasted into then regenerated at https://www.tablesgenerator.com/markdown_tables -->
Expand All @@ -28,6 +29,7 @@ These workflow options provide Google-specific information for workflows running
"auth_bucket": "gs://my-auth-bucket/private",
"monitoring_script": "gs://bucket/script.sh",
"monitoring_image": "quay.io/broadinstitute/cromwell-monitor",
"enable_ssh_access": false,
"google_labels": {
"custom-label": "custom-value"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cromwell.backend.google.pipelines.common
object WorkflowOptionKeys {
val MonitoringScript = "monitoring_script"
val MonitoringImage = "monitoring_image"
val EnableSSHAccess = "enable_ssh_access"
val GoogleProject = "google_project"
val GoogleComputeServiceAccount = "google_compute_service_account"
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ case class GenomicsFactory(applicationName: String, authMode: GoogleAuthMode, en
with Localization
with UserAction
with Delocalization
with MemoryRetryCheckAction {
with MemoryRetryCheckAction
with SSHAccessAction {

override def build(initializer: HttpRequestInitializer): PipelinesApiRequestFactory = new PipelinesApiRequestFactory {
implicit lazy val googleProjectMetadataLabelDecoder: Decoder[ProjectLabels] = deriveDecoder
Expand Down Expand Up @@ -139,7 +140,8 @@ case class GenomicsFactory(applicationName: String, authMode: GoogleAuthMode, en
val memoryRetryAction: List[Action] = checkForMemoryRetryActions(createPipelineParameters, mounts)
val deLocalization: List[Action] = deLocalizeActions(createPipelineParameters, mounts)
val monitoring: List[Action] = monitoringActions(createPipelineParameters, mounts)
val allActions = containerSetup ++ localization ++ userAction ++ memoryRetryAction ++ deLocalization ++ monitoring
val sshAccess: List[Action] = sshAccessActions(createPipelineParameters, mounts)
val allActions = containerSetup ++ localization ++ userAction ++ memoryRetryAction ++ deLocalization ++ monitoring ++ sshAccess

// adding memory as environment variables makes it easy for a user to retrieve the new value of memory
// on the machine to utilize in their command blocks if needed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package cromwell.backend.google.pipelines.v2alpha1.api

import com.google.api.services.genomics.v2alpha1.model.{Action, Mount}
import cromwell.backend.google.pipelines.common.WorkflowOptionKeys
import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestFactory.CreatePipelineParameters

import scala.collection.JavaConverters._

trait SSHAccessAction {

def sshAccessActions(createPipelineParameters: CreatePipelineParameters, mounts: List[Mount]) : List[Action] = {
val workflowOptions = createPipelineParameters.jobDescriptor.workflowDescriptor.workflowOptions

workflowOptions.getBoolean(WorkflowOptionKeys.EnableSSHAccess).toOption match {
case Some(true) => sshAccessAction(mounts)
case _ => List.empty
}
}

private def sshAccessAction(mounts: List[Mount]): List[Action] = {
val ports = Map("22" -> new Integer(22))

val sshAction = ActionBuilder.withImage("gcr.io/cloud-genomics-pipelines/tools")
.setEntrypoint("ssh-server")
.setFlags(List(ActionFlag.RunInBackground.toString).asJava)
.setMounts(mounts.asJava)
.setPortMappings(mapAsJavaMap(ports))

List(sshAction)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ case class LifeSciencesFactory(applicationName: String, authMode: GoogleAuthMode
with Localization
with UserAction
with Delocalization
with MemoryRetryCheckAction {
with MemoryRetryCheckAction
with SSHAccessAction {

override def build(initializer: HttpRequestInitializer): PipelinesApiRequestFactory = new PipelinesApiRequestFactory {
implicit lazy val googleProjectMetadataLabelDecoder: Decoder[ProjectLabels] = deriveDecoder
Expand Down Expand Up @@ -139,7 +140,8 @@ case class LifeSciencesFactory(applicationName: String, authMode: GoogleAuthMode
val memoryRetryAction: List[Action] = checkForMemoryRetryActions(createPipelineParameters, mounts)
val deLocalization: List[Action] = deLocalizeActions(createPipelineParameters, mounts)
val monitoring: List[Action] = monitoringActions(createPipelineParameters, mounts)
val allActions = containerSetup ++ localization ++ userAction ++ memoryRetryAction ++ deLocalization ++ monitoring
val sshAccess: List[Action] = sshAccessActions(createPipelineParameters, mounts)
val allActions = containerSetup ++ localization ++ userAction ++ memoryRetryAction ++ deLocalization ++ monitoring ++ sshAccess

// adding memory as environment variables makes it easy for a user to retrieve the new value of memory
// on the machine to utilize in their command blocks if needed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cromwell.backend.google.pipelines.v2beta.api

import com.google.api.services.lifesciences.v2beta.model.{Action, Mount}

import cromwell.backend.google.pipelines.common.WorkflowOptionKeys
import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestFactory.CreatePipelineParameters

import scala.collection.JavaConverters._

trait SSHAccessAction {

def sshAccessActions(createPipelineParameters: CreatePipelineParameters, mounts: List[Mount]) : List[Action] = {
val workflowOptions = createPipelineParameters.jobDescriptor.workflowDescriptor.workflowOptions

workflowOptions.getBoolean(WorkflowOptionKeys.EnableSSHAccess).toOption match {
case Some(true) => sshAccessAction(mounts)
case _ => List.empty
}
}

private def sshAccessAction(mounts: List[Mount]): List[Action] = {
val ports = Map("22" -> new Integer(22))

val sshAction = ActionBuilder.withImage("gcr.io/cloud-genomics-pipelines/tools")
.setEntrypoint("ssh-server")
.setRunInBackground(true)
.setMounts(mounts.asJava)
.setPortMappings(mapAsJavaMap(ports))

List(sshAction)

}

}

0 comments on commit 551bbda

Please sign in to comment.