-
Notifications
You must be signed in to change notification settings - Fork 74
/
Copy pathexample-minimal.py
56 lines (44 loc) · 1.42 KB
/
example-minimal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import d6tflow
import pandas as pd
# define 2 tasks that load raw data
class Task1(d6tflow.tasks.TaskCache):
    """Task that builds a small DataFrame and saves it as its output.

    The frame has a single column ``a`` holding the values of ``range(3)``.
    """

    def run(self):
        """Create the frame and hand it straight to ``self.save``."""
        self.save(pd.DataFrame({'a': range(3)}))
class Task2(Task1):
    """Second task; inherits ``run`` from ``Task1`` without overriding anything."""
# define another task that depends on data from task1 and task2
@d6tflow.requires({'input1': Task1, 'input2': Task2})
class Task3(d6tflow.tasks.TaskCache):
    """Join the outputs of ``Task1`` and ``Task2`` and add a scaled column.

    The two upstream tasks are declared under the keys ``'input1'`` and
    ``'input2'``; ``run`` looks them up by those same keys.
    """

    # Integer task parameter; 2 unless the caller supplies another value.
    multiplier = d6tflow.IntParameter(default=2)

    def run(self):
        """Load both inputs, join them, compute column ``b`` and save the result."""
        left = self.input()['input1'].load()
        right = self.input()['input2'].load()

        # Both inputs carry a column named 'a'; the suffixes turn the
        # overlapping names into 'a1' (left) and 'a2' (right).
        joined = left.join(right, lsuffix='1', rsuffix='2')

        # 'b' is the left-hand 'a' column scaled by the task parameter.
        joined['b'] = joined['a1'] * self.multiplier
        self.save(joined)
# Execute the task including all its dependencies: build a workflow around
# Task3 (which declares Task1 and Task2 as inputs) and run it.
flow = d6tflow.Workflow(Task3)
flow.run()
# The bare string below is a no-op expression statement; it appears to record
# the run summary the author saw (one run each of Task1, Task2 and Task3).
'''
* 3 ran successfully:
- 1 Task1()
- 1 Task2()
- 1 Task3(multiplier=2)
'''
# Quickly load output data. Task1().outputLoad() also works.
# NOTE(review): the return value is not assigned or printed here, so when this
# file runs as a script nothing is displayed for this call.
flow.outputLoad()
# Illustrative output only (another no-op string): columns a1, a2 and b,
# where b equals a1 times the default multiplier of 2.
'''
a1 a2 b
0 0 0 0
1 1 1 2
2 2 2 4
'''
# Intelligently rerun the workflow after changing parameters: a second
# workflow for the same task, this time supplying multiplier=3.
# NOTE(review): the dict is presumably forwarded as Task3's parameters; confirm
# against the d6tflow Workflow documentation.
flow2 = d6tflow.Workflow(Task3, {'multiplier': 3})
flow2.preview()
# Illustrative preview output (no-op string): Task3 is shown as PENDING with
# the new multiplier, while Task1 and Task2 are shown as COMPLETE.
'''
└─--[Task3-{'multiplier': '3'} (PENDING)] => this changed and needs to run
|--[Task1-{} (COMPLETE)] => this doesn't change and doesn't need to rerun
└─--[Task2-{} (COMPLETE)] => this doesn't change and doesn't need to rerun
'''