
Refactor run.py to run with both Seldon and MLflow, and other improvements
stefannica committed Apr 13, 2022
1 parent 621f305 commit 97e9aa6
Showing 11 changed files with 332 additions and 354 deletions.
22 changes: 22 additions & 0 deletions pipelines/inference_pipeline.py
@@ -0,0 +1,22 @@
import os

from zenml.pipelines import pipeline

# Path to a pip requirements file that contains requirements necessary to run
# the pipeline


@pipeline(
enable_cache=False,
requirements_file="../requirements.txt",
required_integrations=["seldon", "mlflow", "evidently"],
)
def inference_pipeline(
dynamic_importer,
prediction_service_loader,
predictor,
):
# Link all the steps artifacts together
batch_data = dynamic_importer()
model_deployment_service = prediction_service_loader()
predictor(model_deployment_service, batch_data)
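
For context, run.py (further down in this commit) wires this pipeline together once it has picked a service-loader step for the active model deployer. A minimal usage sketch, assuming `service_loader_step` has already been built the way run.py does below:

# Minimal wiring sketch, mirroring the run.py changes in this commit;
# assumes `service_loader_step` was chosen for the active model deployer.
inference = inference_pipeline(
    dynamic_importer=dynamic_importer(),
    prediction_service_loader=service_loader_step,  # Seldon or MLflow loader
    predictor=predictor(),
)
inference.run()
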
25 changes: 15 additions & 10 deletions pipelines/training_pipeline.py
@@ -1,11 +1,13 @@
import os

from zenml.pipelines import pipeline

# Path to a pip requirements file that contains requirements necessary to run
# the pipeline
DEPLOYER_TAKES_IN_MODEL=False


@pipeline(enable_cache=False, requirements_file='../requirements.txt', required_integrations=['seldon', 'mlflow', 'evidently'])
@pipeline(
enable_cache=False,
requirements_file="../requirements.txt",
required_integrations=["seldon", "mlflow", "evidently"],
)
def continuous_deployment_pipeline(
importer,
trainer,
@@ -20,12 +22,15 @@ def continuous_deployment_pipeline(
X_train, X_test, y_train, y_test = importer()
model = trainer(X_train=X_train, y_train=y_train)
evaluator(X_test=X_test, y_test=y_test, model=model)

reference, comparison = get_reference_data(X_train, X_test)
drift_report, _ = drift_detector(reference, comparison)

alerter(drift_report)
# new

# new
deployment_decision = deployment_trigger(drift_report)
model_deployer(deployment_decision, model)
if DEPLOYER_TAKES_IN_MODEL:
model_deployer(deployment_decision, model)
else:
model_deployer(deployment_decision)
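
Why a module-level flag rather than a pipeline parameter: as the run.py changes below show, the Seldon deployer step introduced in this commit takes the trained model as an explicit step input, while the MLflow deployer step is invoked with the deployment decision alone, so run.py flips the flag at import time, before the pipeline is instantiated. A minimal sketch of that handoff:

# In run.py (see below), after detecting the active model deployer:
import pipelines.training_pipeline
pipelines.training_pipeline.DEPLOYER_TAKES_IN_MODEL = use_seldon
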
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
git+https://github.com/zenml-io/zenml.git@misc/zenbytes
zenml
notebook
235 changes: 152 additions & 83 deletions run.py
@@ -15,21 +15,48 @@
from rich import print
from datetime import datetime

import pipelines.training_pipeline

from pipelines.training_pipeline import continuous_deployment_pipeline
from pipelines.inference_pipeline import inference_pipeline
from steps.deployment_trigger import deployment_trigger
from steps.discord_bot import discord_alert
from steps.dynamic_importer import dynamic_importer
from steps.evaluator import evaluator
from steps.importer import importer, get_reference_data
from steps.trainer import svc_trainer_mlflow # type: ignore [import]
from steps.seldon_deployer import SeldonDeployerConfig, seldon_model_deployer
from steps.mlflow_service_loader import (
mlflow_service_loader,
MLFlowDeploymentLoaderStepConfig,
)
from steps.predictor import predictor
from steps.seldon_service_loader import (
seldon_service_loader,
SeldonDeploymentLoaderStepConfig,
)
from steps.trainer import svc_trainer_mlflow # type: ignore [import]


from zenml.pipelines import Schedule
from zenml.repository import Repository
from zenml.services import load_last_service_from_step

from zenml.integrations.evidently.steps import (
EvidentlyProfileConfig,
EvidentlyProfileStep,
)

from zenml.integrations.mlflow.steps import mlflow_deployer_step, MLFlowDeployerConfig

from zenml.integrations.seldon.model_deployers import SeldonModelDeployer
from zenml.integrations.seldon.services import (
SeldonDeploymentConfig,
SeldonDeploymentService,
)
from zenml.integrations.seldon.steps import (
SeldonDeployerStepConfig,
seldon_model_deployer_step,
)


@click.command()
@click.option(
@@ -45,121 +72,163 @@
help="Run the inference pipeline to send a prediction request "
"to the deployed model",
)
@click.option("--interval-second", help="How long between scheudle pipelines.", type=int, default=None)
@click.option("--kubernetes-context", help="Kubernetes context to use.")
@click.option("--namespace", help="Kubernetes namespace to use.")
@click.option("--base-url", help="Seldon core ingress base URL.")
@click.option(
"--stop-service",
is_flag=True,
default=False,
help="Stop the prediction service when done",
"--interval-second",
help="How long between schedule pipelines.",
type=int,
default=None,
)
@click.option(
"--secret",
"-x",
type=str,
default="seldon-init-container-secret",
help="Specify the name of a Kubernetes secret to be passed to Seldon Core "
"deployments to authenticate to the Artifact Store",
)
def main(
deploy: bool,
predict: bool,
interval_second: int,
kubernetes_context: str,
namespace: str,
base_url: str,
stop_service: bool,
secret: str,
):
"""Run the Seldon example continuous deployment or inference pipeline
"""Run the example continuous deployment or inference pipeline
Example usage:
python run.py --deploy --predict \
--kubernetes-context=zenml-eks-sandbox \
--namespace=kubeflow \
--base-url=http://abb84c444c7804aa98fc8c097896479d-377673393.us-east-1.elb.amazonaws.com \
python run.py --deploy --predict --min-accuracy 0.80 \
--secret seldon-init-container-secret
"""

# detect the active model deployer and use Seldon Core or MLflow
# depending on what's available
model_deployer = Repository().active_stack.model_deployer
use_seldon = model_deployer and isinstance(model_deployer, SeldonModelDeployer)
pipelines.training_pipeline.DEPLOYER_TAKES_IN_MODEL = use_seldon

evidently_profile_config = EvidentlyProfileConfig(
column_mapping=None,
profile_sections=["datadrift"])

if stop_service:
service = load_last_service_from_step(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer",
running=True,
)
if service:
service.stop(timeout=100)
return
column_mapping=None, profile_sections=["datadrift"]
)
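# The "datadrift" profile computed by this Evidently step is the
# drift_report that the training pipeline feeds to deployment_trigger to
# decide whether a (re)deployment should happen (see
# pipelines/training_pipeline.py above).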

if deploy:

if use_seldon:
model_deployer_step = seldon_model_deployer_step(
config=SeldonDeployerStepConfig(
service_config=SeldonDeploymentConfig(
model_name="mnist",
replicas=1,
implementation="SKLEARN_SERVER",
secret_name=secret,
),
timeout=120,
)
)
else:
model_deployer_step = mlflow_deployer_step(name="model_deployer")(
config=MLFlowDeployerConfig(workers=1)
)
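# Two deployer flavors, selected from the active stack: the Seldon step
# serves the model in-cluster and uses `secret_name` (the --secret option)
# to authenticate to the Artifact Store, while the MLflow branch starts a
# single-worker prediction server.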

# Initialize a continuous deployment pipeline run
deployment = continuous_deployment_pipeline(
importer=importer(),
trainer=svc_trainer_mlflow(),
evaluator=evaluator(),

# EvidentlyProfileStep takes reference_dataset and comparison dataset
get_reference_data=get_reference_data(),
drift_detector=EvidentlyProfileStep(config=evidently_profile_config),

# Add discord
alerter=discord_alert(),

deployment_trigger=deployment_trigger(),
model_deployer=seldon_model_deployer(
config=SeldonDeployerConfig(
model_name="mnist",
step_name="model_deployer",
replicas=1,
implementation="SKLEARN_SERVER",
secret_name="seldon-init-container-secret",
kubernetes_context=kubernetes_context,
namespace=namespace,
base_url=base_url,
timeout=120,
)
),
model_deployer=model_deployer_step,
)

if interval_second is not None:
deployment.run(
schedule=Schedule(start_time=datetime.now(), interval_second=interval_second)
schedule=Schedule(
start_time=datetime.now(), interval_second=interval_second
)
)
else:
deployment.run()

if predict:
# Coming soon

if use_seldon:
service_loader_step = seldon_service_loader(
config=SeldonDeploymentLoaderStepConfig(
pipeline_name="continuous_deployment_pipeline",
step_name="seldon_model_deployer_step",
model_name="mnist",
)
)
else:
service_loader_step = mlflow_service_loader(
MLFlowDeploymentLoaderStepConfig(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer",
)
)
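# NOTE: the loader config must reference the same pipeline/step names used
# at deploy time: "seldon_model_deployer_step" for Seldon vs.
# "model_deployer" for MLflow, matching the deploy branch above.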

# Initialize an inference pipeline run
# inference = inference_pipeline(
# dynamic_importer=dynamic_importer(),
# predict_preprocessor=predict_preprocessor,
# prediction_service_loader=prediction_service_loader(
# SeldonDeploymentLoaderStepConfig(
# pipeline_name="continuous_deployment_pipeline",
# step_name="model_deployer",
# )
# ),
# predictor=predictor(),
# )

# inference.run()
raise NotImplementedError("Predict pipeline coming soon!")

try:
service = load_last_service_from_step(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer",
running=True,
)
print(
f"The Seldon prediction server is running remotely as a Kubernetes "
f"service and accepts inference requests at:\n"
f" {service.prediction_url}\n"
f"To stop the service, re-run the same command and supply the "
f"`--stop-service` argument."
inference = inference_pipeline(
dynamic_importer=dynamic_importer(),
prediction_service_loader=service_loader_step,
predictor=predictor(),
)
except KeyError:
print(
"No Seldon prediction server is currently running. The deployment "
"pipeline must run first to train a model and deploy it. Execute "
"the same command with the `--deploy` argument to deploy a model."

inference.run()

if use_seldon:
services = model_deployer.find_model_server(
pipeline_name="continuous_deployment_pipeline",
pipeline_step_name="seldon_model_deployer_step",
model_name="mnist",
)
if services:
service = services[0]
if service.is_running:
print(
f"The Seldon prediction server is running remotely as a Kubernetes "
f"service and accepts inference requests at:\n"
f" {service.prediction_url}\n"
f"To stop the service, re-run the same command and supply the "
f"`--stop-service` argument."
)
elif service.is_failed:
print(
f"The Seldon prediction server is in a failed state:\n"
f" Last state: '{service.status.state.value}'\n"
f" Last error: '{service.status.last_error}'"
)

else:
print(
"No Seldon prediction server is currently running. The deployment "
"pipeline must run first to train a model and deploy it. Execute "
"the same command with the `--deploy` argument to deploy a model."
)
else:
try:
service = load_last_service_from_step(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer",
running=True,
)
print(
f"The MLflow prediction server is running locally as a daemon process "
f"and accepts inference requests at:\n"
f" {service.prediction_uri}\n"
f"To stop the service, re-run the same command and supply the "
f"`--stop-service` argument."
)
except KeyError:
print(
"No MLflow prediction server is currently running. The deployment "
"pipeline must run first to train a model and deploy it. Execute "
"the same command with the `--deploy` argument to deploy a model."
)


if __name__ == "__main__":
main()
main()
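
A couple of invocation sketches assembled from the options defined above (the secret name in the second command is a placeholder; --secret defaults to seldon-init-container-secret):

# Train, deploy, and send a test prediction request in one run:
python run.py --deploy --predict

# Deploy on a 60-second schedule, overriding the Seldon credentials secret:
python run.py --deploy --interval-second 60 --secret my-seldon-secret
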