Skip to content

Commit

Permalink
Model converter (#480)
Browse files Browse the repository at this point in the history
* WIP

* Debug logging

* I/O

* WIP

* Debugging

* WIP

* Add debug logging

* Update i/o

* Startup improvements

* Better error handling

* Improve inputs

* More debugging

* Fix defaults

* Fix empty URLs

* Update array outputs

* Update logging

* Try adding saving of output files

* Add debug logging

* Don't use strict mimetypes

* Try adding complex input types

* Fix

* Update up command

* Cleanup

* Refactor manager classes

* Refactor up command

* Make getting cog image name DRY

* Update flake8

* Small refactor

* Move utils

* Small refactor

* Remove comment

* Handle object outputs

* Add better error handling

* Revert "Handle object outputs"

This reverts commit 8847cd1.

* Try to handle dict outputs

* Try to handle dict outputs p2

* Update input description

* Populate env vars

* Bump version

* Update cog wrapper image ref
  • Loading branch information
rossgray authored Sep 9, 2024
1 parent 9cecf56 commit 34f2c38
Show file tree
Hide file tree
Showing 16 changed files with 819 additions and 199 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
python-version: "3.10"
- uses: py-actions/flake8@v2
with:
ignore: "E203,W503"
ignore: "W503"
max-line-length: "88"
path: "./pipeline"

Expand Down
20 changes: 20 additions & 0 deletions pipeline/console/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,23 @@ def container_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> Non
help="Name of the pipeline.",
)
init_parser.set_defaults(func=container.init_dir)

# Convert
convert_parser = container_sub_parser.add_parser(
"convert",
description=(
"Create a new pipeline by converting a model from a different framework."
),
help="Create a new pipeline by converting a model from a different framework.",
)

convert_parser.add_argument(
"--name",
"-n",
type=str,
help="Name of the pipeline.",
)
convert_parser.add_argument(
"--type", "-t", type=str, help="Type of model", choices=["cog"], required=True
)
convert_parser.set_defaults(func=container.convert)
2 changes: 2 additions & 0 deletions pipeline/console/container/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from .build import build_container
from .convert import convert
from .init_dir import init_dir
from .push import push_container
from .up import up_container

__all__ = [
"build_container",
"convert",
"init_dir",
"push_container",
"up_container",
Expand Down
82 changes: 82 additions & 0 deletions pipeline/console/container/convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import subprocess
from argparse import Namespace

import yaml

from pipeline.container import docker_templates
from pipeline.util.frameworks import get_cog_image_name
from pipeline.util.logging import _print

from .schemas import PipelineConfig, PythonRuntime, RuntimeConfig


def convert(namespace: Namespace) -> None:
    """Entry point for ``container convert``: wrap a model built with a
    different framework (currently only Cog) as a new pipeline.

    Writes a pipeline config (``./pipeline.yaml`` unless overridden via the
    ``file`` attribute on *namespace*) and a ``./README.md`` into the
    current directory.
    """
    framework = namespace.type

    _print(f"Initializing new pipeline from {framework}...", "INFO")

    # Fall back to an interactive prompt when --name was not supplied.
    name = getattr(namespace, "name", None)
    if not name:
        name = input("Enter a name for your pipeline: ")

    if framework != "cog":
        raise NotImplementedError(f"Framework {framework} not supported")
    config = convert_cog(name)

    config_path = getattr(namespace, "file", "./pipeline.yaml")
    with open(config_path, "w") as config_file:
        config_file.write(yaml.dump(config.dict(), sort_keys=False))

    with open("./README.md", "w") as readme_file:
        readme_file.write(docker_templates.readme_template)

    _print(f"Successfully generated a new pipeline from {framework}.", "SUCCESS")
    _print(
        "Be sure to update the pipeline.yaml with the accelerators required by your "
        "pipeline",
        "WARNING",
    )


def convert_cog(pipeline_name: str) -> PipelineConfig:
    """Build the local Cog model into a Docker image and return a
    :class:`PipelineConfig` describing the wrapped pipeline.

    Args:
        pipeline_name: Name of the new pipeline; also used to derive the
            standardised Cog image tag via ``get_cog_image_name``.

    Returns:
        A ``PipelineConfig`` whose runtime fields are placeholders (they
        are not used when wrapping a Cog pipeline) and whose ``extras``
        record the Cog model-framework options.

    Raises:
        FileNotFoundError: If the ``cog`` executable is not installed.
        subprocess.CalledProcessError: If a ``cog`` command exits non-zero.
    """

    # Check the cog CLI is usable. A missing executable raises
    # FileNotFoundError, NOT CalledProcessError, so catch both — otherwise
    # the install hint is never shown in the most common failure case.
    try:
        subprocess.run(["cog", "--version"], check=True, capture_output=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        _print(
            "cog not found, please install cog first: https://github.com/replicate/cog",
            "ERROR",
        )
        raise

    # Build the cog image, tagging it with a standardised name so later
    # commands (e.g. push) can locate it.
    cog_image_name = get_cog_image_name(pipeline_name)
    subprocess.run(
        ["cog", "build", "-t", cog_image_name],
        check=True,
    )

    # Generate a pipeline config. Note that most of these fields will not be
    # used when wrapping a Cog pipeline
    config = PipelineConfig(
        # not used
        runtime=RuntimeConfig(
            container_commands=[],
            python=PythonRuntime(
                version="3.10",
                requirements=[],
            ),
        ),
        accelerators=[],
        # not used
        pipeline_graph="",
        pipeline_name=pipeline_name,
        accelerator_memory=None,
        # use a format which permits extra framework-specific options
        extras={"model_framework": {"framework": "cog", "save_output_files": False}},
        readme="README.md",
    )
    return config
3 changes: 1 addition & 2 deletions pipeline/console/container/init_dir.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from argparse import Namespace

import yaml
Expand Down Expand Up @@ -40,7 +39,7 @@ def init_dir(namespace: Namespace) -> None:
readme="README.md",
)
with open(getattr(namespace, "file", "./pipeline.yaml"), "w") as f:
f.write(yaml.dump(json.loads(default_config.json()), sort_keys=False))
f.write(yaml.dump(default_config.dict(), sort_keys=False))

with open("./new_pipeline.py", "w") as f:
f.write(python_template)
Expand Down
127 changes: 71 additions & 56 deletions pipeline/console/container/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pipeline.cloud.schemas import cluster as cluster_schemas
from pipeline.cloud.schemas import pipelines as pipelines_schemas
from pipeline.cloud.schemas import registry as registry_schemas
from pipeline.util.frameworks import get_cog_image_name, is_using_cog
from pipeline.util.logging import _print

from .pointer import create_pointer
Expand Down Expand Up @@ -51,6 +52,8 @@ def push_container(namespace: Namespace):
id=cluster_id, node_pool=node_pool_id
)

is_cog = is_using_cog(pipeline_config.extras)

# Check for file, transform to string, and put it back in config
if pipeline_config.readme is not None:
if os.path.isfile(pipeline_config.readme):
Expand All @@ -66,12 +69,15 @@ def push_container(namespace: Namespace):
json.loads(readmeless_config.json(exclude_none=True, exclude_unset=True)),
sort_keys=False,
)

pipeline_yaml_text = "```yaml\n" + pipeline_yaml_text + "\n```"
pipeline_code = Path(
pipeline_config.pipeline_graph.split(":")[0] + ".py"
).read_text()
pipeline_code = "```python\n" + pipeline_code + "\n```"

if is_cog:
pipeline_code = "This pipeline has been converted from a Cog model"
else:
pipeline_code = Path(
pipeline_config.pipeline_graph.split(":")[0] + ".py"
).read_text()
pipeline_code = "```python\n" + pipeline_code + "\n```"
try:
pipeline_config.readme = pipeline_config.readme.format(
pipeline_name=pipeline_config.pipeline_name,
Expand Down Expand Up @@ -108,11 +114,18 @@ def push_container(namespace: Namespace):
registry_info = registry_schemas.RegistryInformation.parse_raw(registry_info.text)

upload_registry = registry_info.url

if upload_registry is None:
raise ValueError("No upload registry found")
image = docker_client.images.get(pipeline_name)

if is_cog:
local_image_name = get_cog_image_name(pipeline_name)
else:
local_image_name = pipeline_name

image = docker_client.images.get(local_image_name)
assert image.id
image_hash = image.id.split(":")[1]
hash_tag = image_hash[:12]

if turbo_registry:
if len(image.attrs["RootFS"]["Layers"]) > 1:
Expand All @@ -122,66 +135,32 @@ def push_container(namespace: Namespace):
)
raise Exception("Failed to push")

hash_tag = image_hash[:12]
image_to_push = pipeline_name + ":" + hash_tag
image_to_push_reg = upload_registry + "/" + image_to_push

upload_token = None
true_pipeline_name = None
if registry_info.special_auth:
start_upload_response = http.post(
endpoint="/v4/registry/start-upload",
json_data=pipelines_schemas.PipelineStartUpload(
pipeline_name=pipeline_name,
pipeline_tag=None,
cluster=pipeline_config.cluster,
turbo_registry=True if turbo_registry else False,
accelerators=pipeline_config.accelerators,
).dict(),
pipeline_name = _auth_with_registry(
upload_registry=upload_registry,
docker_client=docker_client,
pipeline_name=pipeline_name,
cluster=pipeline_config.cluster,
)
start_upload_dict = start_upload_response.json()
upload_token = start_upload_dict.get("bearer", None)
true_pipeline_name = start_upload_dict.get("pipeline_name")

if upload_token is None:
raise ValueError("No upload token found")

# Login to upload registry
try:
docker_client.login(
username="pipeline",
password=upload_token,
registry="http://" + upload_registry,
)
except Exception as e:
_print(f"Failed to login to registry: {e}", "ERROR")
raise

_print(f"Successfully logged in to registry {upload_registry}")

# Override the tag with the pipeline name from catalyst
image_to_push = true_pipeline_name + ":" + hash_tag
image_to_push_reg = upload_registry + "/" + image_to_push
image_name = pipeline_name
if is_cog:
image_name = get_cog_image_name(pipeline_name)
remote_image = f"{upload_registry}/{image_name}:{hash_tag}"

_print(f"Pushing image to upload registry {upload_registry}", "INFO")

docker_client.images.get(pipeline_name).tag(image_to_push_reg)
# Do this after tagging, because we need to use
# the old pipeline name to tag the local image
if true_pipeline_name:
pipeline_name = true_pipeline_name

image.tag(remote_image)
_push_docker_image(
docker_client=docker_client,
image=image_to_push_reg,
upload_token=upload_token,
docker_client=docker_client, image=remote_image, upload_token=upload_token
)

new_deployment_request = http.post(
endpoint="/v4/pipelines",
json_data=pipelines_schemas.PipelineCreate(
name=pipeline_name,
image=image_to_push_reg,
image=remote_image,
input_variables=[],
output_variables=[],
accelerators=pipeline_config.accelerators,
Expand All @@ -208,10 +187,46 @@ def push_container(namespace: Namespace):
create_pointer(pointer, new_deployment.id, force=pointer_overwrite)


def _push_docker_image(
def _auth_with_registry(
upload_registry: str,
docker_client: docker.DockerClient,
image: str,
upload_token: str,
pipeline_name: str,
cluster: cluster_schemas.PipelineClusterConfig | None = None,
):
response = http.post(
endpoint="/v4/registry/start-upload",
json_data=pipelines_schemas.PipelineStartUpload(
pipeline_name=pipeline_name,
pipeline_tag=None,
cluster=cluster,
).dict(),
)
start_upload_response = pipelines_schemas.PipelineStartUploadResponse.parse_obj(
response.json()
)
upload_token = start_upload_response.bearer
true_pipeline_name = start_upload_response.pipeline_name

if upload_token is None:
raise ValueError("No upload token found")

# Login to upload registry
try:
docker_client.login(
username="pipeline",
password=upload_token,
registry="http://" + upload_registry,
)
except Exception as e:
_print(f"Failed to login to registry: {e}", "ERROR")
raise

_print(f"Successfully logged in to registry {upload_registry}")
return true_pipeline_name


def _push_docker_image(
docker_client: docker.DockerClient, image: str, upload_token: str | None
):
resp = docker_client.images.push(
image,
Expand Down
Loading

0 comments on commit 34f2c38

Please sign in to comment.