[AL-2038] Faster transforms #2094

Merged
51 commits, merged Apr 20, 2023
Changes from 49 commits

Commits (51)
1d0fe78  init  (FayazRahman, Dec 28, 2022)
2e82a74  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Dec 29, 2022)
7257b66  transforms  (FayazRahman, Dec 29, 2022)
c96e488  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Jan 2, 2023)
5147017  numpy_only  (FayazRahman, Jan 2, 2023)
1bd6dcb  groups  (FayazRahman, Jan 3, 2023)
7f50e78  transform tests passing  (FayazRahman, Jan 4, 2023)
c988ad1  cache_size opt + fix  (FayazRahman, Jan 4, 2023)
91ed446  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Jan 4, 2023)
0f8a8cd  label fix  (FayazRahman, Jan 5, 2023)
c157c3b  pgbar  (FayazRahman, Jan 5, 2023)
23df780  fix  (FayazRahman, Jan 5, 2023)
fbe14a3  lint  (FayazRahman, Jan 5, 2023)
6a6c263  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Jan 15, 2023)
9e6d2ab  pg fix  (FayazRahman, Jan 15, 2023)
a8ac859  pgbar update  (FayazRahman, Jan 15, 2023)
69dfef3  merge confl  (FayazRahman, Jan 15, 2023)
cb0a863  pg update  (FayazRahman, Jan 15, 2023)
9ddef82  serial label sync  (FayazRahman, Jan 15, 2023)
937f34b  fix  (FayazRahman, Jan 16, 2023)
03837fb  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Jan 17, 2023)
6b30639  smol fix  (FayazRahman, Jan 17, 2023)
c414eab  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Jan 19, 2023)
0a4a5e5  merge confl  (FayazRahman, Jan 24, 2023)
ff949c6  merge confl  (FayazRahman, Jan 31, 2023)
9ef8eed  fix  (FayazRahman, Jan 31, 2023)
301ff26  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Feb 7, 2023)
c8c17e4  fix  (FayazRahman, Feb 7, 2023)
62bdeb5  fixes  (farizrahman4u, Feb 8, 2023)
ab65822  rem dbg lines  (farizrahman4u, Feb 8, 2023)
5da8698  none fix  (farizrahman4u, Feb 8, 2023)
4a8446f  add test  (FayazRahman, Feb 8, 2023)
5b6a1fe  add another test  (FayazRahman, Feb 8, 2023)
c2640fa  merge inplace fix  (FayazRahman, Feb 8, 2023)
5b378b3  exception for none polygon inside sample  (FayazRahman, Feb 9, 2023)
3bfe558  merge main  (FayazRahman, Feb 13, 2023)
1a2f9e1  update  (FayazRahman, Feb 15, 2023)
cfa5667  fix  (FayazRahman, Feb 15, 2023)
169af14  add test  (FayazRahman, Feb 15, 2023)
2abbdf1  more tests  (FayazRahman, Feb 15, 2023)
b4609d1  fix  (FayazRahman, Feb 16, 2023)
d4752d4  remove unused  (FayazRahman, Feb 16, 2023)
3bbdbbe  merge main + refactor  (FayazRahman, Feb 20, 2023)
3be8d22  fix  (FayazRahman, Feb 21, 2023)
18a8e6d  updates  (FayazRahman, Apr 5, 2023)
f335744  black  (FayazRahman, Apr 5, 2023)
307a905  Merge branch 'main' of https://github.com/activeloopai/deeplake into …  (FayazRahman, Apr 8, 2023)
4645592  mypy  (FayazRahman, Apr 8, 2023)
6c7d0c7  black  (FayazRahman, Apr 10, 2023)
83be600  refactor  (FayazRahman, Apr 12, 2023)
357d0dd  black  (FayazRahman, Apr 14, 2023)

35 changes: 24 additions & 11 deletions deeplake/core/chunk_engine.py
@@ -664,9 +664,10 @@ def _convert_to_list(self, samples):
return False

def check_each_sample(self, samples, verify=True):
# overridden in LinkedChunkEngine
return

def _sanitize_samples(self, samples, verify=True):
def _sanitize_samples(self, samples, verify=True, pg_callback=None):
check_samples_type(samples)
if isinstance(samples, list):
samples = [
@@ -994,7 +995,9 @@ def _extend(self, samples, progressbar, pg_callback=None, update_commit_diff=Tru
return
if len(samples) == 0:
return
samples, verified_samples = self._sanitize_samples(samples)
samples, verified_samples = self._sanitize_samples(
samples, pg_callback=pg_callback
)
self._samples_to_chunks(
samples,
start_chunk=self.last_appended_chunk(),
@@ -1025,15 +1028,22 @@ def extend(
if self.is_sequence:
samples = tqdm(samples) if progressbar else samples
verified_samples = []
for sample in samples:
if sample is None:
sample = []
verified_sample = self._extend(
sample, progressbar=False, update_commit_diff=False
)
self.sequence_encoder.register_samples(len(sample), 1)
self.commit_diff.add_data(1)
verified_samples.append(verified_sample or sample)
num_samples_added = 0
try:
for sample in samples:
if sample is None:
sample = []
verified_sample = self._extend(
sample, progressbar=False, update_commit_diff=False
)
self.sequence_encoder.register_samples(len(sample), 1)
self.commit_diff.add_data(1)
num_samples_added += 1
verified_samples.append(verified_sample or sample)
except Exception:
for _ in range(num_samples_added):
self.pop()
raise
if link_callback:
samples = [
None if is_empty_list(s) else s for s in verified_samples
@@ -2051,6 +2061,9 @@ def list_all_chunks(self) -> List[str]:
"""Return list of all chunks for current `version_state['commit_id']` and tensor"""
commit_id = self.commit_id
if commit_id == FIRST_COMMIT_ID:
arr = self.chunk_id_encoder._encoded
if not arr.size:
return []
return [
ChunkIdEncoder.name_from_id(chunk_id)
for chunk_id in self.chunk_id_encoder._encoded[:, CHUNK_ID_COLUMN]
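The extend() change above makes sequence appends atomic: it counts how many items were registered and pops them back off if a later item fails, so a partial extend never leaves the tensor half-written. A minimal sketch of that rollback pattern, using an illustrative toy engine rather than Deep Lake's actual ChunkEngine (all names here are hypothetical):

# Illustrative sketch only; class and method names are not Deep Lake's API.
class ToyEngine:
    def __init__(self):
        self.items = []

    def _extend_one(self, sample):
        # Mirrors the None -> [] normalization done in extend() above.
        if sample is None:
            sample = []
        if sample == "bad":
            raise ValueError("cannot add this sample")
        self.items.append(sample)

    def pop(self):
        self.items.pop()

    def extend(self, samples):
        num_added = 0
        try:
            for sample in samples:
                self._extend_one(sample)
                num_added += 1
        except Exception:
            # Roll back everything this call added before re-raising,
            # so a failed extend leaves the engine unchanged.
            for _ in range(num_added):
                self.pop()
            raise

engine = ToyEngine()
try:
    engine.extend([[1, 2], None, "bad"])
except ValueError:
    pass
assert engine.items == []  # the two successful items were popped on failure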
3 changes: 2 additions & 1 deletion deeplake/core/linked_chunk_engine.py
@@ -263,12 +263,13 @@ def check_each_sample(self, samples, verify=True):
verified_samples.append(sample)
else:
try:
_verify = verify and self.verify
verified_samples.append(
read_linked_sample(
sample.path,
sample.creds_key,
self.link_creds,
verify=verify and self.verify,
verify=_verify,
)
)
except Exception as e:
6 changes: 6 additions & 0 deletions deeplake/core/polygon.py
@@ -1,4 +1,6 @@
from deeplake.util.exceptions import EmptyPolygonError
from typing import Union, List

import numpy as np
import deeplake

@@ -7,6 +9,10 @@ class Polygon:
"""Represents a polygon."""

def __init__(self, coords: Union[np.ndarray, List[float]], dtype="float32"):
if coords is None or len(coords) == 0:
raise EmptyPolygonError(
"A polygons sample can be empty or None but a polygon within a sample cannot be empty or None."
)
self.coords = coords
self.dtype = dtype

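For context, the guard added to Polygon.__init__ means a polygon-type sample may still be empty or None as a whole, but any individual polygon inside a sample must have coordinates. A hedged sketch of the expected behaviour (the dataset path and tensor name are illustrative, and it assumes a Deep Lake build that includes this change):

import numpy as np
import deeplake
from deeplake.util.exceptions import EmptyPolygonError

ds = deeplake.empty("mem://polygon_demo")       # illustrative in-memory dataset
ds.create_tensor("shapes", htype="polygon")

ds.shapes.append(None)                          # an empty sample is still allowed
ds.shapes.append([np.zeros((3, 2))])            # one triangle: three (x, y) points

try:
    ds.shapes.append([np.zeros((3, 2)), []])    # an empty polygon inside a sample
except EmptyPolygonError as err:
    print("rejected:", err)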
54 changes: 53 additions & 1 deletion deeplake/core/transform/test_transform.py
@@ -3,7 +3,6 @@
import numpy as np
from click.testing import CliRunner
from deeplake.core.storage.memory import MemoryProvider
from deeplake.core.transform.transform_tensor import TransformTensor
from deeplake.core.version_control.test_version_control import (
compare_dataset_diff,
compare_tensor_diff,
@@ -1439,3 +1438,56 @@ def upload(item, ds):
assert ds["boxes"].meta.max_shape == [20, 4]

assert ds["labels"].numpy().shape == (40, 10)


def test_transform_numpy_only(local_ds):
@deeplake.compute
def upload(i, ds):
ds.abc.extend(i * np.ones((10, 5, 5)))

with local_ds as ds:
ds.create_tensor("abc")

upload().eval(list(range(100)), ds, num_workers=2)

assert len(local_ds) == 1000

for i in range(100):
np.testing.assert_array_equal(
ds.abc[i * 10 : (i + 1) * 10].numpy(), i * np.ones((10, 5, 5))
)


@deeplake.compute
def add_samples(i, ds, flower_path):
ds.abc.extend(i * np.ones((5, 5, 5)))
ds.images.extend([deeplake.read(flower_path) for _ in range(5)])


@deeplake.compute
def mul_by_2(sample_in, samples_out):
samples_out.abc.append(2 * sample_in.abc.numpy())
samples_out.images.append(sample_in.images.numpy() - 1)


def test_pipeline(local_ds, flower_path):
pipeline = deeplake.compose([add_samples(flower_path), mul_by_2()])

flower_arr = np.array(deeplake.read(flower_path))

with local_ds as ds:
ds.create_tensor("abc")
ds.create_tensor("images", htype="image", sample_compression="png")

pipeline.eval(list(range(10)), ds, num_workers=2)

assert len(local_ds) == 50

for i in range(10):
np.testing.assert_array_equal(
ds.abc[i * 5 : (i + 1) * 5].numpy(), i * 2 * np.ones((5, 5, 5))
)
np.testing.assert_array_equal(
ds.images[i * 5 : (i + 1) * 5].numpy(),
np.tile(flower_arr - 1, (5, 1, 1, 1)),
)
32 changes: 18 additions & 14 deletions deeplake/core/transform/transform.py
@@ -35,6 +35,8 @@
from deeplake.util.version_control import auto_checkout
from deeplake.util.class_label import sync_labels

import posixpath


class ComputeFunction:
def __init__(self, func, args, kwargs, name: Optional[str] = None):
@@ -55,6 +57,7 @@ def eval(
check_lengths: bool = True,
pad_data_in: bool = False,
read_only_ok: bool = False,
cache_size: int = 16,
checkpoint_interval: int = 0,
ignore_errors: bool = False,
**kwargs,
@@ -78,6 +81,7 @@
Defaults to False.
read_only_ok (bool): If ``True`` and output dataset is same as input dataset, the read-only check is skipped. This can be used to read data in parallel without making changes to underlying dataset.
Defaults to False.
cache_size (int): Cache size to be used by transform per worker.
checkpoint_interval (int): If > 0, the transform will be checkpointed with a commit every ``checkpoint_interval`` input samples to avoid restarting the full transform due to intermittent failures. If the transform is interrupted, the intermediate data is deleted and the dataset is reset to the last commit.
If <= 0, no checkpointing is done. Checkpoint interval should be a multiple of num_workers if num_workers > 0. Defaults to 0.
ignore_errors (bool): If ``True``, input samples that cause the transform to fail will be skipped and the errors will be ignored **if possible**.
@@ -102,6 +106,7 @@
check_lengths,
pad_data_in,
read_only_ok,
cache_size,
checkpoint_interval,
ignore_errors,
**kwargs,
@@ -130,6 +135,7 @@ def eval(
check_lengths: bool = True,
pad_data_in: bool = False,
read_only_ok: bool = False,
cache_size: int = 16,
checkpoint_interval: int = 0,
ignore_errors: bool = False,
**kwargs,
@@ -153,6 +159,7 @@ def eval(
Defaults to ``False``.
read_only_ok (bool): If ``True`` and output dataset is same as input dataset, the read-only check is skipped.
Defaults to False.
cache_size (int): Cache size to be used by transform per worker.
checkpoint_interval (int): If > 0, the transform will be checkpointed with a commit every ``checkpoint_interval`` input samples to avoid restarting the full transform due to intermittent failures. If the transform is interrupted, the intermediate data is deleted and the dataset is reset to the last commit.
If <= 0, no checkpointing is done. Checkpoint interval should be a multiple of num_workers if num_workers > 0. Defaults to 0.
ignore_errors (bool): If ``True``, input samples that cause the transform to fail will be skipped and the errors will be ignored **if possible**.
@@ -218,7 +225,7 @@ def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
initial_autoflush = target_ds.storage.autoflush
target_ds.storage.autoflush = False

if not check_lengths:
if not check_lengths or read_only_ok:
skip_ok = True

checkpointing_enabled = checkpoint_interval > 0
@@ -267,6 +274,7 @@ def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
overwrite,
skip_ok,
read_only_ok and overwrite,
cache_size,
pbar,
pqueue,
ignore_errors,
@@ -286,11 +294,12 @@ def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
index, sample = None, None
if isinstance(e, TransformError):
index, sample = e.index, e.sample
e = e.__cause__ # type: ignore
raise TransformError(
index=index,
sample=sample,
samples_processed=samples_processed,
).with_traceback(e.__traceback__)
) from e
finally:
reload_and_rechunk(
overwrite,
@@ -316,6 +325,7 @@ def run(
overwrite: bool = False,
skip_ok: bool = False,
read_only: bool = False,
cache_size: int = 16,
pbar=None,
pqueue=None,
ignore_errors: bool = False,
@@ -340,16 +350,14 @@
else []
)
label_temp_tensors = {}
actual_tensors = (
None
if not class_label_tensors
else [target_ds[t].key for t in target_ds.tensors]
)

visible_tensors = list(target_ds.tensors)
visible_tensors = [target_ds[t].key for t in visible_tensors]

if not read_only:
for tensor in class_label_tensors:
actual_tensor = target_ds[tensor]
temp_tensor = f"__temp{tensor}_{uuid4().hex[:4]}"
temp_tensor = f"__temp{posixpath.relpath(tensor, target_ds.group_index)}_{uuid4().hex[:4]}"
with target_ds:
temp_tensor_obj = target_ds.create_tensor(
temp_tensor,
@@ -361,13 +369,9 @@
create_id_tensor=False,
)
temp_tensor_obj.meta._disable_temp_transform = True
label_temp_tensors[tensor] = temp_tensor
label_temp_tensors[tensor] = temp_tensor_obj.key
target_ds.flush()

visible_tensors = list(target_ds.tensors)
visible_tensors = [target_ds[t].key for t in visible_tensors]
visible_tensors = list(set(visible_tensors) - set(class_label_tensors))

tensors = list(target_ds._tensors())
tensors = [target_ds[t].key for t in tensors]
tensors = list(set(tensors) - set(class_label_tensors))
@@ -384,12 +388,12 @@
tensors,
visible_tensors,
label_temp_tensors,
actual_tensors,
self,
version_state,
target_ds.link_creds,
skip_ok,
extend_only,
cache_size,
ignore_errors,
)
map_inp = zip(slices, offsets, storages, repeat(args))
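Finally, the new cache_size argument threaded through eval() and run() above is the per-worker transform cache. A hedged usage sketch of where it surfaces for users (the dataset path, tensor name, and chosen values are illustrative; the precise effect of cache_size is only what the docstring above states):

import numpy as np
import deeplake

@deeplake.compute
def square(i, samples_out):
    # One output sample per input element i.
    samples_out.values.append(np.full((4, 4), i * i, dtype=np.int64))

ds = deeplake.empty("./cache_size_demo", overwrite=True)  # illustrative local path
ds.create_tensor("values")

# A larger per-worker cache presumably trades memory for fewer flushes to storage.
square().eval(list(range(100)), ds, num_workers=2, cache_size=32)

assert len(ds.values) == 100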