{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Lesson 2-3: Inference Pipelines\n", "\n", "[![Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/zenml-io/zenbytes/blob/main/2-3_Inference_Pipelines.ipynb)\n", "\n", "***Key Concepts:*** *Inference Pipelines*\n", "\n", "In the last lesson, we learned how to add model deployment as a step in our ML pipeline, which allows us to automatically deploy models into production after training them. We also saw how to interact with the served model manually.\n", "\n", "In practice, querying the model is just one of many steps you would have to perform at inference time. Whenever you receive a request, you might need to preprocess the data you received, and you might also have some postprocessing code that you want to run after your model, like converting outputs to a different format, sending alerts, etc.\n", "\n", "That is why it makes sense to use ML pipelines not only for model training but also for inference. To prevent [training-serving skew](https://developers.google.com/machine-learning/guides/rules-of-ml#training-serving_skew), we might want to reuse some of the steps from our training pipeline when defining the inference pipeline. This is particularly important for steps like data preprocessing, which we expect to behave similarly in both environments.\n", "\n", "Note that the two pipelines are decoupled and can run independently from each other. In practice, the overall workflow looks like this: \n", "1. We run the training pipeline to train and deploy a model,\n", "2. Whenever an inference request comes in, the inference pipeline sends data to the currently deployed model and receives the corresponding model prediction,\n", "3. Whenever we rerun the training pipeline, a new model will be trained and deployed that will overwrite the previously deployed model (or slowly phase it out).\n", "\n", "![Training and Inference Pipelines GIF](_assets/2-3/training_inference_pipelines.gif)\n", "\n", "In this notebook, we will build a rudimentary inference pipeline to interact with our served model. \n", "The pipeline will consist of the following three steps:\n", "1. Load a data sample\n", "2. Load the model (prediction service)\n", "3. Inference the model on the data sample\n", "\n", "First, let's import zenml and rebuild and rerun the deployment pipeline from the last lesson:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install \"zenml[server]\"\n", "!zenml integration install sklearn mlflow -y\n", "!rm -rf .zen\n", "!zenml init\n", "!zenml experiment-tracker register mlflow_tracker --flavor=mlflow\n", "!zenml model-deployer register mlflow --flavor=mlflow\n", "!zenml stack register mlflow_stack -a default -o default -d mlflow -e mlflow_tracker\n", "!zenml stack set mlflow_stack\n", "\n", "%pip install pyparsing==2.4.2 # required for Colab\n", "\n", "import IPython\n", "\n", "# automatically restart kernel\n", "IPython.Application.instance().kernel.do_shutdown(restart=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from zenml.environment import Environment\n", "\n", "if Environment.in_google_colab(): # Colab only setup\n", "\n", " # clone zenbytes repo to get source code of previous lessons\n", " !git clone https://github.com/zenml-io/zenbytes.git # noqa\n", " !mv zenbytes/steps .\n", " !mv zenbytes/pipelines ." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from zenml.integrations.mlflow.steps import (\n", " MLFlowDeployerParameters, \n", " mlflow_model_deployer_step\n", ")\n", "\n", "from pipelines.training_pipeline import train_evaluate_deploy_pipeline\n", "from steps.deployment_trigger import deployment_trigger\n", "from steps.evaluator import evaluator\n", "from steps.importer import importer\n", "from steps.mlflow_trainer import svc_trainer_mlflow\n", "\n", "train_evaluate_deploy_pipeline(\n", " importer=importer(),\n", " trainer=svc_trainer_mlflow(),\n", " evaluator=evaluator(),\n", " deployment_trigger=deployment_trigger(),\n", " model_deployer=mlflow_model_deployer_step(\n", " MLFlowDeployerParameters(timeout=20)\n", " ), # new\n", ").run(unlisted=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we are ready to build our inference pipeline:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from zenml.pipelines import pipeline\n", "\n", "\n", "@pipeline\n", "def inference_pipeline(\n", " inference_data_loader,\n", " prediction_service_loader,\n", " predictor,\n", "):\n", " \"\"\"Basic inference pipeline.\"\"\"\n", " inference_data = inference_data_loader()\n", " model_deployment_service = prediction_service_loader()\n", " predictor(model_deployment_service, inference_data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In practice, the inference data loader might receive a single sample from an API request or load a batch of data from a data lake or similar. For simplicity, we will mock this component for now and just load an 8x8 random noise image." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from zenml.steps import step\n", "\n", "\n", "@step\n", "def inference_data_loader() -> np.ndarray:\n", " \"\"\"Load some inference data.\"\"\"\n", " return np.random.rand(1, 64) # flattened 8x8 random noise image" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's define the `prediction_service_loader` step. We can use the exact same code here that we used for manually querying the model service in the last lesson, just wrapped in a ZenML step:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from zenml.services import BaseService\n", "from zenml.client import Client\n", "from zenml.steps import step, Output\n", "\n", "\n", "@step(enable_cache=False)\n", "def prediction_service_loader() -> BaseService:\n", " \"\"\"Load the model service of our train_evaluate_deploy_pipeline.\"\"\"\n", " client = Client()\n", " model_deployer = client.active_stack.model_deployer\n", " services = model_deployer.find_model_server(\n", " pipeline_name=\"train_evaluate_deploy_pipeline\",\n", " pipeline_step_name=\"model_deployer\",\n", " running=True,\n", " )\n", " service = services[0]\n", " return service" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, let's write the `predictor` step that will inference our served model on the inference data sample. This step will start the service, call its `predict()` endpoint to get logits, and then perform an `argmax` operation to retrieve the class with the highest predicted probability." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@step\n", "def predictor(\n", " service: BaseService,\n", " data: np.ndarray,\n", ") -> Output(predictions=list):\n", " \"\"\"Run a inference request against a prediction service\"\"\"\n", " service.start(timeout=10) # should be a NOP if already started\n", " prediction = service.predict(data)\n", " prediction = prediction.argmax(axis=-1)\n", " print(f\"Prediction is: {[prediction.tolist()]}\")\n", " return [prediction.tolist()]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's put it all together to initialize and run our inference pipeline:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize an inference pipeline run\n", "my_inference_pipeline = inference_pipeline(\n", " inference_data_loader=inference_data_loader(),\n", " prediction_service_loader=prediction_service_loader(),\n", " predictor=predictor(),\n", ")\n", "\n", "my_inference_pipeline.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And that completes our second ZenBytes chapter on training, deployment, and inference. Our training and inference pipelines are still relatively basic, but we will add more and more features over the coming lessons.\n", "\n", "In the next chapter on data management, we will add additional steps for data validation and drift detection to our pipelines, which are essential steps to ensure that our models receive the kind of data we expect. See you in the [next lesson](3-1_Data_Skew.ipynb)!" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.8.13 64-bit ('zenbytes-dev')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" }, "vscode": { "interpreter": { "hash": "ec45946565c50b1d690aa5a9e3c974f5b62b9cc8d8934e441e52186140f79402" } } }, "nbformat": 4, "nbformat_minor": 2 }