[AL-2038] Faster transforms #2094

Merged 51 commits on Apr 20, 2023. The changes shown below are from 42 of the 51 commits.

Commits
1d0fe78 init (FayazRahman, Dec 28, 2022)
2e82a74 Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Dec 29, 2022)
7257b66 transforms (FayazRahman, Dec 29, 2022)
c96e488 Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Jan 2, 2023)
5147017 numpy_only (FayazRahman, Jan 2, 2023)
1bd6dcb groups (FayazRahman, Jan 3, 2023)
7f50e78 transform tests passing (FayazRahman, Jan 4, 2023)
c988ad1 cache_size opt + fix (FayazRahman, Jan 4, 2023)
91ed446 Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Jan 4, 2023)
0f8a8cd label fix (FayazRahman, Jan 5, 2023)
c157c3b pgbar (FayazRahman, Jan 5, 2023)
23df780 fix (FayazRahman, Jan 5, 2023)
fbe14a3 lint (FayazRahman, Jan 5, 2023)
6a6c263 Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Jan 15, 2023)
9e6d2ab pg fix (FayazRahman, Jan 15, 2023)
a8ac859 pgbar update (FayazRahman, Jan 15, 2023)
69dfef3 merge confl (FayazRahman, Jan 15, 2023)
cb0a863 pg update (FayazRahman, Jan 15, 2023)
9ddef82 serial label sync (FayazRahman, Jan 15, 2023)
937f34b fix (FayazRahman, Jan 16, 2023)
03837fb Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Jan 17, 2023)
6b30639 smol fix (FayazRahman, Jan 17, 2023)
c414eab Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Jan 19, 2023)
0a4a5e5 merge confl (FayazRahman, Jan 24, 2023)
ff949c6 merge confl (FayazRahman, Jan 31, 2023)
9ef8eed fix (FayazRahman, Jan 31, 2023)
301ff26 Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Feb 7, 2023)
c8c17e4 fix (FayazRahman, Feb 7, 2023)
62bdeb5 fixes (farizrahman4u, Feb 8, 2023)
ab65822 rem dbg lines (farizrahman4u, Feb 8, 2023)
5da8698 none fix (farizrahman4u, Feb 8, 2023)
4a8446f add test (FayazRahman, Feb 8, 2023)
5b6a1fe add another test (FayazRahman, Feb 8, 2023)
c2640fa merge inplace fix (FayazRahman, Feb 8, 2023)
5b378b3 exception for none polygon inside sample (FayazRahman, Feb 9, 2023)
3bfe558 merge main (FayazRahman, Feb 13, 2023)
1a2f9e1 update (FayazRahman, Feb 15, 2023)
cfa5667 fix (FayazRahman, Feb 15, 2023)
169af14 add test (FayazRahman, Feb 15, 2023)
2abbdf1 more tests (FayazRahman, Feb 15, 2023)
b4609d1 fix (FayazRahman, Feb 16, 2023)
d4752d4 remove unused (FayazRahman, Feb 16, 2023)
3bbdbbe merge main + refactor (FayazRahman, Feb 20, 2023)
3be8d22 fix (FayazRahman, Feb 21, 2023)
18a8e6d updates (FayazRahman, Apr 5, 2023)
f335744 black (FayazRahman, Apr 5, 2023)
307a905 Merge branch 'main' of https://github.com/activeloopai/deeplake into … (FayazRahman, Apr 8, 2023)
4645592 mypy (FayazRahman, Apr 8, 2023)
6c7d0c7 black (FayazRahman, Apr 10, 2023)
83be600 refactor (FayazRahman, Apr 12, 2023)
357d0dd black (FayazRahman, Apr 14, 2023)
10 changes: 8 additions & 2 deletions deeplake/core/chunk_engine.py
@@ -648,9 +648,10 @@ def _convert_to_list(self, samples):
return False

def check_each_sample(self, samples, verify=True):
# overridden in LinkedChunkEngine
return

def _sanitize_samples(self, samples, verify=True):
def _sanitize_samples(self, samples, verify=True, pg_callback=None):
check_samples_type(samples)
if isinstance(samples, list):
samples = [None if is_empty_list(sample) else sample for sample in samples]
@@ -969,7 +970,9 @@ def _extend(samples, progressbar, pg_callback=None, update_commit_diff=True
return
if len(samples) == 0:
return
samples, verified_samples = self._sanitize_samples(samples)
samples, verified_samples = self._sanitize_samples(
samples, pg_callback=pg_callback
)
self._samples_to_chunks(
samples,
start_chunk=self.last_appended_chunk(),
@@ -1968,6 +1971,9 @@ def list_all_chunks(self) -> List[str]:
"""Return list of all chunks for current `version_state['commit_id']` and tensor"""
commit_id = self.commit_id
if commit_id == FIRST_COMMIT_ID:
arr = self.chunk_id_encoder._encoded
if not arr.size:
return []
return [
ChunkIdEncoder.name_from_id(chunk_id)
for chunk_id in self.chunk_id_encoder._encoded[:, CHUNK_ID_COLUMN]
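A hedged sketch of the failure the new guard avoids: on a brand-new tensor the chunk-ID encoder's `_encoded` buffer can be empty, and column-indexing it raises before the comprehension can return. The constant value and the empty-buffer shape below are assumptions for illustration.

```python
import numpy as np

CHUNK_ID_COLUMN = 0  # illustrative stand-in for deeplake's constant

encoded = np.zeros((0,), dtype=np.uint64)  # assumed buffer of a fresh encoder
if not encoded.size:
    chunk_ids = []  # the early return added above
else:
    # would raise IndexError when the empty buffer is 1-D
    chunk_ids = [int(cid) for cid in encoded[:, CHUNK_ID_COLUMN]]
```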
17 changes: 10 additions & 7 deletions deeplake/core/dataset/dataset.py
@@ -231,11 +231,7 @@ def __init__(
d["_temp_tensors"] = []
dct = self.__dict__
dct.update(d)
dct["enabled_tensors"] = (
set(self._resolve_tensor_list(enabled_tensors, root=True))
if enabled_tensors
else None
)

try:
self._set_derived_attributes()
except LockedException:
@@ -246,6 +242,11 @@
raise ReadOnlyModeError(
"This dataset cannot be open for writing as you don't have permissions. Try loading the dataset with `read_only=True."
)
dct["enabled_tensors"] = (
set(self._resolve_tensor_list(enabled_tensors, root=True))
if enabled_tensors
else None
)
self._first_load_init()
self._initial_autoflush: List[
bool
@@ -306,8 +307,10 @@ def __len__(self, warn: bool = True):
"""Returns the length of the smallest tensor."""
tensor_lengths = [len(tensor) for tensor in self.tensors.values()]
pad_tensors = self._pad_tensors
if not pad_tensors and min(tensor_lengths, default=0) != max(
tensor_lengths, default=0
if (
warn
and not pad_tensors
and min(tensor_lengths, default=0) != max(tensor_lengths, default=0)
):
warning(
"The length of tensors in the dataset is different. The len(ds) returns the length of the "
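A minimal sketch of the new `warn` gate, assuming two tensors of unequal length and the default `pad_tensors=False`; the in-memory path and generic tensors are assumptions:

```python
import numpy as np
import deeplake

ds = deeplake.empty("mem://len-demo")  # assumed writable scratch dataset
ds.create_tensor("a")
ds.create_tensor("b")
ds.a.extend(np.zeros((3, 2)))
ds.b.extend(np.zeros((1, 2)))

n = len(ds)                 # emits the length-mismatch warning
n = ds.__len__(warn=False)  # same value, warning suppressed for internal callers
```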
3 changes: 2 additions & 1 deletion deeplake/core/linked_chunk_engine.py
@@ -261,12 +261,13 @@ def check_each_sample(self, samples, verify=True):
verified_samples.append(sample)
else:
try:
_verify = verify and self.verify
verified_samples.append(
read_linked_sample(
sample.path,
sample.creds_key,
self.link_creds,
verify=verify and self.verify,
verify=_verify,
)
)
except Exception as e:
6 changes: 6 additions & 0 deletions deeplake/core/polygon.py
@@ -1,4 +1,6 @@
from deeplake.util.exceptions import EmptyPolygonError
from typing import Union, List

import numpy as np
import deeplake

@@ -7,6 +9,10 @@ class Polygon:
"""Represents a polygon."""

def __init__(self, coords: Union[np.ndarray, List[float]], dtype="float32"):
if coords is None or len(coords) == 0:
raise EmptyPolygonError(
"A polygons sample can be empty or None but a polygon within a sample cannot be empty or None."
)
self.coords = coords
self.dtype = dtype

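The new check in action, using only what the diff shows: empty coordinates for a single polygon now fail fast with `EmptyPolygonError` instead of surfacing later in the pipeline.

```python
import numpy as np
from deeplake.core.polygon import Polygon
from deeplake.util.exceptions import EmptyPolygonError

Polygon(np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0]]))  # valid triangle

try:
    Polygon([])  # an empty polygon inside a sample
except EmptyPolygonError as e:
    print(e)
```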
6 changes: 5 additions & 1 deletion deeplake/core/tensor_link.py
@@ -155,7 +155,11 @@ def extend_shape(samples, link_creds=None, tensor_meta=None):
]
mixed_ndim = False
try:
arr = np.array(shapes)
if len(set(map(len, shapes))) > 1:
dtype = object
else:
dtype = None
arr = np.array(shapes, dtype=dtype)
if arr.dtype == object:
mixed_ndim = True
except ValueError:
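Context for the dtype switch, as a sketch: NumPy 1.24+ raises `ValueError` when building an array from ragged (mixed-ndim) shape tuples unless `dtype=object` is requested explicitly, so the mixed case is now detected up front.

```python
import numpy as np

shapes = [(3, 224, 224), (100, 2)]  # samples whose shapes have different ndim

# np.array(shapes) raises ValueError on NumPy >= 1.24 instead of silently
# building an object array, so the dtype is chosen explicitly:
dtype = object if len(set(map(len, shapes))) > 1 else None
arr = np.array(shapes, dtype=dtype)
assert arr.dtype == object  # downstream this sets mixed_ndim = True
```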
100 changes: 99 additions & 1 deletion deeplake/core/transform/test_transform.py
@@ -3,7 +3,6 @@
import numpy as np
from click.testing import CliRunner
from deeplake.core.storage.memory import MemoryProvider
from deeplake.core.transform.transform_tensor import TransformTensor
from deeplake.core.version_control.test_version_control import (
compare_dataset_diff,
compare_tensor_diff,
@@ -804,6 +803,52 @@ def test_inplace_transform_non_head(local_ds_generator):
check_target_array(ds, i, target)


def test_inplace_transform_bug(local_ds_generator):
@deeplake.compute
def construct(sample_in, sample_out):
sample_out.append({"positive": [1, 2, 3], "negative": [4, 5, 6]})

ds = local_ds_generator()
with ds:
ds.create_tensor("id")
ds.id.extend(list(range(10)))

ds.create_tensor("positive")
ds.create_tensor("negative")

for _ in range(0, ds.max_len):
construct().eval(
ds,
num_workers=2,
skip_ok=True,
check_lengths=False,
pad_data_in=True,
)

np.testing.assert_array_equal(
ds.positive.numpy(aslist=True), [np.array([1, 2, 3])] * 10
)
np.testing.assert_array_equal(
ds.negative.numpy(aslist=True), [np.array([4, 5, 6])] * 10
)


def test_inplace_transform_bug_2(local_ds_generator):
@deeplake.compute
def tform(sample_in, sample_out):
sample_out.text2.append(sample_in.text.text())

ds = local_ds_generator()
with ds:
ds.create_tensor("text", htype="text", sample_compression="lz4")
ds.text.extend(["abcd", "efgh", "hijk"] * 10)
ds.create_tensor("text2", htype="text", sample_compression="lz4")
tform().eval(ds[["text"]], ds, num_workers=2, check_lengths=False)

np.testing.assert_array_equal(ds.text.text(), ["abcd", "efgh", "hijk"] * 10)
np.testing.assert_array_equal(ds.text2.text(), ["abcd", "efgh", "hijk"] * 10)


def test_inplace_transform_clear_chunks(local_ds_generator):
ds = local_ds_generator()

@@ -1196,3 +1241,56 @@ def test_downsample_transform(local_ds):
assert len(ds[tensor]) == 10
for i in range(10):
assert ds[tensor][i].shape == shape


def test_transform_numpy_only(local_ds):
@deeplake.compute
def upload(i, ds):
ds.abc.extend(i * np.ones((10, 5, 5)))

with local_ds as ds:
ds.create_tensor("abc")

upload().eval(list(range(100)), ds, num_workers=2)

assert len(local_ds) == 1000

for i in range(100):
np.testing.assert_array_equal(
ds.abc[i * 10 : (i + 1) * 10].numpy(), i * np.ones((10, 5, 5))
)


@deeplake.compute
def add_samples(i, ds, flower_path):
ds.abc.extend(i * np.ones((5, 5, 5)))
ds.images.extend([deeplake.read(flower_path) for _ in range(5)])


@deeplake.compute
def mul_by_2(sample_in, samples_out):
samples_out.abc.append(2 * sample_in.abc.numpy())
samples_out.images.append(sample_in.images.numpy() - 1)


def test_pipeline(local_ds, flower_path):
pipeline = deeplake.compose([add_samples(flower_path), mul_by_2()])

flower_arr = np.array(deeplake.read(flower_path))

with local_ds as ds:
ds.create_tensor("abc")
ds.create_tensor("images", htype="image", sample_compression="png")

pipeline.eval(list(range(10)), ds, num_workers=2)

assert len(local_ds) == 50

for i in range(10):
np.testing.assert_array_equal(
ds.abc[i * 5 : (i + 1) * 5].numpy(), i * 2 * np.ones((5, 5, 5))
)
np.testing.assert_array_equal(
ds.images[i * 5 : (i + 1) * 5].numpy(),
np.tile(flower_arr - 1, (5, 1, 1, 1)),
)
22 changes: 15 additions & 7 deletions deeplake/core/transform/transform.py
@@ -36,6 +36,8 @@
from deeplake.util.class_label import sync_labels
import sys

import posixpath


class ComputeFunction:
def __init__(self, func, args, kwargs, name: Optional[str] = None):
@@ -56,6 +58,7 @@ def eval(
check_lengths: bool = True,
pad_data_in: bool = False,
read_only_ok: bool = False,
cache_size: int = 16,
**kwargs,
):
"""Evaluates the ComputeFunction on data_in to produce an output dataset ds_out.
@@ -77,6 +80,7 @@
Defaults to False.
read_only_ok (bool): If ``True`` and output dataset is same as input dataset, the read-only check is skipped. This can be used to read data in parallel without making changes to underlying dataset.
Defaults to False.
cache_size (int): Cache size to be used by transform per worker.
**kwargs: Additional arguments.

Raises:
@@ -97,6 +101,7 @@
check_lengths,
pad_data_in,
read_only_ok,
cache_size,
**kwargs,
)

@@ -123,6 +128,7 @@ def eval(
check_lengths: bool = True,
pad_data_in: bool = False,
read_only_ok: bool = False,
cache_size: int = 16,
**kwargs,
):
"""Evaluates the pipeline on ``data_in`` to produce an output dataset ``ds_out``.
@@ -144,6 +150,7 @@
Defaults to ``False``.
read_only_ok (bool): If ``True`` and output dataset is same as input dataset, the read-only check is skipped.
Defaults to False.
cache_size (int): Cache size to be used by transform per worker.
**kwargs: Additional arguments.

Raises:
@@ -227,6 +234,7 @@ def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
overwrite,
skip_ok,
read_only_ok and overwrite,
cache_size,
**kwargs,
)
target_ds._send_compute_progress(**progress_end_args, status="success")
@@ -255,6 +263,7 @@ def run(
overwrite: bool = False,
skip_ok: bool = False,
read_only: bool = False,
cache_size: int = 16,
**kwargs,
):
"""Runs the pipeline on the input data to produce output samples and stores in the dataset.
@@ -282,10 +291,13 @@
else [target_ds[t].key for t in target_ds.tensors]
)

visible_tensors = list(target_ds.tensors)
visible_tensors = [target_ds[t].key for t in visible_tensors]

if not read_only:
for tensor in class_label_tensors:
actual_tensor = target_ds[tensor]
temp_tensor = f"__temp{tensor}_{uuid4().hex[:4]}"
temp_tensor = f"__temp{posixpath.relpath(tensor, target_ds.group_index)}_{uuid4().hex[:4]}"
with target_ds:
temp_tensor_obj = target_ds.create_tensor(
temp_tensor,
@@ -297,13 +309,9 @@
create_id_tensor=False,
)
temp_tensor_obj.meta._disable_temp_transform = True
label_temp_tensors[tensor] = temp_tensor
label_temp_tensors[tensor] = temp_tensor_obj.key
target_ds.flush()

visible_tensors = list(target_ds.tensors)
visible_tensors = [target_ds[t].key for t in visible_tensors]
visible_tensors = list(set(visible_tensors) - set(class_label_tensors))

tensors = list(target_ds._tensors())
tensors = [target_ds[t].key for t in tensors]
tensors = list(set(tensors) - set(class_label_tensors))
@@ -326,9 +334,9 @@
target_ds.link_creds,
skip_ok,
extend_only,
cache_size,
)
map_inp = zip(slices, storages, repeat(args))

try:
if progressbar:
desc = get_pbar_description(self.functions)
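A sketch of what the `posixpath.relpath` change in `run` above does for grouped class-label tensors, with illustrative names; that the old full-key name misbehaved once the target dataset is scoped to a group is an assumption here:

```python
import posixpath
from uuid import uuid4

tensor = "group/labels"  # full key of a class-label tensor (illustrative)
group_index = "group"    # the target dataset is scoped to this group

# Before: f"__temp{tensor}_..." -> "__tempgroup/labels_ab12" (keeps the slash)
# After: strip the group prefix first
temp_tensor = f"__temp{posixpath.relpath(tensor, group_index)}_{uuid4().hex[:4]}"
print(temp_tensor)  # e.g. "__templabels_ab12"
```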
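A usage sketch for the new `cache_size` knob threaded through `eval` and `run` above, modeled on the tests in this PR; the in-memory dataset path is an assumption:

```python
import numpy as np
import deeplake

@deeplake.compute
def upload(i, ds):
    ds.abc.extend(i * np.ones((10, 5, 5)))

ds = deeplake.empty("mem://cache-demo")  # assumed scratch dataset
ds.create_tensor("abc")

# cache_size is the per-worker transform cache added by this PR (default 16).
upload().eval(list(range(100)), ds, num_workers=2, cache_size=32)
assert len(ds) == 1000
```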