[ML][Pipelines]feat: anonymous component reuse (Azure#27008)
* feat: internal anonymous component reuse - basic
need to test ignore file & additional includes

* fix: resolve comments

* fix: fix pylint & ci

* test: update internal pipeline job tests recording

* fix: fix ci

* fix: fix ci

* fix: skip some tests in recording mode to pass ci
elliotzh authored Oct 26, 2022
1 parent 2f06050 commit 6e9e5c7
Showing 56 changed files with 4,246 additions and 4,290 deletions.
@@ -23,8 +23,9 @@ class InternalInput(Input):
- Enum, enum (new)
"""

def __init__(self, datastore_mode=None, **kwargs):
def __init__(self, *, datastore_mode=None, is_resource=None, **kwargs):
self.datastore_mode = datastore_mode
self.is_resource = is_resource
super().__init__(**kwargs)

@property
@@ -90,7 +91,7 @@ def _get_python_builtin_type_str(self) -> str:
return super()._get_python_builtin_type_str()

@classmethod
def _cast_from_input_or_dict(cls, _input: Union[Input, Dict]) -> Optional["InternalInput"]:
def _from_base(cls, _input: Union[Input, Dict]) -> Optional["InternalInput"]:
"""Cast from Input or Dict to InternalInput. Do not guarantee to create a new object."""
if _input is None:
return None
@@ -105,8 +106,12 @@ def _cast_from_input_or_dict(cls, _input: Union[Input, Dict]) -> Optional["InternalInput"]:


class InternalOutput(Output):
def __init__(self, *, datastore_mode=None, **kwargs):
self.datastore_mode = datastore_mode
super().__init__(**kwargs)

@classmethod
def _cast_from_output_or_dict(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]:
def _from_base(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]:
if _output is None:
return None
if isinstance(_output, InternalOutput):
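An illustrative sketch (not part of this commit) of the renamed _from_base helpers and the new is_resource / datastore_mode fields. It assumes the dict branch hidden behind the fold forwards dict entries to the constructor; the field values are made up.

from azure.ai.ml._internal.entities._input_outputs import InternalInput, InternalOutput

# Cast plain dicts (e.g. ports parsed from a component spec) into internal ports.
inp = InternalInput._from_base({"type": "path", "is_resource": True})
out = InternalOutput._from_base({"type": "path", "datastore_mode": "mount"})
print(inp.is_resource, out.datastore_mode)  # expected: True mount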
198 changes: 198 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/_merkle_tree.py
@@ -0,0 +1,198 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=pointless-string-statement

import os
import json
import hashlib
from datetime import datetime
from os import listdir
from os.path import isfile, join
from collections import deque
HASH_FILE_CHUNK_SIZE = 65536
HASH_ALGORITHM = "sha512"

''' Copied from ml-components
Create a merkle tree for the given directory path
The directory would typically represent a project directory'''


def create_merkletree(file_or_folder_path, exclude_function):
root = DirTreeNode("", "Directory",
datetime.fromtimestamp(os.path.getmtime(file_or_folder_path)).isoformat())
if os.path.isdir(file_or_folder_path):
folder_path = file_or_folder_path
_create_merkletree_helper(folder_path, root, exclude_function)
else:
file_path = file_or_folder_path
file_node = DirTreeNode(file_path,
"File",
datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat())
hexdigest_hash, bytehash = _get_hash(os.path.normpath(file_path),
file_path,
"File")
if hexdigest_hash and bytehash:
file_node.add_hash(hexdigest_hash, bytehash)
root.add_child(file_node)

_populate_hashes(root)
return root


''' Populate hashes for directory nodes
by hashing the hashes of child nodes under them'''


def _populate_hashes(rootNode):
if rootNode.is_file():
return rootNode.bytehash
h = hashlib.new(HASH_ALGORITHM)
for child in rootNode.children:
if child.is_file():
h.update(child.bytehash)
else:
h.update(_populate_hashes(child))
rootNode.bytehash = h.digest()
rootNode.hexdigest_hash = h.hexdigest()
return h.digest()


''' Create a merkle tree for the given directory path
:param projectDir: Directory for which to create a tree.
:param rootNode: Root node .
Walks the directory and create a dirTree '''


def _create_merkletree_helper(projectDir, rootNode, exclude_function):
for f in sorted(listdir(projectDir)):
path = os.path.normpath(join(projectDir, f))
if not exclude_function(path):
if isfile(join(projectDir, f)):
newNode = DirTreeNode(f, "File", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
hexdigest_hash, bytehash = _get_hash(path, f, "File")
if hexdigest_hash and bytehash:
newNode.add_hash(hexdigest_hash, bytehash)
rootNode.add_child(newNode)
else:
newNode = DirTreeNode(f, "Directory", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
rootNode.add_child(newNode)
_create_merkletree_helper(path, newNode, exclude_function)


def _get_hash(filePath, name, file_type):
h = hashlib.new(HASH_ALGORITHM)
if not os.access(filePath, os.R_OK):
print(filePath, os.R_OK)
print("Cannot access file, so excluded from snapshot: {}".format(filePath))
return (None, None)
with open(filePath, 'rb') as f:
while True:
data = f.read(HASH_FILE_CHUNK_SIZE)
if not data:
break
h.update(data)
h.update(name.encode('utf-8'))
h.update(file_type.encode('utf-8'))
return (h.hexdigest(), h.digest())


''' We compute both hexdigest and digest for hashes.
digest (bytes) is used so that we can compute the bytehash of a parent directory based on bytehash of its children
hexdigest is used so that we can serialize the tree using json'''


class DirTreeNode(object):
def __init__(self, name=None, file_type=None, timestamp=None, hexdigest_hash=None, bytehash=None):
self.file_type = file_type
self.name = name
self.timestamp = timestamp
self.children = []
self.hexdigest_hash = hexdigest_hash
self.bytehash = bytehash

def load_children_from_dict(self, node_dict):
if len(node_dict.items()) == 0:
return
self.name = node_dict['name']
self.file_type = node_dict['type']
self.hexdigest_hash = node_dict['hash']
self.timestamp = node_dict['timestamp']
for _, child in node_dict['children'].items():
node = DirTreeNode()
node.load_children_from_dict(child)
self.add_child(node)
return self

def load_children_from_json(self, node_dict):
self.name = node_dict['name']
self.file_type = node_dict['type']
self.hexdigest_hash = node_dict['hash']
self.timestamp = node_dict['timestamp']
for child in node_dict['children']:
node = DirTreeNode()
node.load_children_from_json(child)
self.add_child(node)
return self

def load_object_from_dict(self, node_dict):
self.load_children_from_dict(node_dict)

def load_root_object_from_json_string(self, jsondata):
node_dict = json.loads(jsondata)
self.load_children_from_json(node_dict)

def add_hash(self, hexdigest_hash, bytehash):
self.hexdigest_hash = hexdigest_hash
self.bytehash = bytehash

def add_child(self, node):
self.children.append(node)

def is_file(self):
return self.file_type == "File"

''' Only for debugging purposes'''
def print_tree(self):
queue = deque()
print("Name: " + self.name)
print("Type: " + self.file_type)
for child in self.children:
print(' ' + child.name)
queue.append(child)
for i in queue:
i.print_tree()


''' Serialize merkle tree.
Serialize all fields except digest (bytes)
'''


class DirTreeJsonEncoder(json.JSONEncoder):
def default(self, o):
if not isinstance(o, DirTreeNode):
return super(DirTreeJsonEncoder, self).default(o)
_dict = o.__dict__
_dict.pop("bytehash", None)
_dict['type'] = _dict.pop('file_type')
_dict['hash'] = _dict.pop('hexdigest_hash')

return _dict


class DirTreeJsonEncoderV2(json.JSONEncoder):
def default(self, o):
if not isinstance(o, DirTreeNode):
return super(DirTreeJsonEncoderV2, self).default(o)
_dict = o.__dict__
_dict.pop("bytehash", None)
if 'file_type' in _dict:
_dict['type'] = _dict.pop('file_type')
if 'hexdigest_hash' in _dict:
_dict['hash'] = _dict.pop('hexdigest_hash')
if isinstance(_dict['children'], list):
_dict['children'] = {x.name: x for x in _dict['children']}

return _dict
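A minimal usage sketch of the module added above (illustrative only; the folder path is a placeholder): build the tree for a local folder, derive the snapshot-id-style UUID that the component change below relies on, and serialize the tree to JSON.

import json
from uuid import UUID

from azure.ai.ml._internal.entities._merkle_tree import DirTreeJsonEncoder, create_merkletree

# Build the merkle tree for a local source folder, excluding nothing.
root = create_merkletree("./src", lambda path: False)

print(root.hexdigest_hash)                  # sha512 hex digest of the whole snapshot
print(str(UUID(root.hexdigest_hash[::4])))  # every 4th hex char -> 32 chars -> UUID, as in _get_snapshot_id below
print(json.dumps(root, cls=DirTreeJsonEncoder, indent=2))  # JSON tree without the raw byte hashes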
36 changes: 36 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/code.py
@@ -0,0 +1,36 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from typing import Optional

from ...entities._assets import Code


class InternalCode(Code):
@property
def _upload_hash(self) -> Optional[str]:
# This property will be used to identify the uploaded content when trying to
# upload to datastore. The tracebacks will be as below:
# Traceback (most recent call last):
# _artifact_utilities._check_and_upload_path
# _artifact_utilities._upload_to_datastore
# _artifact_utilities.upload_artifact
# _blob_storage_helper.upload
# where asset id will be calculated based on the upload hash.

if self._is_anonymous is True:
# Name of an anonymous internal code is the same as its snapshot id
# in ml-component, use it as the upload hash to avoid duplicate hash
# calculation with _asset_utils.get_object_hash.
return self.name

return getattr(super(InternalCode, self), "_upload_hash")

def __setattr__(self, key, value):
if key == "name" and hasattr(self, key) and self._is_anonymous is True and value != self.name:
raise AttributeError(
"InternalCode name are calculated based on its content and cannot "
"be changed: current name is {} and new value is {}".format(self.name, value)
)
super().__setattr__(key, value)
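An illustrative sketch (assumed values, not part of the diff) of the behaviour the comments above describe: an anonymous InternalCode uses its name, the ml-components snapshot id, as the upload hash, and that name is then locked.

from azure.ai.ml._internal.entities.code import InternalCode

code = InternalCode(
    name="f9a7b2c1-0000-0000-0000-000000000000",  # hypothetical snapshot id
    version="1",
    path="./src",
    is_anonymous=True,
)
assert code._upload_hash == code.name  # anonymous code: the name doubles as the upload hash
# code.name = "other-name"             # would raise AttributeError: the name is content-derived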
66 changes: 39 additions & 27 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/component.py
@@ -4,7 +4,9 @@
# pylint: disable=protected-access, redefined-builtin
# disable redefined-builtin to use id/type as argument name
from contextlib import contextmanager
from os import PathLike
from typing import Dict, Union
from uuid import UUID

from marshmallow import INCLUDE, Schema

@@ -16,13 +18,16 @@
from azure.ai.ml.entities._system_data import SystemData
from azure.ai.ml.entities._util import convert_ordered_dict_to_dict
from azure.ai.ml.entities._validation import MutableValidationResult
from ._merkle_tree import create_merkletree

from ... import Input, Output
from ..._utils._asset_utils import IgnoreFile, get_ignore_file
from .._schema.component import InternalBaseComponentSchema
from ._additional_includes import _AdditionalIncludes
from ._input_outputs import InternalInput, InternalOutput
from .environment import InternalEnvironment
from .node import InternalBaseNode
from .code import InternalCode


class InternalComponent(Component):
@@ -127,40 +132,16 @@ def __init__(
self.ae365exepool = ae365exepool
self.launcher = launcher

# add some internal specific attributes to inputs/outputs after super().__init__()
self._post_process_internal_inputs_outputs(inputs, outputs)

@classmethod
def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool):
component_io = {}
for name, port in io_dict.items():
if is_input:
component_io[name] = InternalInput._cast_from_input_or_dict(port)
component_io[name] = InternalInput._from_base(port)
else:
component_io[name] = InternalOutput._cast_from_output_or_dict(port)
component_io[name] = InternalOutput._from_base(port)
return component_io

def _post_process_internal_inputs_outputs(
self,
inputs_dict: Union[Dict, Input, Output],
outputs_dict: Union[Dict, Input, Output],
):
for io_name, io_object in self.inputs.items():
original = inputs_dict[io_name]
# force append attribute for internal inputs
if isinstance(original, dict):
for attr_name in ["is_resource"]:
if attr_name in original:
io_object.__setattr__(attr_name, original[attr_name])

for io_name, io_object in self.outputs.items():
original = outputs_dict[io_name]
# force append attribute for internal inputs
if isinstance(original, dict):
for attr_name in ["datastore_mode"]:
if attr_name in original:
io_object.__setattr__(attr_name, original[attr_name])

@property
def _additional_includes(self):
if self.__additional_includes is None:
@@ -221,8 +202,27 @@ def _to_rest_object(self) -> ComponentVersionData:
result.name = self.name
return result

@classmethod
def _get_snapshot_id(cls, code_path: Union[str, PathLike]) -> str:
"""Get the snapshot id of a component with specific working directory in ml-components.
Use this as the name of code asset to reuse steps in a pipeline job from ml-components runs.
:param code_path: The path of the working directory.
:type code_path: str
:return: The snapshot id of a component in ml-components with code_path as its working directory.
"""
_ignore_file: IgnoreFile = get_ignore_file(code_path)
curr_root = create_merkletree(code_path, lambda x: _ignore_file.is_file_excluded(code_path))
snapshot_id = str(UUID(curr_root.hexdigest_hash[::4]))
return snapshot_id

@contextmanager
def _resolve_local_code(self):
"""Create a Code object pointing to local code and yield it."""
# Note that if self.code is already a Code object, this function won't be called
# in create_or_update => _try_resolve_code_for_component, which is also
# forbidden by schema CodeFields for now.

self._additional_includes.resolve()

# file dependency in code will be read during internal environment resolution
@@ -232,7 +232,19 @@ def _resolve_local_code(self):
if isinstance(self.environment, InternalEnvironment):
self.environment.resolve(self._additional_includes.code)
# use absolute path in case temp folder & work dir are in different drive
yield self._additional_includes.code.absolute()
tmp_code_dir = self._additional_includes.code.absolute()
# Use the snapshot id in ml-components as code name to enable anonymous
# component reuse from ml-component runs.
# calculate snapshot id here instead of inside InternalCode to ensure that
# snapshot id is calculated based on the resolved code path
yield InternalCode(
name=self._get_snapshot_id(tmp_code_dir),
version="1",
base_path=self._base_path,
path=tmp_code_dir,
is_anonymous=True,
)

self._additional_includes.cleanup()

def __call__(self, *args, **kwargs) -> InternalBaseNode: # pylint: disable=useless-super-delegation
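A closing sketch (illustrative, with placeholder paths) of the reuse effect this change enables: identical resolved snapshots map to the same snapshot id, so the anonymous code asset, and with it the anonymous component, can be reused across submissions.

from azure.ai.ml._internal.entities.component import InternalComponent

snapshot_a = InternalComponent._get_snapshot_id("./components/train/src")
snapshot_b = InternalComponent._get_snapshot_id("./components/train_copy/src")

# Byte-identical snapshots (same files, same ignore rules) hash to the same id,
# so the second registration reuses the code asset uploaded for the first.
print(snapshot_a == snapshot_b)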