Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr committed Sep 22, 2021
2 parents d7c96a0 + d111b05 commit 901826e
Show file tree
Hide file tree
Showing 45 changed files with 834 additions and 259 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Cromwell Change Log

## 69 Release Notes

### Bug Fixes

#### DRS/`basename` Fix

The WDL `basename` function should now work as expected with DRS paths, giving the basename of the
resolved file, not just a substring of the DRS path.

## 68 Release Notes

### Virtual Private Cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@ package cromwell.backend

import cromwell.core.io.AsyncIoFunctions
import cromwell.core.path.PathFactory
import cromwell.filesystems.drs.{DrsPath, DrsResolver}
import wom.expression.IoFunctionSet

import scala.concurrent.Future
import scala.util.Try

trait ReadLikeFunctions extends PathFactory with IoFunctionSet with AsyncIoFunctions {

/**
  * Resolves the basename of the file identified by `value`.
  *
  * A `DrsPath` is handed to `DrsResolver.getResolvedBasename`; any other path uses its own `name`.
  *
  * @param value the path string to build and inspect
  * @return a Future holding the basename. The Future fails if the path cannot be built, or if the
  *         built (non-DRS) path has an empty name.
  */
override def resolvedFileBasename(value: String): Future[String] =
  // Build the path inside a Try so that a throwing `buildPath` surfaces as a failed Future
  // (the same way `readFile` handles it) instead of as a synchronous exception to the caller.
  Future.fromTry(Try(buildPath(value))) flatMap {
    case drsPath: DrsPath => DrsResolver.getResolvedBasename(drsPath).unsafeToFuture()
    case path =>
      val name = path.name
      if (name.nonEmpty) Future.successful(name)
      else Future.failed(new Exception(s"No resolvable basename for $value. Was it a directory?"))
  }

/**
  * Reads the content of the file at `path` as a string.
  *
  * The path is built inside a `Try`, so a failure to build it is reported through the returned
  * Future. The read itself is delegated to `asyncIo.contentAsStringAsync`.
  *
  * @param path           the path string to build and read
  * @param maxBytes       passed through to `asyncIo.contentAsStringAsync`
  * @param failOnOverflow passed through to `asyncIo.contentAsStringAsync`
  * @return a Future holding the file content
  */
override def readFile(path: String, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] = {
  val builtPath = Future.fromTry(Try(buildPath(path)))
  builtPath.flatMap(asyncIo.contentAsStringAsync(_, maxBytes, failOnOverflow))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import wom.expression.IoFunctionSet.{IoDirectory, IoFile}

import scala.concurrent.Await
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

class DirectoryFunctionsSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers {
Expand All @@ -18,7 +18,8 @@ class DirectoryFunctionsSpec extends AnyFlatSpec with CromwellTimeoutSpec with M
override def copyFile(source: String, destination: String) = throw new UnsupportedOperationException()
override def glob(pattern: String) = throw new UnsupportedOperationException()
override def size(path: String) = throw new UnsupportedOperationException()
override def readFile(path: String, maxBytes: Option[Int], failOnOverflow: Boolean) = throw new UnsupportedOperationException()
override def resolvedFileBasename(path: String): Future[String] = throw new UnsupportedOperationException()
override def readFile(path: String, maxBytes: Option[Int], failOnOverflow: Boolean) = throw new UnsupportedOperationException()
override def pathFunctions = throw new UnsupportedOperationException()
override def writeFile(path: String, content: String) = throw new UnsupportedOperationException()
override implicit def ec = throw new UnsupportedOperationException()
Expand All @@ -32,7 +33,7 @@ class DirectoryFunctionsSpec extends AnyFlatSpec with CromwellTimeoutSpec with M
val innerDir = (rootDir / "innerDir").createDirectories()
val link = innerDir / "linkToRootDirInInnerDir"
link.symbolicLinkTo(rootDir)

def listRecursively(path: String)(visited: Vector[String] = Vector.empty): Iterator[String] = {
Await.result(functions.listDirectory(path)(visited), Duration.Inf) flatMap {
case IoFile(v) => List(v)
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ lazy val backend = project
.dependsOn(services)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(drsFileSystem)

lazy val googlePipelinesCommon = (project in backendRoot / "google" / "pipelines" / "common")
.withLibrarySettings("cromwell-pipelines-common")
Expand Down
22 changes: 22 additions & 0 deletions centaur/src/main/resources/standardTestCases/drs_basename.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: drs_basename
testFormat: WorkflowSuccess
backends: ["papi-v2-usa"]
skipDescribeEndpointValidation: true

files {
workflow: drs_tests/drs_basename.wdl
options-dir: "Error: BA-6546 The environment variable CROMWELL_BUILD_RESOURCES_DIRECTORY must be set/export pointing to a valid path such as '${YOUR_CROMWELL_DIR}/target/ci/resources'"
options-dir: ${?CROMWELL_BUILD_RESOURCES_DIRECTORY}
options: ${files.options-dir}/papi_v2_usa.options.json
inputs: drs_tests/drs_basename.inputs
}

metadata {
workflowName: drs_basename
status: Succeeded
"outputs.drs_basename.basenames.0": "E18_20161004_Neurons_Sample_49_S048_L004_R2_005.fastq.gz"
"outputs.drs_basename.basenames.1": "E18_20161004_Neurons_Sample_49_S048_L004_R2_005.fastq.gz"
"outputs.drs_basename.basenames.2": "E18_20161004_Neurons_Sample_49_S048_L004_R2_005.fastq.gz"
"outputs.drs_basename.basenames.3": "E18_20161004_Neurons_Sample_49_S048_L004_R2_005.fastq.gz"
"outputs.drs_basename.basenames.4": "E18_20161004_Neurons_Sample_49_S048_L004_R2_005.foo.bar"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
# This comment and file path were copied from drs_usa_hca.inputs:
# Are you here because HCA DRS data moved, again? Do not rage `rm -f` this test without first
# reading BT-182 for the multiple reasons why HCA is different from a "hello world"!
"drs_basename.file":
"drs://jade.datarepo-dev.broadinstitute.org/v1_4641bafb-5190-425b-aea9-9c7b125515c8_e37266ba-790d-4641-aa76-854d94be2fbe"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
version 1.0

# Exercises the `basename` function on a single input file from several evaluation sites:
# a workflow-level declaration, a call-site expression, and (inside the called task) a
# task-level declaration and command-block expressions.
workflow drs_basename {
input {
# The file whose basename is computed.
File file
}

# `basename` evaluated as a workflow-level declaration.
String workflow_level_basename = basename(file)

call pump_up_the_base { input:
file = file,
pre_basenamed = workflow_level_basename,
# `basename` evaluated inline at the call site.
basednamed_at_call_site = basename(file)
}

output {
# One entry per line echoed by the task's command.
Array[String] basenames = pump_up_the_base.basenames
}
}

# Echoes basenames of `file` computed in different places, one per line, and returns the
# lines of stdout as an array.
task pump_up_the_base {
input {
# The file whose basename is computed; marked `localization_optional` below.
File file
# Basename already computed by the caller in a workflow-level declaration.
String pre_basenamed
# Basename computed by the caller in the call's input expression.
# NOTE(review): "basednamed" looks like a typo of "basenamed"; the name is part of the
# task's input interface, so it is left unchanged here.
String basednamed_at_call_site
}

# `basename` evaluated as a task-level declaration.
String task_level_basename = basename(file)

command <<<
echo ~{pre_basenamed}
echo ~{basednamed_at_call_site}
echo ~{task_level_basename}
echo ~{basename(file)}
# Also validate the two-parameter version of the function with a DRS input for file extension replacements:
echo ~{basename(file, ".fastq.gz")}.foo.bar
>>>

output {
# The echoed lines, in the order written by the command above.
Array[String] basenames = read_lines(stdout())
}

runtime {
docker: "ubuntu"
backend: "papi-v2-usa"
}

parameter_meta {
# We don't actually want to use this file, just play around with its basename:
file: { localization_optional: true }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ workflow taskless_engine_functions {

Array[String] strings = ["a", "b"]

String filepath = "gs://not/a/real/file.txt"
# This is a local file path so that all test backends can interpret it. In Cromwell instances without a local
# backend, we might expect this to fail because "that doesn't look like a path"
String filepath = "/not/a/real/file.txt"

Array[Array[Int]] matrix = [
[1, 0],
Expand Down
3 changes: 2 additions & 1 deletion centaurCwlRunner/src/main/scala/centaur/cwl/Outputs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package centaur.cwl

import centaur.api.CentaurCromwellClient
import centaur.test.metadata.WorkflowFlatMetadata._
import common.collections.EnhancedCollections._
import common.validation.IOChecked._
import cromwell.api.model.SubmittedWorkflow
import cromwell.core.path.PathBuilder
import cwl.ontology.Schema
import cwl.{Cwl, CwlDecoder, MyriadOutputType}
import io.circe.Json
import io.circe.syntax._
import scalaz.syntax.std.map._
import mouse.map._
import shapeless.Poly1
import spray.json.{JsObject, JsString, JsValue}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloud.nio.impl.drs
import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import cats.implicits._
import cloud.nio.impl.drs.DrsPathResolver.{FatalRetryDisposition, RegularRetryDisposition}
import cloud.nio.impl.drs.MarthaResponseSupport._
import common.exception.toIO
import io.circe._
Expand Down Expand Up @@ -45,17 +46,32 @@ abstract class DrsPathResolver(drsConfig: DrsConfig, retryInternally: Boolean =
}

/**
  * Converts a raw HTTP response from Martha into a decoded `MarthaResponse`.
  *
  * The HTTP status code decides the outcome:
  *  - 408 and 429 fail with a `RegularRetryDisposition`;
  *  - any other 4xx fails with a `FatalRetryDisposition`;
  *  - any 5xx fails with a `RegularRetryDisposition`;
  *  - everything else reads the response entity and, if the status is 200, decodes it as a
  *    `MarthaResponse`; a missing entity, a non-200 status or a decoding failure is raised as a
  *    `RuntimeException`.
  *
  * @param drsPathForDebugging the DRS path being resolved, used only in error messages
  * @param httpResponse        the HTTP response to interpret
  * @return an IO yielding the decoded response, or raising one of the errors described above
  */
private def httpResponseToMarthaResponse(drsPathForDebugging: String)(httpResponse: HttpResponse): IO[MarthaResponse] = {
  val responseStatusLine = httpResponse.getStatusLine
  val status = responseStatusLine.getStatusCode

  // Lazy because only the 4xx/5xx branches below need this message.
  lazy val retryMessage = {
    val reason = responseStatusLine.getReasonPhrase
    s"Unexpected response during resolution of '$drsPathForDebugging': HTTP status $status: $reason"
  }

  status match {
    // These two must be matched before the general 4xx case, which is fatal.
    case 408 | 429 =>
      IO.raiseError(new RuntimeException(retryMessage) with RegularRetryDisposition)
    case s if s / 100 == 4 =>
      IO.raiseError(new RuntimeException(retryMessage) with FatalRetryDisposition)
    case s if s / 100 == 5 =>
      IO.raiseError(new RuntimeException(retryMessage) with RegularRetryDisposition)
    case _ =>
      // The entity is only read on this path; the error branches above never touch it.
      val marthaResponseEntityOption = Option(httpResponse.getEntity).map(EntityUtils.toString)
      val exceptionMsg = errorMessageFromResponse(drsPathForDebugging, marthaResponseEntityOption, responseStatusLine, drsConfig.marthaUrl)
      // Only a 200 keeps its entity; any other status becomes a failure carrying `exceptionMsg`.
      val responseEntityOption = (status == HttpStatus.SC_OK).valueOrZero(marthaResponseEntityOption)
      val responseContentIO = toIO(responseEntityOption, exceptionMsg)

      responseContentIO.flatMap { responseContent =>
        IO.fromEither(decode[MarthaResponse](responseContent))
      }.handleErrorWith {
        e => IO.raiseError(new RuntimeException(s"Unexpected response during DRS resolution: ${ExceptionUtils.getMessage(e)}"))
      }
  }
}

Expand Down Expand Up @@ -118,6 +134,11 @@ abstract class DrsPathResolver(drsConfig: DrsConfig, retryInternally: Boolean =

/** Constants and retry-disposition marker traits for DRS path resolution. */
object DrsPathResolver {
/** Error message for a Martha response that has neither an access URL nor a `gs://` URI. */
final val ExtractUriErrorMsg = "No access URL nor GCS URI starting with 'gs://' found in Martha response!"
/** Marker describing how a failure should be treated with respect to retries. */
sealed trait RetryDisposition
// Should immediately fail the download attempt.
trait FatalRetryDisposition extends RetryDisposition
// Should increase the attempt counter and continue retrying if more retry attempts remain.
trait RegularRetryDisposition extends RetryDisposition
}

object MarthaField extends Enumeration {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package cromwell.cloudsupport.gcp.auth

import java.io.StringWriter
import java.security.KeyPairGenerator
import java.util.Base64

import com.google.api.client.http.{HttpHeaders, HttpResponseException}
import common.assertion.CromwellTimeoutSpec
import cromwell.cloudsupport.gcp.auth.GoogleAuthMode.OptionLookup
Expand Down Expand Up @@ -45,7 +41,7 @@ class GoogleAuthModeSpec extends AnyFlatSpec with CromwellTimeoutSpec with Match
}
}

object GoogleAuthModeSpec {
object GoogleAuthModeSpec extends ServiceAccountTestSupport {
def assumeHasApplicationDefaultCredentials(): Unit = {
tryApplicationDefaultCredentials match {
case Failure(exception) => cancel(exception.getMessage)
Expand All @@ -59,23 +55,6 @@ object GoogleAuthModeSpec {
()
}

/**
  * Renders the given key/value pairs as a single pretty-printed JSON object.
  *
  * @param contents field name / string value pairs, written in the order given
  * @return the JSON text
  */
private def toJson(contents: (String, String)*): String = {
  // Generator doesn't matter as long as it generates JSON. Using `jsonFactory` to get an extra line hit of coverage.
  val buffer = new StringWriter()
  val jsonGenerator = GoogleAuthMode.jsonFactory.createJsonGenerator(buffer)
  jsonGenerator.enablePrettyPrint()
  jsonGenerator.writeStartObject()
  for ((fieldName, fieldValue) <- contents) {
    jsonGenerator.writeFieldName(fieldName)
    jsonGenerator.writeString(fieldValue)
  }
  jsonGenerator.writeEndObject()
  // Close before reading the buffer so everything the generator holds is written out.
  jsonGenerator.close()
  buffer.toString
}

lazy val userCredentialsContents: String = {
toJson(
"type" -> "authorized_user",
Expand All @@ -85,34 +64,6 @@ object GoogleAuthModeSpec {
)
}

/**
  * A PEM-formatted private key, generated once on first access from a fresh 1024-bit RSA key pair.
  */
lazy val serviceAccountPemContents: String = {
  val rsaGenerator = KeyPairGenerator.getInstance("RSA")
  rsaGenerator.initialize(1024)

  // The encoded form of the generated private key is an unencrypted PKCS#8 private key.
  val pkcs8Bytes = rsaGenerator.genKeyPair.getPrivate.getEncoded
  val base64Encoded = Base64.getEncoder.encodeToString(pkcs8Bytes)
  s"""|-----BEGIN PRIVATE KEY-----
      |$base64Encoded
      |-----END PRIVATE KEY-----
      |""".stripMargin
}

// Hide me from git secrets false positives
private val theStringThatShallNotBeNamed = List("private", "key").mkString("_")

/**
  * JSON text for a fake service account, built with `toJson` and embedding
  * `serviceAccountPemContents` as the key material.
  */
lazy val serviceAccountJsonContents: String = {
  // Field names derived from `theStringThatShallNotBeNamed` are assembled at runtime.
  val keyIdField = s"${theStringThatShallNotBeNamed}_id"
  val fields = Seq(
    "type" -> "service_account",
    "client_id" -> "the_account_id",
    "client_email" -> "the_email",
    theStringThatShallNotBeNamed -> serviceAccountPemContents,
    keyIdField -> "the_key_id"
  )
  toJson(fields: _*)
}

lazy val refreshTokenOptions: OptionLookup = Map("refresh_token" -> "the_refresh_token")
lazy val userServiceAccountOptions: OptionLookup = Map("user_service_account_json" -> serviceAccountJsonContents)
}
Loading

0 comments on commit 901826e

Please sign in to comment.