Do not delete "intermediary" files if they are outside of the workflo… #6538
…w root execution directory [BW-825].
finalOutputs.outputs.values.toSet
)

actualIntermediateFiles shouldBe Set()
Should I test that the log message was emitted?
input {
}
command {
gsutil cp gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/no_input_delete/source_files/*.* gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/no_input_delete/test_execution
Note that this requires the source_files directory to have the appropriate data. We copy it over to test_execution so that if a previous test case failed due to the file being incorrectly deleted, the next run does not fail. But of course all test runs are hitting the same directory, so it's possible that a failing test could impact other test runs happening at "the same time".
There are other Centaur tests that reference test data in buckets outside of the ci folder. However, they do not live within cloud-cromwell-dev-self-cleaning (and they do not rely on modifying data), so this is definitely an unusual case, and I want to make sure folks are comfortable with it.
val potentialIntermediaries = allOutputFiles.diff(finalOutputFiles).flatMap(toPath)
val checkedIntermediaries = potentialIntermediaries.filter(p => rootWorkflowRoots.exists(r => p.pathAsString.startsWith(r.pathAsString)))
for ( path <- potentialIntermediaries.diff(checkedIntermediaries) ) {
log.warning(s"Did not delete $path because it is not contained within a workflow root directory for $rootWorkflowId")
Should this be warning or info?
Note that I verified the message is getting emitted when the centaur test runs:
no_input_delete complete. Final Outputs:
{
"no_input_delete.done": true
}
2021-10-01 20:26:34,467 cromwell-system-akka.dispatchers.engine-dispatcher-12 INFO - WorkflowExecutionActor-91b8fe05-ab57-4375-bb02-e5b5eceb05f4 [UUID(91b8fe05)]: Workflow no_input_delete_setup complete. Final Outputs:
{
"no_input_delete_setup.done": "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/no_input_delete/test_execution/no_input_delete.txt: Creation time: Fri, 01 Oct 2021 20:17:34 GMT Update time: Fri, 01 Oct 2021 20:17:34 GMT Storage class: MULTI_REGIONAL Content-Length: 34 Content-Type: text/plain Hash (crc32c): utmokA== Hash (md5): 8cj2wBOOYQBaVlnfmlvYxw== ETag: CIb43oeEqvMCEAE= Generation: 1633119454608390 Metageneration: 1"
}
2021-10-01 20:26:34,493 cromwell-system-akka.actor.default-dispatcher-4 WARN - Did not delete gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/no_input_delete/test_execution/no_input_delete.txt because it is not contained within a workflow root directory for 91b8fe05-ab57-4375-bb02-e5b5eceb05f4
2021-10-01 20:26:34,498 cromwell-system-akka.actor.default-dispatcher-4 INFO - Root workflow 91b8fe05-ab57-4375-bb02-e5b5eceb05f4 does not have any intermediate output files to delete.
2021-10-01 20:26:34,504 cromwell-system-akka.dispatchers.engine-dispatcher-29 INFO - WorkflowActor-91b8fe05-ab57-4375-bb02-e5b5eceb05f4 [UUID(91b8fe05)]: Successfully deleted intermediate output file(s) for root workflow 91b8fe05-ab57-4375-bb02-e5b5eceb05f4.
I think it should be info. In the realm of the user's workflow, it is potentially a mistake - but in the realm of Cromwell the application that we monitor and maintain, it is working as designed.
Done
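For readers following this thread, the filter-then-diff logic can also be written as a single `partition`, which yields the deletable files and the skipped files in one pass. This is only a sketch under stated assumptions: it uses `java.nio.file.Path` as a stand-in for Cromwell's own `Path` type, and the object and method names (`IntermediateFileFilter`, `splitByWorkflowRoot`, `skippedMessage`) are hypothetical, not part of the PR.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical helper, not Cromwell code: java.nio.file.Path stands in for Cromwell's Path.
object IntermediateFileFilter {

  // Returns (deletable, skipped): files under some workflow root, and files outside all roots.
  def splitByWorkflowRoot(candidates: Set[Path], workflowRoots: Set[Path]): (Set[Path], Set[Path]) =
    candidates.partition(p => workflowRoots.exists(r => p.startsWith(r)))

  // Message text taken from the diff under review.
  def skippedMessage(path: Path, rootWorkflowId: String): String =
    s"Did not delete $path because it is not contained within a workflow root directory for $rootWorkflowId"

  def main(args: Array[String]): Unit = {
    // Example paths are made up for illustration.
    val roots = Set(Paths.get("/cromwell-executions/wf/91b8fe05"))
    val candidates = Set(
      Paths.get("/cromwell-executions/wf/91b8fe05/call-a/intermediate.txt"),
      Paths.get("/shared/test_execution/no_input_delete.txt")
    )

    val (deletable, skipped) = splitByWorkflowRoot(candidates, roots)

    // In the actor this would go through log.info, per the outcome of this thread.
    skipped.foreach(p => println(skippedMessage(p, "91b8fe05")))
    println(s"Would delete: ${deletable.mkString(", ")}")
  }
}
```

Using `partition` avoids computing the skipped set with a second `diff`, though the behavior should be the same as the two-step version in the diff.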
def gatherIntermediateOutputFiles(allOutputs: Set[WomValue], finalOutputs: Set[WomValue]): Set[Path] = {
val allOutputFiles = allOutputs.flatMap(getWomSingleFiles)
val finalOutputFiles = finalOutputs.flatMap(getWomSingleFiles)
allOutputFiles.diff(finalOutputFiles).flatMap(toPath)
val potentialIntermediaries = allOutputFiles.diff(finalOutputFiles).flatMap(toPath)
val checkedIntermediaries = potentialIntermediaries.filter(p => rootWorkflowRoots.exists(r => p.pathAsString.startsWith(r.pathAsString)))
Is this "startsWith" check robust enough? Should I be doing any sort of absolute path resolution?
I think in general the best way is to compare canonicalized paths (I don't have an example handy but the codebase should yield some)
I only see "canonical" in reference to Java Files (where I agree it's definitely the right method to use). Since these are our own Path objects, and they can refer to things like gcs paths, are we able to use the Java File getCanonicalPath method?
Our Path class does have a toAbsolutePath method, so I switched to using that and then the Path class startsWith method. I think that should be good, because there are Path-specific implementations of both methods (CloudStoragePath in this case). In reality though, I would expect that we are already dealing with absolute paths.
Looks like the right direction.
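To illustrate why this thread moved away from comparing `pathAsString` values, here is a minimal sketch of the two approaches. It assumes `java.nio.file.Path` as a stand-in for Cromwell's `Path` class (whose `toAbsolutePath` and `startsWith` methods are mentioned above); the `normalize()` call and the names `PathContainment`, `containedByString`, and `containedByPath` are assumptions added for illustration and are not taken from the PR.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical illustration: java.nio.file.Path stands in for Cromwell's Path.
object PathContainment {

  // Raw string comparison: a sibling directory that shares a character prefix also matches.
  def containedByString(file: String, root: String): Boolean =
    file.startsWith(root)

  // Element-wise comparison on absolute, normalized paths.
  // Path.startsWith matches whole path elements, not characters.
  def containedByPath(file: Path, root: Path): Boolean =
    file.toAbsolutePath().normalize().startsWith(root.toAbsolutePath().normalize())

  def main(args: Array[String]): Unit = {
    // Example paths are made up for illustration.
    val root     = Paths.get("/exec/wf/1234")
    val inside   = Paths.get("/exec/wf/1234/call-a/out.txt")
    val sibling  = Paths.get("/exec/wf/12345/call-a/out.txt")
    val escaping = Paths.get("/exec/wf/1234/../other/out.txt")

    for (p <- Seq(inside, sibling, escaping)) {
      val byString = containedByString(p.toString, root.toString)
      val byPath   = containedByPath(p, root)
      println(s"$p string-check=$byString path-check=$byPath")
    }
  }
}
```

In this sketch the string check accepts the sibling directory `1234`*`5`* and the `..` path, while the element-wise check rejects both. Whether Cromwell's cloud-backed `Path` implementations normalize the same way is not something this example establishes.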
@@ -581,8 +582,10 @@ class WorkflowActor(workflowToStart: WorkflowToStart,

def deleteFiles() = {
val rootWorkflowId = data.workflowDescriptor.get.rootWorkflowId
val rootWorkflowRoots = data.initializationData.data.values.collect({case Some(i: StandardInitializationData) => i.workflowPaths.workflowRoot}).toSet[Path]
Mild preference: make this a method on WorkflowActorData or AllBackendInitializationData to sequester this implementation behind a clearly named function (I don't know the code well enough to say which I prefer)
I agree, it's much nicer within AllBackendInitializationData.
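A rough sketch of what moving the `collect` expression behind a named method might look like. All types below are simplified, hypothetical stand-ins for the Cromwell classes named in this thread (paths are plain strings here), and the method name `workflowRoots` is an assumption; the PR may have chosen a different name or signature.

```scala
// Hypothetical, simplified stand-ins for the Cromwell types named in the review.
final case class WorkflowPaths(workflowRoot: String)

sealed trait BackendInitializationData
final case class StandardInitializationData(workflowPaths: WorkflowPaths) extends BackendInitializationData
final case class OtherInitializationData(note: String) extends BackendInitializationData

final case class AllBackendInitializationData(data: Map[String, Option[BackendInitializationData]]) {
  // Hypothetical method: gathers the workflow root of every backend that
  // carries StandardInitializationData, ignoring backends without it.
  def workflowRoots: Set[String] =
    data.values.collect {
      case Some(i: StandardInitializationData) => i.workflowPaths.workflowRoot
    }.toSet
}

object AllBackendInitializationDataExample {
  def main(args: Array[String]): Unit = {
    // Backend names and bucket paths are made up for illustration.
    val init = AllBackendInitializationData(Map(
      "backendA" -> Some(StandardInitializationData(WorkflowPaths("gs://bucket/wf/91b8fe05"))),
      "backendB" -> Some(OtherInitializationData("no workflow root")),
      "backendC" -> None
    ))
    println(init.workflowRoots)
  }
}
```

The caller in `deleteFiles()` would then read as a single method call instead of an inline pattern match.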
val deleteActor = context.actorOf(DeleteWorkflowFilesActor.props(
rootWorkflowId = rootWorkflowId,
rootWorkflowRoots = rootWorkflowRoots,
Mild preference: is there a way to name this that more clearly suggests its purpose, such as rootWorkflowProtectedPaths? Especially since it's only used in deletion, seemingly.
Yes, I didn't like that name either. But I don't think "Protected" is the right thing to add, because these aren't the paths that are protected; everything else is protected. I changed the variable to rootWorkflowRootPaths to try to clarify what the second "root" refers to.
One small comment otherwise LGTM 👍 - I think it would be helpful (probably for debugging purposes later on) to add another INFO log here stating something like - Deleting files for root workflow <workflow_id>.
@salonishah11 we do already have a logged message "Successfully deleted intermediate output file(s) for root workflow $rootWorkflowIdForLogging." So unless we want to log a message for each individual file (which seems like it could be a lot… but maybe useful information?), I think we are already set with that.
@cahrens Yes you are right. If we could somehow add the list of files that were deleted in that message or some other log message, in my opinion, it would be useful for debugging to figure out which files being deleted were associated with which workflow. But if you think it could be a lot of messages to add then I am fine with not adding it 👍
deleteCommands foreach sendIoCommand
deleteCommands foreach { cmd =>
sendIoCommand(cmd)
log.info(s"Calling '$cmd' to delete intermediary file for root workflow $rootWorkflowId.")
I think it would be better to just print the file name instead of the entire IoCommand object. This might work:
log.info(s"Calling '${cmd.file.pathAsString}' to delete intermediary file for root workflow $rootWorkflowId.")
I modified what I'm logging to only include the file path.
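The difference between the two log lines discussed here can be seen with a small stand-in. The case classes below are hypothetical simplifications (the real command type and its fields are not shown in this thread), and the message wording is an assumption rather than the text that was finally committed.

```scala
// Hypothetical stand-ins: the real IoCommand and Path types have more structure than this.
final case class FilePath(pathAsString: String)
final case class DeleteCommand(file: FilePath, extraField: Boolean)

object DeleteLogging {
  // Interpolating the command itself prints the whole object via toString.
  def verboseLine(cmd: DeleteCommand, rootWorkflowId: String): String =
    s"Calling '$cmd' to delete intermediary file for root workflow $rootWorkflowId."

  // Interpolating only the path keeps the line short and easy to search for.
  def pathOnlyLine(cmd: DeleteCommand, rootWorkflowId: String): String =
    s"Deleting intermediary file '${cmd.file.pathAsString}' for root workflow $rootWorkflowId."

  def main(args: Array[String]): Unit = {
    // Bucket path is made up for illustration.
    val cmd = DeleteCommand(FilePath("gs://bucket/wf/91b8fe05/call-a/tmp.txt"), extraField = true)
    println(verboseLine(cmd, "91b8fe05"))
    println(pathOnlyLine(cmd, "91b8fe05"))
  }
}
```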