Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Model converter #480

Merged
merged 40 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ca47013
WIP
rossgray Jul 11, 2024
4bafc10
Debug logging
rossgray Jul 16, 2024
b0efdc2
I/O
rossgray Jul 16, 2024
6e19c62
WIP
rossgray Jul 16, 2024
9ac3878
Debugging
rossgray Jul 16, 2024
82f06fe
WIP
rossgray Jul 18, 2024
c709954
Add debug logging
rossgray Aug 8, 2024
358e139
Update i/o
rossgray Aug 8, 2024
968ebc4
Startup improvements
rossgray Aug 8, 2024
e5caf28
Better error handling
rossgray Aug 8, 2024
f1298a6
Improve inputs
rossgray Aug 8, 2024
5e4610b
More debugging
rossgray Aug 8, 2024
1415467
Fix defaults
rossgray Aug 8, 2024
4065219
Fix empty URLs
rossgray Aug 8, 2024
0428699
Update array outputs
rossgray Aug 9, 2024
32898f9
Update logging
rossgray Aug 9, 2024
b757327
Try adding saving of output files
rossgray Aug 9, 2024
8cdcdc0
Add debug logging
rossgray Aug 9, 2024
a349fe2
Don't use strict mimetypes
rossgray Aug 9, 2024
0590c81
Try adding complex input types
rossgray Aug 9, 2024
c32162e
Fix
rossgray Aug 9, 2024
4c7ec79
Update up command
rossgray Aug 12, 2024
663c8ce
Cleanup
rossgray Aug 12, 2024
dcc875c
Refactor manager classes
rossgray Aug 13, 2024
dbc7aed
Refactor up command
rossgray Aug 13, 2024
de11ea3
Make getting cog image name DRY
rossgray Aug 14, 2024
ca0001c
Update flake8
rossgray Aug 14, 2024
2e3df8a
Small refactor
rossgray Aug 14, 2024
5be2bc5
Move utils
rossgray Aug 14, 2024
52bfd6c
small refactor
rossgray Aug 14, 2024
ce91633
remove comment
rossgray Aug 14, 2024
7e6fd95
Handle object outputs
rossgray Aug 14, 2024
7ee86b2
Add better error handling
rossgray Aug 14, 2024
24355e7
Revert "Handle object outputs"
rossgray Aug 14, 2024
24c7056
Try to handle dict outputs
rossgray Aug 14, 2024
9dc4025
Try to handle dict outputs p2
rossgray Aug 14, 2024
7c65110
Update input description
rossgray Aug 14, 2024
39b3dcb
populate env vars
rossgray Aug 21, 2024
add761c
Bump version
rossgray Aug 22, 2024
aecbbbb
update cog wrapper image ref
rossgray Aug 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
python-version: "3.10"
- uses: py-actions/flake8@v2
with:
ignore: "E203,W503"
ignore: "W503"
max-line-length: "88"
path: "./pipeline"

Expand Down
20 changes: 20 additions & 0 deletions pipeline/console/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,23 @@ def container_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> Non
help="Name of the pipeline.",
)
init_parser.set_defaults(func=container.init_dir)

# Convert
convert_parser = container_sub_parser.add_parser(
"convert",
description=(
"Create a new pipeline by converting a model from a different framework."
),
help="Create a new pipeline by converting a model from a different framework.",
)

convert_parser.add_argument(
"--name",
"-n",
type=str,
help="Name of the pipeline.",
)
convert_parser.add_argument(
"--type", "-t", type=str, help="Type of model", choices=["cog"], required=True
)
convert_parser.set_defaults(func=container.convert)
2 changes: 2 additions & 0 deletions pipeline/console/container/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from .build import build_container
from .convert import convert
from .init_dir import init_dir
from .push import push_container
from .up import up_container

__all__ = [
"build_container",
"convert",
"init_dir",
"push_container",
"up_container",
Expand Down
82 changes: 82 additions & 0 deletions pipeline/console/container/convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import subprocess
from argparse import Namespace

import yaml

from pipeline.container import docker_templates
from pipeline.util.frameworks import get_cog_image_name
from pipeline.util.logging import _print

from .schemas import PipelineConfig, PythonRuntime, RuntimeConfig


def convert(namespace: Namespace) -> None:
    """Generate a new pipeline by wrapping a model built with another framework.

    Reads the target framework from ``namespace.type``, prompts for a pipeline
    name if one was not supplied, delegates to the framework-specific
    converter, then writes out a ``pipeline.yaml`` and a ``README.md``.
    """
    framework = namespace.type

    _print(f"Initializing new pipeline from {framework}...", "INFO")

    # Fall back to an interactive prompt when --name was not provided.
    name = getattr(namespace, "name", None) or input(
        "Enter a name for your pipeline: "
    )

    # Dispatch to the converter for the requested framework.
    converters = {"cog": convert_cog}
    converter = converters.get(framework)
    if converter is None:
        raise NotImplementedError(f"Framework {framework} not supported")
    config = converter(name)

    config_path = getattr(namespace, "file", "./pipeline.yaml")
    with open(config_path, "w") as f:
        f.write(yaml.dump(config.dict(), sort_keys=False))

    with open("./README.md", "w") as f:
        f.write(docker_templates.readme_template)

    _print(f"Successfully generated a new pipeline from {framework}.", "SUCCESS")
    warning = (
        "Be sure to update the pipeline.yaml with the accelerators required by your "
        "pipeline"
    )
    _print(warning, "WARNING")


def convert_cog(pipeline_name: str) -> PipelineConfig:
    """Build a Cog model image and return a pipeline config that wraps it.

    Args:
        pipeline_name: Name of the new pipeline; also used to derive the
            standardised Docker tag for the built Cog image.

    Returns:
        A ``PipelineConfig`` whose ``extras`` mark it as a Cog-wrapped model.
        Most runtime fields are placeholders, since they are not used when
        wrapping a Cog pipeline.

    Raises:
        FileNotFoundError: If the ``cog`` executable is not installed.
        subprocess.CalledProcessError: If a ``cog`` command exits non-zero.
    """
    # Check the cog command exists. subprocess.run raises FileNotFoundError
    # (not CalledProcessError) when the executable is missing entirely, so
    # catch both to make the install hint actually reach the user.
    try:
        subprocess.run(["cog", "--version"], check=True, capture_output=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        _print(
            "cog not found, please install cog first: https://github.com/replicate/cog",
            "ERROR",
        )
        raise

    # Build the cog image, tagged with a standardised name so that later
    # commands (e.g. push) can locate it from the pipeline name alone.
    cog_image_name = get_cog_image_name(pipeline_name)
    subprocess.run(
        ["cog", "build", "-t", cog_image_name],
        check=True,
    )

    # Generate a pipeline config. Note that most of these fields will not be
    # used when wrapping a Cog pipeline
    config = PipelineConfig(
        # not used
        runtime=RuntimeConfig(
            container_commands=[],
            python=PythonRuntime(
                version="3.10",
                requirements=[],
            ),
        ),
        accelerators=[],
        # not used
        pipeline_graph="",
        pipeline_name=pipeline_name,
        accelerator_memory=None,
        # use a format which permits extra framework-specific options
        extras={"model_framework": {"framework": "cog", "save_output_files": False}},
        readme="README.md",
    )
    return config
3 changes: 1 addition & 2 deletions pipeline/console/container/init_dir.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from argparse import Namespace

import yaml
Expand Down Expand Up @@ -40,7 +39,7 @@ def init_dir(namespace: Namespace) -> None:
readme="README.md",
)
with open(getattr(namespace, "file", "./pipeline.yaml"), "w") as f:
f.write(yaml.dump(json.loads(default_config.json()), sort_keys=False))
f.write(yaml.dump(default_config.dict(), sort_keys=False))

with open("./new_pipeline.py", "w") as f:
f.write(python_template)
Expand Down
127 changes: 71 additions & 56 deletions pipeline/console/container/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pipeline.cloud.schemas import cluster as cluster_schemas
from pipeline.cloud.schemas import pipelines as pipelines_schemas
from pipeline.cloud.schemas import registry as registry_schemas
from pipeline.util.frameworks import get_cog_image_name, is_using_cog
from pipeline.util.logging import _print

from .pointer import create_pointer
Expand Down Expand Up @@ -51,6 +52,8 @@ def push_container(namespace: Namespace):
id=cluster_id, node_pool=node_pool_id
)

is_cog = is_using_cog(pipeline_config.extras)

# Check for file, transform to string, and put it back in config
if pipeline_config.readme is not None:
if os.path.isfile(pipeline_config.readme):
Expand All @@ -66,12 +69,15 @@ def push_container(namespace: Namespace):
json.loads(readmeless_config.json(exclude_none=True, exclude_unset=True)),
sort_keys=False,
)

pipeline_yaml_text = "```yaml\n" + pipeline_yaml_text + "\n```"
pipeline_code = Path(
pipeline_config.pipeline_graph.split(":")[0] + ".py"
).read_text()
pipeline_code = "```python\n" + pipeline_code + "\n```"

if is_cog:
pipeline_code = "This pipeline has been converted from a Cog model"
else:
pipeline_code = Path(
pipeline_config.pipeline_graph.split(":")[0] + ".py"
).read_text()
pipeline_code = "```python\n" + pipeline_code + "\n```"
try:
pipeline_config.readme = pipeline_config.readme.format(
pipeline_name=pipeline_config.pipeline_name,
Expand Down Expand Up @@ -108,11 +114,18 @@ def push_container(namespace: Namespace):
registry_info = registry_schemas.RegistryInformation.parse_raw(registry_info.text)

upload_registry = registry_info.url

if upload_registry is None:
raise ValueError("No upload registry found")
image = docker_client.images.get(pipeline_name)

if is_cog:
local_image_name = get_cog_image_name(pipeline_name)
else:
local_image_name = pipeline_name

image = docker_client.images.get(local_image_name)
assert image.id
image_hash = image.id.split(":")[1]
hash_tag = image_hash[:12]

if turbo_registry:
if len(image.attrs["RootFS"]["Layers"]) > 1:
Expand All @@ -122,66 +135,32 @@ def push_container(namespace: Namespace):
)
raise Exception("Failed to push")

hash_tag = image_hash[:12]
image_to_push = pipeline_name + ":" + hash_tag
image_to_push_reg = upload_registry + "/" + image_to_push

upload_token = None
true_pipeline_name = None
if registry_info.special_auth:
start_upload_response = http.post(
endpoint="/v4/registry/start-upload",
json_data=pipelines_schemas.PipelineStartUpload(
pipeline_name=pipeline_name,
pipeline_tag=None,
cluster=pipeline_config.cluster,
turbo_registry=True if turbo_registry else False,
accelerators=pipeline_config.accelerators,
).dict(),
pipeline_name = _auth_with_registry(
upload_registry=upload_registry,
docker_client=docker_client,
pipeline_name=pipeline_name,
cluster=pipeline_config.cluster,
)
start_upload_dict = start_upload_response.json()
upload_token = start_upload_dict.get("bearer", None)
true_pipeline_name = start_upload_dict.get("pipeline_name")

if upload_token is None:
raise ValueError("No upload token found")

# Login to upload registry
try:
docker_client.login(
username="pipeline",
password=upload_token,
registry="http://" + upload_registry,
)
except Exception as e:
_print(f"Failed to login to registry: {e}", "ERROR")
raise

_print(f"Successfully logged in to registry {upload_registry}")

# Override the tag with the pipeline name from catalyst
image_to_push = true_pipeline_name + ":" + hash_tag
image_to_push_reg = upload_registry + "/" + image_to_push
image_name = pipeline_name
if is_cog:
image_name = get_cog_image_name(pipeline_name)
remote_image = f"{upload_registry}/{image_name}:{hash_tag}"

_print(f"Pushing image to upload registry {upload_registry}", "INFO")

docker_client.images.get(pipeline_name).tag(image_to_push_reg)
# Do this after tagging, because we need to use
# the old pipeline name to tag the local image
if true_pipeline_name:
pipeline_name = true_pipeline_name

image.tag(remote_image)
_push_docker_image(
docker_client=docker_client,
image=image_to_push_reg,
upload_token=upload_token,
docker_client=docker_client, image=remote_image, upload_token=upload_token
)

new_deployment_request = http.post(
endpoint="/v4/pipelines",
json_data=pipelines_schemas.PipelineCreate(
name=pipeline_name,
image=image_to_push_reg,
image=remote_image,
input_variables=[],
output_variables=[],
accelerators=pipeline_config.accelerators,
Expand All @@ -208,10 +187,46 @@ def push_container(namespace: Namespace):
create_pointer(pointer, new_deployment.id, force=pointer_overwrite)


def _push_docker_image(
def _auth_with_registry(
upload_registry: str,
docker_client: docker.DockerClient,
image: str,
upload_token: str,
pipeline_name: str,
cluster: cluster_schemas.PipelineClusterConfig | None = None,
):
response = http.post(
endpoint="/v4/registry/start-upload",
json_data=pipelines_schemas.PipelineStartUpload(
pipeline_name=pipeline_name,
pipeline_tag=None,
cluster=cluster,
).dict(),
)
start_upload_response = pipelines_schemas.PipelineStartUploadResponse.parse_obj(
response.json()
)
upload_token = start_upload_response.bearer
true_pipeline_name = start_upload_response.pipeline_name

if upload_token is None:
raise ValueError("No upload token found")

# Login to upload registry
try:
docker_client.login(
username="pipeline",
password=upload_token,
registry="http://" + upload_registry,
)
except Exception as e:
_print(f"Failed to login to registry: {e}", "ERROR")
raise

_print(f"Successfully logged in to registry {upload_registry}")
return true_pipeline_name


def _push_docker_image(
docker_client: docker.DockerClient, image: str, upload_token: str | None
):
resp = docker_client.images.push(
image,
Expand Down
Loading
Loading