-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy path__init__.py
193 lines (158 loc) · 6.58 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
The BEELINE Run (:mod:`BLRun`) module contains the following main class:
- :class:`BLRun.BLRun`
and three additional classes used in the definition of the BLRun class:
- :class:`BLRun.ConfigParser`
- :class:`BLRun.InputSettings`
- :class:`BLRun.OutputSettings`
"""
import yaml
import argparse
import itertools
from collections import defaultdict
from glob import glob
import pathlib
from pathlib import Path
import multiprocessing
from multiprocessing import Pool, cpu_count
import concurrent.futures
from typing import Dict, List
import yaml
import argparse
import itertools
from collections import defaultdict
from pathlib import Path
import multiprocessing
from multiprocessing import Pool, cpu_count
import concurrent.futures
from typing import Dict, List
from BLRun.runner import Runner
import os
import pandas as pd
class InputSettings(object):
    '''
    Structure for storing the inputs of a run: the directory holding the
    datasets, the datasets themselves and the algorithms to run on them
    '''

    def __init__(self, datadir, datasets, algorithms) -> None:
        # Values are stored exactly as given; no validation or copying.
        (self.datadir,
         self.datasets,
         self.algorithms) = (datadir, datasets, algorithms)
class OutputSettings(object):
    '''
    Container for the output locations of a run: the base directory that
    results are written to and the prefix used for output names
    '''

    def __init__(self, base_dir, output_prefix: Path) -> None:
        # Both values are kept as given; nothing is created on disk here.
        self.base_dir, self.output_prefix = base_dir, output_prefix
class BLRun(object):
'''
The BLRun object is created by parsing a user-provided configuration
file. Its methods provide for further processing its inputs into
a series of jobs to be run, as well as running these jobs.
'''
def __init__(self,
input_settings: InputSettings,
output_settings: OutputSettings) -> None:
self.input_settings = input_settings
self.output_settings = output_settings
self.runners: Dict[int, Runner] = self.__create_runners()
def __create_runners(self) -> Dict[int, List[Runner]]:
'''
Instantiate the set of runners based on parameters provided via the
configuration file. Each runner is supplied an interactome, collection,
the set of algorithms to be run, and graphspace credentials, in
addition to the custom parameters each runner may or may not define.
'''
runners: Dict[int, Runner] = defaultdict(list)
order = 0
for dataset in self.input_settings.datasets:
for runner in self.input_settings.algorithms:
data = {}
data['name'] = runner[0]
data['params'] = runner[1]
data['inputDir'] = Path.cwd().joinpath(self.input_settings.datadir.joinpath(dataset['name']))
data['exprData'] = dataset['exprData']
data['cellData'] = dataset['cellData']
data['trueEdges'] = dataset['trueEdges']
if 'should_run' in data['params'] and \
data['params']['should_run'] is False:
print("Skipping %s" % (data['name']))
continue
runners[order] = Runner(data)
order += 1
return runners
def execute_runners(self, parallel=False, num_threads=1):
'''
Run each of the algorithms
'''
base_output_dir = self.output_settings.base_dir
batches = self.runners.keys()
for batch in batches:
if parallel==True:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(runner.run, base_output_dir)
for runner in self.runners[batch]]
# https://stackoverflow.com/questions/35711160/detect-failed-tasks-in-concurrent-futures
# Re-raise exception if produced
for future in concurrent.futures.as_completed(futures):
future.result()
executor.shutdown(wait=True)
else:
for runner in self.runners[batch]:
runner.run(output_dir=base_output_dir)
class ConfigParser(object):
    '''
    Define static methods for parsing a config file that sets a large number
    of parameters for the pipeline
    '''

    @staticmethod
    def parse(config_file_handle) -> "BLRun":
        '''
        Read a YAML configuration and build the corresponding BLRun object.

        :param config_file_handle: open file (or string) holding the YAML
            configuration; it must define ``input_settings`` and
            ``output_settings``
        :returns: BLRun built from the parsed settings
        '''
        # NOTE(review): yaml.full_load resolves more YAML tags than
        # yaml.safe_load. If configuration files can come from an untrusted
        # source, switch to yaml.safe_load.
        config_map = yaml.full_load(config_file_handle)
        return BLRun(
            ConfigParser.__parse_input_settings(
                config_map['input_settings']),
            ConfigParser.__parse_output_settings(
                config_map['output_settings']))

    @staticmethod
    def __parse_input_settings(input_settings_map) -> "InputSettings":
        '''
        Build the InputSettings from the ``input_settings`` section.

        When the section has no ``datasets`` key, every sub-folder of
        ``input_dir/dataset_dir`` is used as a dataset with the default file
        names.

        :raises ValueError: if ``datasets`` is present but empty (``None``)
        '''
        input_dir = input_settings_map['input_dir']
        dataset_dir = input_settings_map['dataset_dir']

        if 'datasets' in input_settings_map:
            # If datasets specified, run the corresponding datasets
            datasets = input_settings_map['datasets']
            if datasets is None:
                # Fail here with a clear message; continuing would only
                # crash later, when the datasets are iterated.
                raise ValueError("Please specify input datasets!")
        else:
            # If no datasets specified, run all datasets in dataset_dir.
            # Sorted so that the run order does not depend on the file system.
            subfolders = sorted(
                glob(os.path.join(input_dir, dataset_dir, "*/"),
                     recursive=True))
            datasets = [{"name": pathlib.Path(folder).name,
                         "exprData": "ExpressionData.csv",
                         "cellData": "PseudoTime.csv",
                         "trueEdges": "refNetwork.csv"}
                        for folder in subfolders]

        return InputSettings(
            Path(input_dir, dataset_dir),
            datasets,
            ConfigParser.__parse_algorithms(
                input_settings_map['algorithms']))

    @staticmethod
    def __parse_algorithms(algorithms_list):
        '''
        Expand every algorithm entry into one ``[name, params]`` pair per
        combination of its parameter values.

        Parameter values are normally lists of alternatives. A scalar or a
        string is treated as a single alternative, and an entry without
        parameters yields one pair with an empty parameter dictionary.

        :param algorithms_list: list of dicts with a ``name`` and,
            optionally, a ``params`` dict
        :returns: list of ``[name, params]`` pairs
        '''
        def as_choices(value):
            # Strings are iterable, but iterating them would split a single
            # value into characters.
            if isinstance(value, (str, bytes)) or not hasattr(value, '__iter__'):
                return [value]
            return value

        algorithms = []
        for algorithm in algorithms_list:
            params = algorithm.get('params') or {}
            names = list(params)
            choices = [as_choices(params[name]) for name in names]
            for values in itertools.product(*choices):
                algorithms.append([algorithm['name'], dict(zip(names, values))])
        return algorithms

    @staticmethod
    def __parse_output_settings(output_settings_map) -> "OutputSettings":
        '''
        Build the OutputSettings from the ``output_settings`` section, which
        must define ``output_dir`` and ``output_prefix``.
        '''
        output_dir = Path(output_settings_map['output_dir'])
        output_prefix = Path(output_settings_map['output_prefix'])
        return OutputSettings(output_dir,
                              output_prefix)