-
Notifications
You must be signed in to change notification settings - Fork 18
/
pipeline.py
79 lines (61 loc) · 2.49 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import kfp.dsl as dsl
from kfp.dsl import PipelineVolume
# To compile the pipeline:
# dsl-compile --py pipeline.py --output pipeline.tar.gz
from constants import PROJECT_ROOT, CONDA_PYTHON_CMD
def git_clone_op(repo_url: str):
    """Create the pipeline volume and clone ``repo_url`` onto it.

    A ``dsl.VolumeOp`` first requests a 3Gi ``ReadWriteOnce`` claim named
    ``pipeline-pvc``. The clone step then mounts that volume at
    ``/workspace`` and checks the repository out to ``PROJECT_ROOT``.

    Args:
        repo_url: Location of the Git repository to clone.

    Returns:
        The ``dsl.ContainerOp`` for the clone step; callers read its
        ``pvolume`` attribute to mount the same volume in later steps.
    """
    volume_op = dsl.VolumeOp(
        name="create pipeline volume",
        resource_name="pipeline-pvc",
        modes=["ReadWriteOnce"],
        size="3Gi",
    )

    # git is executed directly with an argument vector instead of through
    # `sh -c "<string>"`: the URL is then never re-parsed by a shell, so
    # characters such as `&`, `?`, spaces or `;` inside it cannot break or
    # hijack the command. `--` stops option parsing, so a URL that starts
    # with `-` is not mistaken for a git flag. The former trailing `cd` was
    # dropped because it had no effect (the shell exited right after it).
    # NOTE(review): the checkout only lands on the volume if PROJECT_ROOT
    # is located under /workspace -- confirm against constants.
    return dsl.ContainerOp(
        name='git clone',
        image='alpine/git:latest',
        command=['git'],
        arguments=['clone', '--', repo_url, PROJECT_ROOT],
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={"/workspace": volume_op.volume},
    )
def preprocess_op(image: str, pvolume: PipelineVolume, data_dir: str):
    """Build the container step that runs ``preprocessing.py``.

    Args:
        image: Container image the step runs in.
        pvolume: Volume mounted at ``/workspace`` inside the container.
        data_dir: Value passed to the script's ``--data_dir`` option.

    Returns:
        The configured ``dsl.ContainerOp``.
    """
    script_path = f"{PROJECT_ROOT}/preprocessing.py"
    mounts = {"/workspace": pvolume}

    step = dsl.ContainerOp(
        name='preprocessing',
        image=image,
        command=[CONDA_PYTHON_CMD, script_path],
        arguments=["--data_dir", data_dir],
        container_kwargs=dict(image_pull_policy='IfNotPresent'),
        pvolumes=mounts,
    )
    return step
def train_and_eval_op(image: str, pvolume: PipelineVolume, data_dir: str):
    """Build the container step that runs ``train.py``.

    The step declares one file output, ``output``, read from
    ``output.txt`` under ``PROJECT_ROOT``.

    Args:
        image: Container image the step runs in.
        pvolume: Volume mounted at ``/workspace`` inside the container.
        data_dir: Value passed to the script's ``--data_dir`` option.

    Returns:
        The configured ``dsl.ContainerOp``.
    """
    train_script = f"{PROJECT_ROOT}/train.py"
    outputs = {'output': f'{PROJECT_ROOT}/output.txt'}

    step = dsl.ContainerOp(
        name='training and evaluation',
        image=image,
        command=[CONDA_PYTHON_CMD, train_script],
        arguments=["--data_dir", data_dir],
        file_outputs=outputs,
        container_kwargs=dict(image_pull_policy='IfNotPresent'),
        pvolumes={"/workspace": pvolume},
    )
    return step
@dsl.pipeline(
    description='Fashion MNIST Training Pipeline to be executed on KubeFlow.',
    name='Fashion MNIST Training Pipeline',
)
def training_pipeline(
        image: str = 'benjamintanweihao/kubeflow-mnist',
        repo_url: str = 'https://github.com/benjamintanweihao/kubeflow-mnist.git',
        data_dir: str = '/workspace'):
    # Three steps, chained through a shared volume:
    #   1. clone `repo_url` onto a newly created volume,
    #   2. run preprocessing in `image`,
    #   3. run training and evaluation in `image`.
    # Each step mounts the volume handed on by the step before it.
    clone_step = git_clone_op(repo_url=repo_url)

    preprocess_step = preprocess_op(
        image=image,
        pvolume=clone_step.pvolume,
        data_dir=data_dir,
    )

    # The returned op is not needed afterwards, so it is not bound to a name.
    train_and_eval_op(
        image=image,
        pvolume=preprocess_step.pvolume,
        data_dir=data_dir,
    )
if __name__ == '__main__':
    # Running this file as a script compiles the pipeline into an archive
    # placed next to it, named "<path of this file>.tar.gz".
    from kfp import compiler

    package_path = f"{__file__}.tar.gz"
    compiler.Compiler().compile(training_pipeline, package_path)