Adjust import workflow configuration to support S3 (#357)
* A simple modification to the dataset import "marking mechanism" inference, to allow multiple mechanisms per study in some cases.

* Add s3 support to workflow configure

* Add note about unset variables

* Version bump
jimmymathews authored Sep 20, 2024
1 parent 6007294 commit 26de250
Showing 4 changed files with 121 additions and 20 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml.unversioned
@@ -78,6 +78,7 @@ workflow = [
"tabulate==0.9.0",
"Pillow==9.5.0",
"tables==3.9.2",
"boto3==1.35.23",
]
all = [
"bokeh==3.4.1",
@@ -99,6 +100,7 @@ all = [
"secure==0.3.0",
"tables==3.9.2",
"tqdm==4.66.4",
"boto3==1.35.23",
]
dev = [
"autopep8",
132 changes: 113 additions & 19 deletions spatialprofilingtoolbox/workflow/scripts/configure.py
@@ -1,5 +1,6 @@
"""CLI utility to configure an SPT workflow run."""

from typing import Literal
from typing import cast
import re
from argparse import ArgumentParser
from argparse import RawDescriptionHelpFormatter
@@ -9,8 +10,8 @@
chmod,
makedirs,
)
from os import environ as os_environ
from os.path import (
isdir,
exists,
join,
abspath,
@@ -20,7 +21,9 @@
from configparser import ConfigParser
from importlib.resources import as_file
from importlib.resources import files
from typing import cast
from attr import define
from boto3 import client as boto3_client
from botocore.exceptions import ClientError

from spatialprofilingtoolbox import __version__ as SPT_VERSION

@@ -101,6 +104,12 @@ def _record_configuration_command(variables: dict[str, str], configuration_file:

with open('run.sh', 'wt', encoding='utf-8') as file:
file.write('#!/bin/sh\n\n')
if 'input_path' in variables and source_of_reference(variables['input_path']) == 's3':
file.write('# Unsetting the variables below is a workaround for Nextflow\'s lack of support for session-specific\n')
file.write('# credentials supplied via environment variables. This forces Nextflow to fall back to\n')
file.write('# ~/.aws/credentials, for which Nextflow DOES support session-specific credentials.\n')
file.write('unset AWS_ACCESS_KEY_ID\n')
file.write('unset AWS_SECRET_ACCESS_KEY\n\n')
file.write('nextflow run .\n')

file_stat = stat('configure.sh')
@@ -109,94 +118,162 @@ def _record_configuration_command(variables: dict[str, str], configuration_file:
chmod('run.sh', file_stat.st_mode | S_IEXEC)
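
# For an S3-backed input_path, the generated run.sh would look roughly like this
# (a sketch assembled from the writes above; the header comment lines are omitted):
#
#   #!/bin/sh
#
#   unset AWS_ACCESS_KEY_ID
#   unset AWS_SECRET_ACCESS_KEY
#
#   nextflow run .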


def source_of_reference(path_or_uri: str) -> Literal['s3', 'local']:
if re.search('^s3://', path_or_uri):
return 's3'
return 'local'
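
# For example (hypothetical references): source_of_reference('s3://my-bucket/my-dataset/')
# returns 's3', while source_of_reference('datasets/my-dataset') returns 'local'.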


@define
class SPTS3Resource:
bucket: str
dataset: str
filename: str | None

def is_directory(self) -> bool:
return self.filename is None

def get_key_string(self) -> str:
if self.filename is not None:
return '/'.join([self.dataset, self.filename])
raise ValueError('This resource represents a directory; it has no "Key" serialization.')


def parse_s3_reference(uri: str) -> SPTS3Resource:
match = re.search(r'^s3://([\w\-]+)/([\w\-]+)/([\w\-\.]+)$', uri)
if match:
groups3 = cast(tuple[str, str, str], match.groups())
return SPTS3Resource(*groups3)
else:
match = re.search(r'^s3://([\w\-]+)/([\w\-]+)/?$', uri)
if match:
groups2 = cast(tuple[str, str], match.groups())
return SPTS3Resource(*groups2, None)
raise ValueError(f'Could not parse uri "{uri}" as S3 resource for SPT dataset.')
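
# A sketch of the two accepted URI forms (hypothetical bucket and dataset names):
#   parse_s3_reference('s3://my-bucket/my-dataset/file_manifest.tsv')
#     -> SPTS3Resource(bucket='my-bucket', dataset='my-dataset', filename='file_manifest.tsv')
#   parse_s3_reference('s3://my-bucket/my-dataset/')
#     -> SPTS3Resource(bucket='my-bucket', dataset='my-dataset', filename=None)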


def exists_s3_or_local(path_or_uri: str) -> bool:
source = source_of_reference(path_or_uri)
if source == 's3':
uri = path_or_uri
resource = parse_s3_reference(uri)
client = boto3_client('s3')
if resource.is_directory():
listing = client.list_objects(Bucket=resource.bucket, Prefix=resource.dataset)
return 'Contents' in listing
else:
try:
client.head_object(Bucket=resource.bucket, Key=resource.get_key_string())
except ClientError as error:
if error.response['Error']['Code'] == '404':
return False
raise ValueError('When checking file existence, got an unrelated access error.') from error
return True
if source == 'local':
path = path_or_uri
return exists(path)
raise ValueError(f'Reference is neither a local path nor a URI: {path_or_uri}')
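
# Usage sketch (hypothetical references): directory existence reduces to a non-empty
# prefix listing, file existence to a HEAD request, and local paths to os.path.exists:
#   exists_s3_or_local('s3://my-bucket/my-dataset/')                   # list_objects
#   exists_s3_or_local('s3://my-bucket/my-dataset/file_manifest.tsv')  # head_object
#   exists_s3_or_local('/data/my-dataset/file_manifest.tsv')           # exists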


def _process_filename_inputs(options: dict[str, str | bool]) -> None:
if 'input_path' not in options:
return
input_path = cast(str, options['input_path'])
del options['input_path']
if isdir(input_path):
if exists_s3_or_local(input_path):
file_manifest_path = join(input_path, 'file_manifest.tsv')
if exists(file_manifest_path):
if exists_s3_or_local(file_manifest_path):
options['input_path'] = input_path
options['file_manifest_filename'] = file_manifest_path
else:
raise FileNotFoundError(file_manifest_path)
else:
if source_of_reference(input_path) == 's3':
resource = parse_s3_reference(input_path)
raise ValueError(f'\nS3 URI is wrong: {input_path}\nCheck the:\n bucket: "{resource.bucket}"\n dataset: "{resource.dataset}"\n')
raise FileNotFoundError(input_path)

if source_of_reference(file_manifest_path) == 's3':
local_file_manifest = '_file_manifest.temp.tsv'
resource = parse_s3_reference(file_manifest_path)
client = boto3_client('s3')
client.download_file(resource.bucket, resource.get_key_string(), local_file_manifest)
else:
local_file_manifest = file_manifest_path
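
# From here on, the (possibly just-downloaded) local copy of the manifest is what gets
# read; the original file_manifest_path is still what was recorded in the options above.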

samples_file = get_input_filename_by_identifier(
input_file_identifier='Samples file',
file_manifest_filename=file_manifest_path,
file_manifest_filename=local_file_manifest,
)
options['samples'] = False
if samples_file is not None:
samples_file_abs = join(input_path, samples_file)
if exists(samples_file_abs):
if exists_s3_or_local(samples_file_abs):
options['samples_file'] = samples_file_abs
options['samples'] = True
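
# The same lookup-and-existence-check pattern repeats below for the subjects, study,
# diagnosis, interventions, channels, and phenotypes files.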

subjects_file = get_input_filename_by_identifier(
input_file_identifier='Subjects file',
file_manifest_filename=file_manifest_path,
file_manifest_filename=local_file_manifest,
)
options['subjects'] = False
if subjects_file is not None:
subjects_file_abs = join(input_path, subjects_file)
if exists(subjects_file_abs):
if exists_s3_or_local(subjects_file_abs):
options['subjects_file'] = subjects_file_abs
options['subjects'] = True

study_file = get_input_filename_by_identifier(
input_file_identifier='Study file',
file_manifest_filename=file_manifest_path,
file_manifest_filename=local_file_manifest,
)
if study_file is not None:
study_file_abs = join(input_path, study_file)
if not exists(study_file_abs):
if not exists_s3_or_local(study_file_abs):
raise FileNotFoundError(f'Did not find study file ({study_file}).')
options['study_file'] = study_file_abs
options['study'] = True

diagnosis_file = get_input_filename_by_identifier(
input_file_identifier='Diagnosis file',
file_manifest_filename=file_manifest_path,
file_manifest_filename=local_file_manifest,
)
if diagnosis_file is not None:
diagnosis_file_abs = join(input_path, diagnosis_file)
if not exists(diagnosis_file_abs):
if not exists_s3_or_local(diagnosis_file_abs):
raise FileNotFoundError(f'Did not find diagnosis file ({diagnosis_file}).')
options['diagnosis_file'] = diagnosis_file_abs
options['diagnosis'] = True

interventions_file = get_input_filename_by_identifier(
input_file_identifier='Interventions file',
file_manifest_filename=file_manifest_path,
file_manifest_filename=local_file_manifest,
)
if interventions_file is not None:
interventions_file_abs = join(input_path, interventions_file)
if not exists(interventions_file_abs):
if not exists_s3_or_local(interventions_file_abs):
raise FileNotFoundError(f'Did not find interventions file ({interventions_file}).')
options['interventions_file'] = interventions_file_abs
options['interventions'] = True

channels_file = get_input_filename_by_identifier(
input_file_identifier='Channels file',
file_manifest_filename=file_manifest_path,
file_manifest_filename=local_file_manifest,
)
if channels_file is not None:
channels_file_abs = join(input_path, channels_file)
if not exists(channels_file_abs):
if not exists_s3_or_local(channels_file_abs):
raise FileNotFoundError(f'Did not find channels file ({channels_file}).')
options['channels_file'] = channels_file_abs
options['channels'] = True

phenotypes_file = get_input_filename_by_identifier(
input_file_identifier='Phenotypes file',
file_manifest_filename=file_manifest_path,
file_manifest_filename=local_file_manifest,
)
if phenotypes_file is not None:
phenotypes_file_abs = join(input_path, phenotypes_file)
if not exists(phenotypes_file_abs):
if not exists_s3_or_local(phenotypes_file_abs):
raise FileNotFoundError(f'Did not find phenotypes file ({phenotypes_file}).')
options['phenotypes_file'] = phenotypes_file_abs
options['phenotypes'] = True
@@ -323,6 +400,23 @@ def parse_arguments():

if not workflow_configuration.is_database_visitor:
_process_filename_inputs(config_variables)
if source_of_reference(config_variables['input_path']) == 's3':
def copy_profile_from_saml_to_default():
if 'AWS_PROFILE' in os_environ:
profile = os_environ['AWS_PROFILE']
elif 'SAML2AWS_PROFILE' in os_environ:
profile = os_environ['SAML2AWS_PROFILE']
else:
print('Warning: No AWS profile could be determined, implicitly using "default".')
return
config = ConfigParser()
filename = expanduser('~/.aws/credentials')
config.read(filename)
config['default'] = config[profile]
with open(filename, 'wt', encoding='utf-8') as file:
config.write(file)
print(f'Note: Overwrote "default" profile in ~/.aws/credentials with "{profile}" values.')
copy_profile_from_saml_to_default()
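
# Sketch of the effect (hypothetical profile name): with AWS_PROFILE=saml, the [saml]
# section of ~/.aws/credentials is copied over the [default] section, so that tools
# reading only "default" (here, Nextflow) pick up the session credentials.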

_write_config_file(config_variables)
_write_pipeline_script(workflow_configuration)
5 changes: 5 additions & 0 deletions
@@ -1,5 +1,6 @@
"""Source file parsing for imaging/feature-assessment channel metadata."""
from typing import cast
import re

from pandas import DataFrame
from pandas import Series
@@ -219,6 +220,10 @@ def _infer_common_marking_mechanism(channels: DataFrame) -> str:
mechanisms = list(set(row['Marking mechanism'] for i, row in channels.iterrows()))
if len(mechanisms) > 1:
logger.warning('Encountered multiple marking mechanisms: %s', mechanisms)
computational = [m for m in mechanisms if re.search('[Cc]omputational [Ii]dentification', m)]
for mechanism in computational:
logger.info('Excluding "%s" as computationally generated, not a primary marking mechanism.', mechanism)
mechanisms.remove(mechanism)
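# For example (hypothetical values): ['Multiplexed imaging', 'Computational identification']
# would reduce to ['Multiplexed imaging'], so a single primary mechanism can still be inferred.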
if len(mechanisms) == 1:
mechanism = mechanisms[0]
else:
2 changes: 1 addition & 1 deletion version.txt
@@ -1 +1 @@
0.30.0
0.31.0
