- Kubernetes: to run locally, install Docker Desktop (on macOS) or minikube (on Linux). On macOS, enable Kubernetes under Docker -> Preferences -> Kubernetes.
- Pachyderm
# For macOS:
$ brew tap pachyderm/tap && brew install pachyderm/tap/pachctl@1.9
# For Debian-based Linux (64-bit) or Windows 10+ on WSL:
$ curl -o /tmp/pachctl.deb -L https://github.com/pachyderm/pachyderm/releases/download/v1.9.0rc1/pachctl_1.9.0rc1_amd64.deb && sudo dpkg -i /tmp/pachctl.deb
# For all other Linux flavors:
$ curl -o /tmp/pachctl.tar.gz -L https://github.com/pachyderm/pachyderm/releases/download/v1.9.0rc1/pachctl_1.9.0rc1_linux_amd64.tar.gz && tar -xvf /tmp/pachctl.tar.gz -C /tmp && sudo cp /tmp/pachctl_1.9.0rc1_linux_amd64/pachctl /usr/local/bin
- Deploy Pachyderm on Kubernetes
pachctl deploy local
This takes a while; run kubectl get pods to see whether it is done.
- Install pyenv and pipenv
model/test_model.py compares a few models and reports their metrics on the MovieLens 100K dataset. Replace SVD with other models or custom models to see the 5-fold cross-validation results for each of them. In the end I chose SVD because it is fast and adequately accurate. Other available models (imported from surprise):
BaselineOnly,
CoClustering,
KNNBaseline,
KNNBasic,
KNNWithMeans,
KNNWithZScore,
NMF,
NormalPredictor,
SVD,
SVDpp,
SlopeOne
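To make concrete what the 5-fold cross-validation in that comparison measures, here is a library-free sketch. It does not use surprise: the `global_mean_predictor` stand-in and the tiny made-up ratings list are assumptions for illustration only, and in the real script each fold would fit one of the models listed above instead.

```python
import math
import random


def k_fold_indices(n_items, k, seed=0):
    """Shuffle indices 0..n_items-1 and split them into k nearly equal folds."""
    if k > n_items:
        raise ValueError("k must not exceed the number of ratings")
    indices = list(range(n_items))
    random.Random(seed).shuffle(indices)
    return [indices[i::k] for i in range(k)]


def global_mean_predictor(train_ratings):
    """Hypothetical stand-in model: always predict the mean training rating."""
    mean = sum(r for _, _, r in train_ratings) / len(train_ratings)
    return lambda user_id, movie_id: mean


def cross_validate_rmse(ratings, fit, k=5, seed=0):
    """Return one RMSE per fold; every fold is held out exactly once."""
    scores = []
    for held_out in k_fold_indices(len(ratings), k, seed):
        held_out_set = set(held_out)
        train = [r for i, r in enumerate(ratings) if i not in held_out_set]
        test = [ratings[i] for i in held_out]
        predict = fit(train)
        squared_error = sum((predict(u, m) - r) ** 2 for u, m, r in test)
        scores.append(math.sqrt(squared_error / len(test)))
    return scores


# Made-up (user_id, movie_id, rating) triples, not MovieLens data.
ratings = [(u, m, float((u + m) % 5 + 1)) for u in range(5) for m in range(4)]
fold_rmse = cross_validate_rmse(ratings, global_mean_predictor, k=5)
print("RMSE per fold:", fold_rmse)
print("mean RMSE:", sum(fold_rmse) / len(fold_rmse))
```

A lower mean RMSE across the folds indicates a model that generalizes better to ratings it has not seen.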
Adding a new algorithm is straightforward:
from surprise import AlgoBase


class MyOwnAlgorithm(AlgoBase):
    def __init__(self):
        # Always call base method before doing anything.
        AlgoBase.__init__(self)

    def fit(self, trainset):
        # Here again: call base method before doing anything.
        AlgoBase.fit(self, trainset)
        # Doing something with trainset, this is the train part of our algorithm
        # self.some_information = process(trainset)
        # Finally return the algorithm object
        return self

    def estimate(self, u, i):
        # u is the user id, i is the item id (here movie)
        # calculate rate, this is the prediction part of our algorithm
        rate = ...
        # return the predicted rate
        return rate
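As an illustration of how the fit/estimate split could be filled in, the sketch below implements a simple baseline: global mean plus a per-user and a per-item offset. It is deliberately library-free and is not SVD or any surprise algorithm; the class name `MeanOffsetModel`, the plain list of triples, and the clamp to a 1 to 5 rating scale are all assumptions made for this example.

```python
from collections import defaultdict


class MeanOffsetModel:
    """Hypothetical baseline: global mean + user offset + item offset."""

    def fit(self, ratings):
        # ratings: iterable of (user_id, item_id, rating) triples.
        ratings = list(ratings)
        self.global_mean = sum(r for _, _, r in ratings) / len(ratings)

        # Collect each rating's deviation from the global mean per user/item.
        user_devs = defaultdict(list)
        item_devs = defaultdict(list)
        for u, i, r in ratings:
            user_devs[u].append(r - self.global_mean)
            item_devs[i].append(r - self.global_mean)

        # This is the "train part": the information kept for prediction.
        self.user_offset = {u: sum(d) / len(d) for u, d in user_devs.items()}
        self.item_offset = {i: sum(d) / len(d) for i, d in item_devs.items()}
        return self

    def estimate(self, u, i):
        # Unknown users or items contribute no offset.
        rate = (
            self.global_mean
            + self.user_offset.get(u, 0.0)
            + self.item_offset.get(i, 0.0)
        )
        # Keep the prediction inside the assumed 1..5 rating scale.
        return min(5.0, max(1.0, rate))


model = MeanOffsetModel().fit([("a", "x", 5), ("a", "y", 3), ("b", "x", 4)])
print(model.estimate("a", "x"), model.estimate("b", "y"))
```

Inside a surprise subclass, the same idea would iterate over the trainset in fit and return the value from estimate, with the base-class calls kept as in the skeleton above.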
Because of a limitation of scikit-surprise, the model can only take (user_id, movie_id) pairs as input. A natural next improvement is to use other attributes in the dataset, such as the user's age and country or the movie's genre and year, and to extend the algorithm and the scikit-surprise framework to fit and predict with that information.
In Pachyderm, the whole data processing system is called a Workflow. This section briefly introduces the concepts and the human workflow of using Pachyderm.
In Pachyderm, data is stored in Repos. A Repo is similar to a Git repo in that it has branches and commits, so it works like version control for data. Repos serve as both input and output for Pachyderm pipelines. Because data changes happen as commits, the version history of all data is available, and you can roll back to a previous version of the data without implementing that logic in code. For example, because I keep the trained model in the train repo, I automatically get every version of my trained model weights without extra work. Pachyderm computes and stores only the differences between versions, so it does not use much disk space.
Repos can be created manually or as the output of a Pachyderm pipeline. For this workflow I create two repos, training and streaming:
pachctl create repo training
pachctl create repo streaming
The training repo receives input data from the REST API (discussed and implemented in a later section) to train and update the model. The streaming repo also receives data from the REST API: the user_id movie_id pairs to be predicted. Two more repos are generated by my pipelines, as discussed in the next sections.
A Pipeline in Pachyderm is user code built into a Docker image. The cycle is: write the program -> build a Docker image -> push the Docker image -> create or update the pipeline spec in Pachyderm. Pachyderm runs the pipeline as a service that watches the input Repo for new data; when new data arrives, it launches a Pachyderm job, which is a single execution of my code. If the job succeeds, Pachyderm collects the output and commits it to the output Repo, which always has the same name as the pipeline. If that Repo is the input of another pipeline, that pipeline triggers a job in turn, so you don't need to orchestrate this ongoing process yourself.
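From the program's point of view, a pipeline job is just file I/O: Pachyderm mounts each input Repo under /pfs/<repo name> and collects whatever the program writes to /pfs/out. The sketch below assumes an input Repo named training, as created above; the line-counting logic, the function name `process_repo`, and the `.count` suffix are placeholders for illustration, not the actual train or predict programs.

```python
import os


def process_repo(input_dir="/pfs/training", output_dir="/pfs/out"):
    """Write one small summary file to output_dir per file in input_dir."""
    os.makedirs(output_dir, exist_ok=True)
    written = []
    for name in sorted(os.listdir(input_dir)):
        src = os.path.join(input_dir, name)
        if not os.path.isfile(src):
            continue
        # Placeholder "processing": count the non-empty lines of the file.
        with open(src) as f:
            line_count = sum(1 for line in f if line.strip())
        dst = os.path.join(output_dir, name + ".count")
        with open(dst, "w") as f:
            f.write(f"{line_count}\n")
        written.append(dst)
    return written


# Only run against the real mount points when they exist, i.e. inside a job.
if os.path.isdir("/pfs/training"):
    print(process_repo())
```

Because the directories are parameters, the same function can be exercised locally against temporary folders before the image is built and pushed.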
The first pipeline is the train part. The program, Dockerfile, and pipeline spec file are under the /train dir. The program checks whether there is a saved model from previous runs. If so, it loads that saved model; if not, it trains a model on the MovieLens 100K dataset. In both cases, the pipeline runs because new training data has come in: it trains the model on the incoming data and saves the updated model to the output Repo, train. Steps to deploy the pipeline from scratch:
cd train
# Replace yifangma93 with your Docker account
docker build . -t yifangma93/movie-recommend-train
docker push yifangma93/movie-recommend-train
# Also replace yifangma93 in pipeline.json with your Docker account
pachctl create pipeline -f pipeline.json
You can also use my prebuilt Docker image; in that case, only run pachctl create pipeline -f pipeline.json.
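For orientation, a pipeline spec for the train step might have roughly the shape below, built here as a Python dict and printed as JSON. The top-level fields (pipeline, transform, input with a pfs entry) follow the general layout of a Pachyderm pipeline spec; the `cmd`, the `glob` value, the script path, and the helper name `make_pipeline_spec` are assumptions, and the pipeline.json in the /train dir remains authoritative.

```python
import json


def make_pipeline_spec(name, image, input_repo, cmd):
    """Assemble a minimal pipeline spec; the values passed in are illustrative."""
    return {
        # The output Repo takes the same name as the pipeline.
        "pipeline": {"name": name},
        # The Docker image to run and the command to execute inside it.
        "transform": {"image": image, "cmd": cmd},
        # Watch this Repo; "/" treats the whole Repo as a single unit of work.
        "input": {"pfs": {"repo": input_repo, "glob": "/"}},
    }


spec = make_pipeline_spec(
    name="train",
    image="yifangma93/movie-recommend-train",
    input_repo="training",
    cmd=["python3", "/train.py"],  # assumed entry point, not from the repo
)
print(json.dumps(spec, indent=2))
```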
The predict pipeline files are under the /predict dir. The pipeline runs when a new model is saved or when new data to predict arrives in the streaming Repo. The program predicts with the latest saved model and outputs the prediction result (a rating). Pachyderm collects the result and puts it in the predict Repo. Steps to deploy the pipeline from scratch:
cd predict
# Replace yifangma93 with your Docker account
docker build . -t yifangma93/movie-recommend-predict
docker push yifangma93/movie-recommend-predict
# Also replace yifangma93 in pipeline.json with your Docker account
pachctl create pipeline -f pipeline.json
You can also use my prebuilt Docker image; in that case, only run pachctl create pipeline -f pipeline.json.
Run pachctl list jobs to see job status. If a job fails, copy the job_ID and run:
pachctl logs --job=<job_ID>
pachctl logs --pipeline=<pipeline_name>
The traceback from the Python code will be displayed. Fix the errors, then rebuild and push the Docker image and update the Pachyderm pipeline:
pachctl update pipeline -f pipeline.json
The Pachyderm workflow improves the model with new training data and makes predictions when new data to predict is submitted. To protect it against invalid input and to provide an abstract interface that other services can interact with, I built a REST API in front of this Pachyderm workflow. The API is a Flask app in the /api directory. To run it:
pipenv sync
pipenv run flask run
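The kind of checking such an API layer can do before anything reaches a Repo could look like the sketch below. It is not the code in /api: the function names, the assumption that ids and timestamps are non-negative integers, and the 1 to 5 rating range are illustrative choices based on the line formats "user_id movie_id rating timestamp" and "user_id movie_id".

```python
def parse_training_line(line):
    """Parse 'user_id movie_id rating timestamp'; raise ValueError if malformed."""
    fields = line.split()
    if len(fields) != 4:
        raise ValueError(f"expected 4 fields, got {len(fields)}: {line!r}")
    user_id, movie_id, rating, timestamp = fields
    if not (user_id.isdigit() and movie_id.isdigit() and timestamp.isdigit()):
        raise ValueError(f"ids and timestamp must be integers: {line!r}")
    rating_value = float(rating)  # raises ValueError for non-numeric text
    if not 1 <= rating_value <= 5:
        raise ValueError(f"rating out of range 1..5: {line!r}")
    return int(user_id), int(movie_id), rating_value, int(timestamp)


def parse_prediction_line(line):
    """Parse 'user_id movie_id'; raise ValueError if malformed."""
    fields = line.split()
    if len(fields) != 2 or not all(field.isdigit() for field in fields):
        raise ValueError(f"expected two integer ids: {line!r}")
    return int(fields[0]), int(fields[1])


print(parse_training_line("244 346 2 880606923"))
print(parse_prediction_line("233 346"))
```

Rejecting a malformed line at this stage means a bad request fails immediately instead of surfacing later as a failed Pachyderm job.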
A sample session with it:
- Add some new training data, in the format "user_id movie_id rating timestamp":
curl -i -X POST \
-H "Content-Type:application/json" \
-d \
'{
"data": ["244 346 2 880606923",
"244 355 3 880606924"]
}' \
'http://localhost:5000/train'
# ->
# {
# "filename": "train-2019-05-09T16:33:25.613314.txt"
# }
- Submit some data to predict, in the format "user_id movie_id":
curl -i -X POST \
-H "Content-Type:application/json" \
-d \
'{
"data": ["233 346", "125 378"]
}' \
'http://localhost:5000/predict'
# ->
# {
# "filename": "predict-2019-05-09T16:35:04.313814.txt"
# }
- Fetch prediction result:
curl -i -X POST \
-H "Content-Type:application/json" \
-d \
'{
"filename": "predict-2019-05-09T16:35:04.313814.txt"
}' \
'http://localhost:5000/result'
# -> {
# "data": "233 346 4.3221557752788815\n125 378 3.857757926778991\n",
# "ready": true
# }
- If you repeat the same prediction without further training (POST /predict, then POST /result), you get the same result.
- If you submit some new data to /train, then POST /predict and POST /result give a different prediction. The model has updated!
- Run pachctl list jobs to see the history of the train and predict pipelines.
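The same session can be scripted with the Python standard library. The sketch below mirrors the curl calls above (same URL, endpoints, and JSON fields); the helper names and the polling loop are assumptions, in particular that /result answers with "ready": false rather than an error status while the predict job is still running.

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:5000"  # same address as in the curl calls


def build_post(path, payload, base_url=BASE_URL):
    """Build (but do not send) a JSON POST request for one of the endpoints."""
    return urllib.request.Request(
        base_url + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_json(path, payload, base_url=BASE_URL):
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(build_post(path, payload, base_url)) as resp:
        return json.load(resp)


def predict_and_wait(pairs, attempts=30, delay=2.0, base_url=BASE_URL):
    """Submit 'user_id movie_id' pairs, then poll /result until it is ready."""
    filename = post_json("/predict", {"data": pairs}, base_url)["filename"]
    for _ in range(attempts):
        result = post_json("/result", {"filename": filename}, base_url)
        if result.get("ready"):
            return result["data"]
        time.sleep(delay)
    raise TimeoutError(f"no result for {filename} after {attempts} attempts")
```

With the Flask app and both pipelines running, calling predict_and_wait(["233 346", "125 378"]) would return the prediction text once the predict job has committed its output.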