Separate jobs from schema load (Azure#27019)
Signed-off-by: Brynn Yin <biyi@microsoft.com>
brynn-code authored Oct 25, 2022
1 parent e4ab9f3 commit d31e98c
Showing 8 changed files with 161 additions and 9 deletions.
@@ -183,6 +183,11 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
         if distribution:
             distribution = DistributionConfiguration._from_rest_object(distribution)
 
+        # Note: we need to refine the logic here if more type-specific handling is needed.
+        jobs = rest_component_version.component_spec.pop("jobs", None)
+        if _type == NodeType.PIPELINE and jobs:
+            jobs = PipelineComponent._resolve_sub_nodes(jobs)
+
         new_instance = create_instance_func()
         init_kwargs = dict(
             id=obj.id,
@@ -191,6 +196,7 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
             inputs=inputs,
             outputs=outputs,
             distribution=distribution,
+            jobs=jobs,
             **(
                 create_schema_func({BASE_PATH_CONTEXT_KEY: "./"}).load(
                     rest_component_version.component_spec, unknown=INCLUDE
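In plain terms, these two hunks pop "jobs" out of the raw REST component spec before the marshmallow schema load runs, resolve the sub-nodes separately when the component is a pipeline, and hand the result to the constructor. A minimal standalone sketch of that ordering, using an illustrative spec dict rather than a real service payload:

import copy

# Illustrative REST spec; only the keys used below matter for this sketch.
rest_spec = {
    "type": "pipeline",
    "name": "hello_pipeline_component",
    "jobs": {"component_a_job": {"type": "command", "componentId": "mock_id"}},
}

spec_for_schema = copy.deepcopy(rest_spec)
jobs = spec_for_schema.pop("jobs", None)  # the schema load never sees "jobs" now
if spec_for_schema["type"] == "pipeline" and jobs:
    # The SDK calls PipelineComponent._resolve_sub_nodes(jobs) at this point;
    # the placeholder below only shows that resolution is decoupled from the schema.
    jobs = {name: ("resolved", node["type"]) for name, node in jobs.items()}

assert "jobs" not in spec_for_schema  # the remaining keys go through schema load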
@@ -21,7 +21,7 @@
 from azure.ai.ml.constants._component import ComponentSource, NodeType
 from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
 from azure.ai.ml.entities._builders import BaseNode, Command
-from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode, LoopNode
 from azure.ai.ml.entities._component.component import Component
 from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output
 from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
@@ -323,6 +323,16 @@ def _load_from_rest_pipeline_job(cls, data: Dict):
             _source=ComponentSource.REMOTE_WORKSPACE_JOB,
         )
 
+    @classmethod
+    def _resolve_sub_nodes(cls, rest_jobs):
+        sub_nodes = {}
+        for node_name, node in rest_jobs.items():
+            if LoopNode._is_loop_node_dict(node):
+                sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
+            else:
+                sub_nodes[node_name] = BaseNode._from_rest_object(node)
+        return sub_nodes
+
     @classmethod
     def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
         return PipelineComponentSchema(context=context)
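The new classmethod resolves nodes in dict order: a dict that looks like a loop node goes through LoopNode._from_rest_object and receives the nodes resolved so far as reference_node_list, so a loop can point at an already-built sibling; every other dict goes through the polymorphic BaseNode._from_rest_object factory. A standalone sketch of that ordering dependency, where the "do_while"/"body" shapes are illustrative stand-ins rather than the real REST contract:

def resolve_sketch(rest_jobs: dict) -> dict:
    # Dicts preserve insertion order, so `resolved` plays the role of
    # reference_node_list: loop nodes can look up earlier siblings in it.
    resolved = {}
    for name, node in rest_jobs.items():
        if node.get("type") == "do_while":  # stand-in for LoopNode._is_loop_node_dict
            resolved[name] = ("loop_over", resolved[node["body"]])
        else:
            resolved[name] = ("node", node["componentId"])
    return resolved

print(resolve_sketch({
    "train": {"type": "command", "componentId": "mock_id"},
    "loop": {"type": "do_while", "body": "train"},
}))
# {'train': ('node', 'mock_id'), 'loop': ('loop_over', ('node', 'mock_id'))}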
@@ -64,6 +64,7 @@
 }
 
 
+# TODO: Remove this as both rest type and sdk type are snake case now.
 def get_output_type_mapping_from_rest():
     """Get output type mapping."""
     return {
@@ -255,6 +256,7 @@ def from_rest_inputs_to_dataset_literal(
         if input_value is None:
             continue
 
+        # TODO: Remove this as both rest type and sdk type are snake case now.
         type_transfer_dict = get_output_type_mapping_from_rest()
         # deal with invalid input type submitted by feb api
         # todo: backend help convert node level input/output type
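The two TODOs above concern legacy payloads in which the service returned CamelCase type names ("UriFile", "Literal") that the mapping translates before use; the new test fixture below deliberately includes both spellings. A rough standalone equivalent of the camel-to-snake normalization involved (the SDK ships its own camel_to_snake helper; this version is only for illustration):

import re

def camel_to_snake_sketch(name: str) -> str:
    # Insert "_" before each interior capital, then lowercase:
    # "UriFile" -> "uri_file", "Literal" -> "literal", "uri_folder" unchanged.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

assert camel_to_snake_sketch("UriFile") == "uri_file"
assert camel_to_snake_sketch("Literal") == "literal"
assert camel_to_snake_sketch("uri_folder") == "uri_folder"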
@@ -117,7 +117,7 @@ def from_dict_to_rest_io(
             rest_obj = rest_object_class.from_dict(val)
             rest_io_objects[key] = rest_obj
         else:
-            msg = "Got unsupported type of output: {}:" + f"{type(val)}"
+            msg = "Got unsupported type of input/output: {}:" + f"{type(val)}"
             raise ValidationException(
                 message=msg.format(val),
                 no_personal_data_message=msg.format("[val]"),
@@ -497,13 +497,7 @@ def _load_from_rest(cls, obj: JobBase) -> "PipelineJob":
         from_rest_inputs = from_rest_inputs_to_dataset_literal(properties.inputs) or {}
         from_rest_outputs = from_rest_data_outputs(properties.outputs) or {}
         # Unpack the component jobs
-        sub_nodes = {}
-        if properties.jobs:
-            for node_name, node in properties.jobs.items():
-                if LoopNode._is_loop_node_dict(node):
-                    sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
-                else:
-                    sub_nodes[node_name] = BaseNode._from_rest_object(node)
+        sub_nodes = PipelineComponent._resolve_sub_nodes(properties.jobs) if properties.jobs else {}
         # backend may still store Camel settings, eg: DefaultDatastore, translate them to snake when load back
         settings_dict = transform_dict_keys(properties.settings, camel_to_snake) if properties.settings else None
         settings_sdk = PipelineJobSettings(**settings_dict) if settings_dict else PipelineJobSettings()
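With the inlined loop above deleted, PipelineJob._load_from_rest and the new component load path share a single resolver, which the unit test added below exercises directly against the JSON fixture. A hedged usage sketch of that shared entry point, assuming this version of azure-ai-ml is installed and the working directory is the package's test root; note that _resolve_sub_nodes is private API:

import json

from azure.ai.ml.entities import PipelineComponent

# Fixture added by this commit; the path mirrors the new unit test.
with open("./tests/test_configs/components/pipeline_component_jobs_rest_data.json") as f:
    rest_jobs = json.load(f)["properties"]["component_spec"]["jobs"]

sub_nodes = PipelineComponent._resolve_sub_nodes(rest_jobs)
print({name: type(node).__name__ for name, node in sub_nodes.items()})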
@@ -2,6 +2,7 @@
 from pathlib import Path
 
 import pytest
+import yaml
 
 from azure.ai.ml import Input, load_component, load_job
 from azure.ai.ml.entities import PipelineComponent, PipelineJob
@@ -416,3 +417,29 @@ def test_invalid_nested_pipeline_component_with_group(self) -> None:
             "'group' is defined as a parameter group but got input '${{parent.inputs.top_group}}' with type '<class 'str'>'"
             in str(e.value)
         )
+
+    def test_simple_jobs_from_rest(self) -> None:
+        test_path = "./tests/test_configs/components/pipeline_component_jobs_rest_data.json"
+        with open(test_path, "r") as f:
+            json_in_file = yaml.safe_load(f)
+        json_in_file = json_in_file['properties']['component_spec']['jobs']
+        jobs = PipelineComponent._resolve_sub_nodes(json_in_file)
+        node_dict = {key: node._to_rest_object() for key, node in jobs.items()}['component_a_job']
+        assert node_dict['computeId'] == '${{parent.inputs.node_compute}}'
+        assert node_dict['outputs'] == {
+            'output_binding': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
+            'output_binding2': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
+            'output_data': {'job_output_type': 'uri_folder', 'mode': 'Upload'},
+            'output_data_legacy': {'job_output_type': 'uri_folder', 'mode': 'Upload'}}
+        assert node_dict['inputs'] == {
+            'binding_input': {'job_input_type': 'literal', 'value': '${{parent.inputs.component_in_path}}'},
+            'data_input': {'job_input_type': 'uri_file',
+                           'mode': 'Download',
+                           'uri': 'https://my-blob/path/to/data'},
+            'data_input_legacy': {'job_input_type': 'uri_file',
+                                  'mode': 'Download',
+                                  'uri': 'https://my-blob/path/to/data'},
+            'literal_input': {'job_input_type': 'literal', 'value': '11'},
+            'literal_input2': {'job_input_type': 'literal', 'value': '12'}}
+        assert node_dict['resources'] == {'instance_count': 1, 'properties': {
+            'target_selector': {'my_resource_only': 'false', 'allow_spot_vm': 'true'}}, 'shm_size': '2g'}
@@ -181,6 +181,7 @@ def pipeline_func(pipeline_input):
         except HttpResponseError as ex:
             assert "CancelPipelineRunInTerminalStatus" in str(ex)
 
+    # TODO: Enable this when the type is fixed on master.
     @pytest.mark.skip(reason="marshmallow.exceptions.ValidationError: miss required jobs.node.component")
     @pytest.mark.parametrize(
         "test_case_i,test_case_name",
@@ -0,0 +1,112 @@
{
  "id": "mock_id",
  "name": "1",
  "type": "Microsoft.MachineLearningServices/workspaces/components/versions",
  "system_data": {
    "created_by": "Brynn Yin",
    "created_by_type": "User",
    "created_at": "2022-10-25T03:39:27.465966Z",
    "last_modified_by": "Brynn Yin",
    "last_modified_by_type": "User",
    "last_modified_at": "2022-10-25T03:39:28.104555Z"
  },
  "properties": {
    "properties": {},
    "tags": {
      "tag": "tagvalue",
      "owner": "sdkteam"
    },
    "is_anonymous": false,
    "is_archived": false,
    "component_spec": {
      "name": "test_392226085584",
      "version": "1",
      "display_name": "Hello World Pipeline Component",
      "is_deterministic": "False",
      "type": "pipeline",
      "description": "This is the basic pipeline component",
      "tags": {
        "tag": "tagvalue",
        "owner": "sdkteam"
      },
      "inputs": {
        "component_in_path": {
          "type": "uri_folder",
          "optional": "False",
          "description": "A path"
        },
        "component_in_number": {
          "type": "number",
          "optional": "True",
          "default": "10.99",
          "description": "A number"
        },
        "node_compute": {
          "type": "string",
          "optional": "False",
          "default": "cpu-cluster"
        }
      },
      "jobs": {
        "component_a_job": {
          "componentId": "mock_id",
          "type": "command",
          "computeId": "${{parent.inputs.node_compute}}",
          "resources": {
            "instance_count": "1",
            "shm_size": "2g",
            "properties": {
              "target_selector": {
                "my_resource_only": "false",
                "allow_spot_vm": "true"
              }
            }
          },
          "inputs": {
            "binding_input": {
              "job_input_type": "literal",
              "value": "${{parent.inputs.component_in_path}}"
            },
            "literal_input": {
              "job_input_type": "literal",
              "value": "11"
            },
            "literal_input2": {
              "job_input_type": "Literal",
              "value": "12"
            },
            "data_input": {
              "job_input_type": "uri_file",
              "mode": "Download",
              "uri": "https://my-blob/path/to/data"
            },
            "data_input_legacy": {
              "job_input_type": "UriFile",
              "mode": "Download",
              "uri": "https://my-blob/path/to/data"
            }
          },
          "outputs": {
            "output_data": {
              "mode": "Upload",
              "job_output_type": "uri_folder"
            },
            "output_data_legacy": {
              "mode": "Upload",
              "job_output_type": "UriFolder"
            },
            "output_binding": {
              "value": "${{parent.outputs.output}}",
              "job_output_type": "literal"
            },
            "output_binding2": {
              "value": "${{parent.outputs.output}}",
              "job_output_type": "Literal"
            }
          }
        }
      },
      "$schema": "https://azuremlschemas.azureedge.net/development/pipelineComponent.schema.json"
    }
  }
}
