Support Parallel Run Step partition by keys (Azure#26844)
* v1

* fix lint

* pass through partition keys

* marshal and unmarshal the partition keys in the parallel component

* e2e test passed

* add parallel component partition keys UT

* add test case

* fix UT

* fix lint

* fix e2e

* fix UT

* recording

* recording twice

* add comments

* recording

* fix recording

* fix crlf to lf

* skip under non-live mode

* update recording

* fix comment

* fix lint

* fix ut

* import json

Co-authored-by: Xiaole Wen <xiwe@microsoft.com>
Co-authored-by: Clement Wang <clwan@microsoft.com>
3 people authored Oct 26, 2022
1 parent e5db378 commit 4c9b464
Showing 27 changed files with 3,075 additions and 2,769 deletions.
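
For context, here is a minimal usage sketch of the new partition_keys parameter with the public parallel_run_function builder. The component script, environment, and data asset names are illustrative assumptions, not part of this change:

```python
from azure.ai.ml import Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.parallel import RunFunction, parallel_run_function

# Partition the tabular input by the "foo" and "bar" columns instead of splitting it
# by mini_batch_size; rows that share a key combination land in the same mini-batch.
batch_score_by_key = parallel_run_function(
    name="batch_score_by_key",
    inputs={"job_data_path": Input(type=AssetTypes.MLTABLE, mode="eval_mount")},
    outputs={"job_output_path": Output(type=AssetTypes.MLTABLE)},
    input_data="${{inputs.job_data_path}}",
    partition_keys=["foo", "bar"],  # new in this commit
    instance_count=2,
    max_concurrency_per_instance=1,
    task=RunFunction(
        code="./src",                    # assumed location of the scoring code
        entry_script="score.py",         # assumed entry script
        environment="azureml:my-env:1",  # assumed environment
        append_row_to="${{outputs.job_output_path}}",
    ),
)
```

When both mini_batch_size and partition_keys are set, partition_keys takes precedence, per the docstrings updated below.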
10 changes: 10 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/parallel.py
@@ -24,6 +24,7 @@ def __init__(self, **kwargs):
self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
self._error_threshold = kwargs.pop("error_threshold", None)
self._mini_batch_size = kwargs.pop("mini_batch_size", None)
self._partition_keys = kwargs.pop("partition_keys", None)
self._logging_level = kwargs.pop("logging_level", None)
self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
self._init = False
@@ -60,6 +61,15 @@ def mini_batch_size(self) -> int:
def mini_batch_size(self, value: int):
self._mini_batch_size = value

@property
def partition_keys(self) -> List:
"""The keys used to partition dataset into mini-batches."""
return self._partition_keys

@partition_keys.setter
def partition_keys(self, value: List):
self._partition_keys = value

@property
def logging_level(self) -> str:
"""A string of the logging level name"""
@@ -35,6 +35,11 @@ class ParallelComponentSchema(ComponentSchema):
mini_batch_size = fields.Str(
metadata={"description": "The The batch size of current job."},
)
partition_keys = fields.List(
fields.Str(),
metadata={"description": "The keys used to partition input data into mini-batches"}
)

input_data = fields.Str()
retry_settings = NestedField(RetrySettingsSchema, unknown=INCLUDE)
max_concurrency_per_instance = fields.Integer(
@@ -28,6 +28,10 @@ class ParameterizedParallelSchema(PathAwareSchema):
mini_batch_size = fields.Str(
metadata={"description": "The batch size of current job."},
)
partition_keys = fields.List(
fields.Str(),
metadata={"description": "The keys used to partition input data into mini-batches"}
)
input_data = fields.Str()
resources = NestedField(JobResourceConfigurationSchema)
retry_settings = NestedField(RetrySettingsSchema, unknown=INCLUDE)
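
For reference, a standalone sketch of how a marshmallow List(Str) field like the two added above deserializes the YAML value; the schema class here is a throwaway demo, not an SDK class:

```python
from marshmallow import Schema, fields

class PartitionKeysDemoSchema(Schema):
    # Mirrors the field added to the parallel schemas above: an optional list of strings.
    partition_keys = fields.List(
        fields.Str(),
        metadata={"description": "The keys used to partition input data into mini-batches."},
    )

loaded = PartitionKeysDemoSchema().load({"partition_keys": ["foo", "bar"]})
assert loaded["partition_keys"] == ["foo", "bar"]
```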
18 changes: 17 additions & 1 deletion sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/parallel.py
@@ -7,6 +7,7 @@
import copy
import logging
import re
import json
from enum import Enum
from typing import Dict, List, Union

@@ -69,6 +70,12 @@ class Parallel(BaseNode):
(optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
through PipelineParameter
:type mini_batch_size: str
:param partition_keys: The keys used to partition the dataset into mini-batches.
If specified, data with the same key is partitioned into the same mini-batch.
If both partition_keys and mini_batch_size are specified, partition_keys takes precedence.
The input(s) must be partitioned dataset(s),
and partition_keys must be a subset of the keys of every input dataset for this to work.
:type partition_keys: List
:param input_data: The input data.
:type input_data: str
:param inputs: Inputs of the component/job.
@@ -104,6 +111,7 @@ def __init__(
mini_batch_error_threshold: int = None,
input_data: str = None,
task: Dict[str, Union[ParallelTask, str]] = None,
partition_keys: List = None,
mini_batch_size: int = None,
resources: JobResourceConfiguration = None,
environment_variables: Dict = None,
@@ -147,6 +155,7 @@ def __init__(
raise ValueError("mini_batch_size unit must be kb, mb or gb")

self.mini_batch_size = mini_batch_size
self.partition_keys = partition_keys
self.input_data = input_data
self._retry_settings = retry_settings
self.logging_level = logging_level
@@ -166,6 +175,8 @@ def __init__(
self.mini_batch_error_threshold or self.component.mini_batch_error_threshold
)
self.mini_batch_size = self.mini_batch_size or self.component.mini_batch_size
self.partition_keys = self.partition_keys or self.component.partition_keys

if not self.task:
self.task = self.component.task
# task.code is based on self.component.base_path
@@ -270,6 +281,7 @@ def _to_job(self) -> ParallelJob:
properties=self.properties,
compute=self.compute,
resources=self.resources,
partition_keys=self.partition_keys,
mini_batch_size=self.mini_batch_size,
task=self.task,
retry_settings=self.retry_settings,
@@ -318,6 +330,8 @@ def _to_rest_object(self, **kwargs) -> dict:
retry_settings=get_rest_dict_for_node_attrs(self.retry_settings),
logging_level=self.logging_level,
mini_batch_size=self.mini_batch_size,
partition_keys=json.dumps(self.partition_keys)
if self.partition_keys is not None else self.partition_keys,
resources=get_rest_dict_for_node_attrs(self.resources),
)
)
@@ -353,7 +367,8 @@ def _from_rest_object(cls, obj: dict) -> "Parallel":
# distribution, sweep won't have distribution
if "distribution" in obj and obj["distribution"]:
obj["distribution"] = DistributionConfiguration._from_rest_object(obj["distribution"])

if "partition_keys" in obj and obj["partition_keys"]:
obj["partition_keys"] = json.dumps(obj["partition_keys"])
return Parallel(**obj)

def _build_inputs(self):
@@ -392,6 +407,7 @@ def __call__(self, *args, **kwargs) -> "Parallel":
node.tags = self.tags
node.display_name = self.display_name
node.mini_batch_size = self.mini_batch_size
node.partition_keys = self.partition_keys
node.logging_level = self.logging_level
node.max_concurrency_per_instance = self.max_concurrency_per_instance
node.error_threshold = self.error_threshold
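
The node-level changes above carry partition_keys through the REST payload as a JSON-encoded string. A minimal standalone sketch of such a round-trip, illustrative only and not the SDK's exact wire format:

```python
import json

partition_keys = ["foo", "bar"]

# Outbound: serialize the list into a JSON string for the REST payload,
# mirroring the json.dumps call in _to_rest_object above.
rest_value = json.dumps(partition_keys) if partition_keys is not None else None
assert rest_value == '["foo", "bar"]'

# Inbound: a consumer holding the JSON string recovers the original list.
assert json.loads(rest_value) == ["foo", "bar"]
```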
@@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os
from typing import Dict, Union
from typing import Dict, Union, List

from azure.ai.ml._restclient.v2022_02_01_preview.models import AmlToken, ManagedIdentity
from azure.ai.ml.constants._component import ComponentSource
@@ -31,6 +31,7 @@ def parallel_run_function(
mini_batch_error_threshold: int = None,
task: RunFunction = None,
mini_batch_size: str = None,
partition_keys: List = None,
input_data: str = None,
inputs: Dict = None,
outputs: Dict = None,
@@ -136,6 +137,12 @@ def parallel_run_function(
(optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
through PipelineParameter.
:type mini_batch_size: str
:param partition_keys: The keys used to partition the dataset into mini-batches.
If specified, data with the same key is partitioned into the same mini-batch.
If both partition_keys and mini_batch_size are specified, partition_keys takes precedence.
The input(s) must be partitioned dataset(s),
and partition_keys must be a subset of the keys of every input dataset for this to work.
:type partition_keys: List
:param input_data: The input data.
:type input_data: str
:param inputs: a dict of inputs used by this parallel.
@@ -190,6 +197,7 @@
mini_batch_error_threshold=mini_batch_error_threshold,
task=task,
mini_batch_size=mini_batch_size,
partition_keys=partition_keys,
input_data=input_data,
_source=ComponentSource.BUILDER,
is_deterministic=is_deterministic,
@@ -216,6 +224,7 @@
mini_batch_error_threshold=mini_batch_error_threshold,
task=task,
mini_batch_size=mini_batch_size,
partition_keys=partition_keys,
input_data=input_data,
**kwargs,
)
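
For intuition on what partitioning by keys means for the user's scoring code, here is a hypothetical score.py entry script: with partition_keys set, each run() call receives only the rows that share one key combination. Tabular inputs arrive as a pandas DataFrame; the column names here are assumptions:

```python
# score.py - hypothetical parallel run entry script
import pandas as pd


def init():
    # One-time setup per worker process (load a model, open connections, ...).
    pass


def run(mini_batch: pd.DataFrame) -> pd.DataFrame:
    # With partition_keys=["foo", "bar"], every row in this mini-batch shares the
    # same ("foo", "bar") combination, so group-level aggregation is safe here.
    result = mini_batch.copy()
    result["group_mean"] = result["value"].mean()  # "value" is an assumed numeric column
    return result
```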
@@ -183,6 +183,11 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
if distribution:
distribution = DistributionConfiguration._from_rest_object(distribution)

if _type == "parallel":
import json
if "partition_keys" in rest_component_version.component_spec:
rest_component_version.component_spec["partition_keys"] \
= json.loads(rest_component_version.component_spec["partition_keys"])
# Note: we need to refine the logic here if more specific type logic here.
jobs = rest_component_version.component_spec.pop("jobs", None)
if _type == NodeType.PIPELINE and jobs:
@@ -4,7 +4,8 @@

import os
import re
from typing import Any, Dict, Union
import json
from typing import Any, Dict, Union, List

from marshmallow import Schema

@@ -16,9 +17,10 @@
from azure.ai.ml.entities._job.parallel.parameterized_parallel import ParameterizedParallel
from azure.ai.ml.entities._job.parallel.retry_settings import RetrySettings
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
from azure.ai.ml._restclient.v2022_05_01.models import ComponentVersionData

from ..._schema import PathAwareSchema
from .._util import convert_ordered_dict_to_dict, validate_attribute_type
from .._util import validate_attribute_type
from .component import Component


@@ -53,6 +55,12 @@ class ParallelComponent(Component, ParameterizedParallel): # pylint: disable=to
(optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
through PipelineParameter.
:type mini_batch_size: str
:param partition_keys: The keys used to partition the dataset into mini-batches.
If specified, data with the same key is partitioned into the same mini-batch.
If both partition_keys and mini_batch_size are specified, partition_keys takes precedence.
The input(s) must be partitioned dataset(s),
and partition_keys must be a subset of the keys of every input dataset for this to work.
:type partition_keys: list
:param input_data: The input data.
:type input_data: str
:param resources: Compute Resource configuration for the component.
@@ -86,6 +94,7 @@ def __init__(
mini_batch_error_threshold: int = None,
task: ParallelTask = None,
mini_batch_size: str = None,
partition_keys: List = None,
input_data: str = None,
resources: JobResourceConfiguration = None,
inputs: Dict = None,
@@ -116,6 +125,7 @@ def __init__(
# and fill in later with job defaults.
self.task = task
self.mini_batch_size = mini_batch_size
self.partition_keys = partition_keys
self.input_data = input_data
self.retry_settings = retry_settings
self.logging_level = logging_level
@@ -225,9 +235,12 @@ def _attr_type_map(cls) -> dict:
"resources": (dict, JobResourceConfiguration),
}

def _to_dict(self) -> Dict:
"""Dump the parallel component content into a dictionary."""
return convert_ordered_dict_to_dict({**self._other_parameter, **super(ParallelComponent, self)._to_dict()})
def _to_rest_object(self) -> ComponentVersionData:
rest_object = super()._to_rest_object()
if self.partition_keys:
rest_object.properties.component_spec["partition_keys"] = \
json.dumps(self.partition_keys)
return rest_object

@classmethod
def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
@@ -54,6 +54,8 @@ class ParallelJob(Job, ParameterizedParallel, JobIOMixin):
:type task: ParallelTask
:param mini_batch_size: The mini batch size.
:type mini_batch_size: str
:param partition_keys: The partition keys.
:type partition_keys: list
:param input_data: The input data.
:type input_data: str
:param inputs: Inputs of the job.
@@ -107,6 +109,7 @@ def _to_component(self, context: Dict = None, **kwargs):
return ParallelComponent(
base_path=context[BASE_PATH_CONTEXT_KEY],
mini_batch_size=self.mini_batch_size,
partition_keys=self.partition_keys,
input_data=self.input_data,
task=self.task,
retry_settings=self.retry_settings,
@@ -137,6 +140,7 @@ def _to_node(self, context: Dict = None, **kwargs):
inputs=self.inputs,
outputs=self.outputs,
mini_batch_size=self.mini_batch_size,
partition_keys=self.partition_keys,
input_data=self.input_data,
# task will be inherited from component & base_path will be set correctly.
retry_settings=self.retry_settings,
@@ -3,7 +3,7 @@
# ---------------------------------------------------------

import logging
from typing import Dict, Union
from typing import Dict, Union, List

from ..job_resource_configuration import JobResourceConfiguration
from .parallel_task import ParallelTask
@@ -47,10 +47,12 @@ def __init__(
input_data: str = None,
task: ParallelTask = None,
mini_batch_size: int = None,
partition_keys: List = None,
resources: Union[dict, JobResourceConfiguration] = None,
environment_variables: Dict = None,
):
self.mini_batch_size = mini_batch_size
self.partition_keys = partition_keys
self.task = task
self.retry_settings = retry_settings
self.input_data = input_data
@@ -99,6 +99,7 @@ def test_parallel_component_version_as_a_function_with_inputs(self):
"error_threshold": None,
"logging_level": None,
"max_concurrency_per_instance": None,
"partition_keys": None,
"mini_batch_error_threshold": None,
"mini_batch_size": 10485760,
"retry_settings": None,
@@ -112,3 +112,30 @@ def test_serialize_deserialize_basic(self, mock_machinelearning_client: MLClient

assert component_entity.code
assert component_entity.code == f"{str(Path('./tests/test_configs/python').resolve())}:1"

def test_serialize_deserialize_partition_keys(self, mock_machinelearning_client: MLClient):
test_path = "./tests/test_configs/components/parallel_component_with_partition_keys.yml"
component_entity = load_component_entity_from_yaml(test_path, mock_machinelearning_client)
rest_path = "./tests/test_configs/components/parallel_component_with_partition_keys_rest.json"
target_entity = load_component_entity_from_rest_json(rest_path)

# skip check code and environment
component_dict = component_entity._to_dict()
assert component_dict["id"]
component_dict = pydash.omit(
dict(component_dict),
"task.code",
"id",
)
expected_dict = pydash.omit(
dict(target_entity._to_dict()),
"task.code",
"creation_context",
"id",
)

assert component_dict == expected_dict
assert component_dict["partition_keys"] == ["foo", "bar"]

assert component_entity.code
assert component_entity.code == f"{str(Path('./tests/test_configs/python').resolve())}:1"
3 changes: 3 additions & 0 deletions sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py
@@ -1726,6 +1726,7 @@ def parallel_in_pipeline(job_data_path):
},
"name": "node1",
"mini_batch_size": 5,
"partition_keys": None,
"retry_settings": None,
"logging_level": "DEBUG",
"max_concurrency_per_instance": 1,
@@ -1945,6 +1946,7 @@ def parallel_in_pipeline(job_data_path):
},
"outputs": {},
"mini_batch_size": 1,
"partition_keys": None,
"task": {
"type": "run_function",
"entry_script": "score.py",
@@ -1992,6 +1994,7 @@ def parallel_in_pipeline(job_data_path):
},
"outputs": {"job_output_path": {"value": "${{parent.outputs.job_out_data}}", "type": "literal"}},
"mini_batch_size": 1,
"partition_keys": None,
"task": {
"type": "run_function",
"entry_script": "score.py",