Skip to content

Commit

Permalink
handle local access method
Browse files Browse the repository at this point in the history
  • Loading branch information
FayazRahman committed Mar 11, 2023
1 parent c9d266f commit 1f9db8f
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 49 deletions.
121 changes: 85 additions & 36 deletions deeplake/api/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
UnsupportedParameterException,
DatasetCorruptError,
CheckoutError,
ReadOnlyModeError,
)
from deeplake.util.storage import (
get_storage_and_cache_chain,
Expand Down Expand Up @@ -136,6 +137,11 @@ def init(
If dataset needs to be downloaded, 'local:2:processed' will use 2 workers and use processed scheduler, while 'local:3' will use 3 workers
and default scheduler (threaded), and 'local:processed' will use a single worker and use processed scheduler.
reset (bool): If the specified dataset cannot be loaded due to a corrupted HEAD state of the branch being loaded,
setting ``reset=True`` will reset HEAD changes and load the previous version.
# noqa: DAR101 reset
Returns:
Dataset: Dataset created using the arguments provided.
Expand All @@ -144,6 +150,10 @@ def init(
UserNotLoggedInException: When user is not logged in
InvalidTokenException: If the specified token is invalid
TokenPermissionError: When there are permission or other errors related to token
CheckoutError: If version address specified in the path cannot be found
DatasetCorruptError: If loading the dataset failed due to corruption and `reset` is not `True`
ValueError: If version is specified in the path when creating a dataset
Exception: Re-raises caught exception if reset cannot fix the issue
Danger:
Setting ``overwrite`` to ``True`` will delete all of your data if it exists! Be very careful when setting this parameter.
Expand Down Expand Up @@ -225,6 +235,7 @@ def init(
"ds_exists": ds_exists,
"num_workers": num_workers,
"scheduler": scheduler,
"reset": reset,
}

try:
Expand All @@ -234,20 +245,25 @@ def init(
except Exception as e:
if create:
raise e
if not reset:
if isinstance(e, DatasetCorruptError):
if access_method == "stream":
if not reset:
if isinstance(e, DatasetCorruptError):
raise DatasetCorruptError(
message=e.message,
action="Try using `reset=True` to reset HEAD changes and load the previous commit.",
cause=e.__cause__,
)
raise DatasetCorruptError(
message=e.message,
action="Try using `reset=True` to reset HEAD changes and load the previous commit.",
cause=e.__cause__,
"Exception occured (see Traceback). The dataset maybe corrupted."
"Try using `reset=True` to reset HEAD changes and load the previous commit."
) from e
if storage.read_only:
raise ReadOnlyModeError(
"Cannot reset when loading dataset in read-only mode."
)
raise DatasetCorruptError(
"Exception occured (see Traceback). The dataset maybe corrupted."
"Try using `reset=True` to reset HEAD changes and load the previous commit."
) from e
return dataset._reset_and_load(
cache_chain, access_method, dataset_kwargs, version, e
)
return dataset._reset_and_load(
cache_chain, access_method, dataset_kwargs, version, e
)

@staticmethod
def exists(
Expand All @@ -264,6 +280,9 @@ def exists(
Returns:
A boolean confirming whether the dataset exists or not at the given path.
Raises:
ValueError: If version is specified in the path
"""
path, version = process_dataset_path(path)

Expand Down Expand Up @@ -327,6 +346,7 @@ def empty(
UserNotLoggedInException: When user is not logged in
InvalidTokenException: If the specified token is invalid
TokenPermissionError: When there are permission or other errors related to token
ValueError: If version is specified in the path
Danger:
Setting ``overwrite`` to ``True`` will delete all of your data if it exists! Be very careful when setting this parameter.
Expand Down Expand Up @@ -440,6 +460,11 @@ def load(
If dataset needs to be downloaded, 'local:2:processed' will use 2 workers and use processed scheduler, while 'local:3' will use 3 workers
and default scheduler (threaded), and 'local:processed' will use a single worker and use processed scheduler.
reset (bool): If the specified dataset cannot be loaded due to a corrupted HEAD state of the branch being loaded,
setting ``reset=True`` will reset HEAD changes and load the previous version.
# noqa: DAR101 reset
Returns:
Dataset: Dataset loaded using the arguments provided.
Expand All @@ -449,6 +474,9 @@ def load(
UserNotLoggedInException: When user is not logged in
InvalidTokenException: If the specified token is invalid
TokenPermissionError: When there are permission or other errors related to token
CheckoutError: If version address specified in the path cannot be found
DatasetCorruptError: If loading the dataset failed due to corruption and `reset` is not `True`
Exception: Re-raises caught exception if reset cannot fix the issue
Warning:
Setting ``access_method`` to download will overwrite the local copy of the dataset if it was previously downloaded.
Expand Down Expand Up @@ -514,32 +542,44 @@ def load(
"ds_exists": True,
"num_workers": num_workers,
"scheduler": scheduler,
"reset": reset,
}

try:
return dataset._load(dataset_kwargs, access_method)
except (AgreementError, CheckoutError) as e:
raise e from None
except Exception as e:
if not reset:
if isinstance(e, DatasetCorruptError):
if access_method == "stream":
if not reset:
if isinstance(e, DatasetCorruptError):
raise DatasetCorruptError(
message=e.message,
action="Try using `reset=True` to reset HEAD changes and load the previous commit.",
cause=e.__cause__,
)
raise DatasetCorruptError(
message=e.message,
action="Try using `reset=True` to reset HEAD changes and load the previous commit.",
cause=e.__cause__,
"Exception occured (see Traceback). The dataset maybe corrupted."
"Try using `reset=True` to reset HEAD changes and load the previous commit."
"This will delete all uncommitted changes on the branch you are trying to load."
) from e
if storage.read_only:
raise ReadOnlyModeError(
"Cannot reset when loading dataset in read-only mode."
)
raise DatasetCorruptError(
"Exception occured (see Traceback). The dataset maybe corrupted."
"Try using `reset=True` to reset HEAD changes and load the previous commit."
"This will delete all uncommitted changes on the branch you are trying to load."
) from e
return dataset._reset_and_load(
cache_chain, access_method, dataset_kwargs, version, e
)
return dataset._reset_and_load(
cache_chain, access_method, dataset_kwargs, version, e
)
raise e

@staticmethod
def _reset_and_load(storage, access_method, dataset_kwargs, version, err):
"""Reset and then load the dataset. Only called when loading dataset normally errored out with `err`."""
"""Reset and then load the dataset. Only called when loading dataset errored out with `err`."""
if access_method != "stream":
dataset_kwargs["reset"] = True
ds = dataset._load(dataset_kwargs, access_method)
return ds

try:
version_info = load_version_info(storage)
except KeyError:
Expand Down Expand Up @@ -579,15 +619,15 @@ def _load(dataset_kwargs, access_method=None, create=False):
dataset_created(ret)
else:
dataset_loaded(ret)
else:
ret = get_local_dataset(**dataset_kwargs)

integrity_check(ret)
integrity_check(ret)

verbose = dataset_kwargs.get("verbose")
path = dataset_kwargs.get("path")
if verbose:
logger.info(f"{path} loaded successfully.")
verbose = dataset_kwargs.get("verbose")
path = dataset_kwargs.get("path")
if verbose:
logger.info(f"{path} loaded successfully.")
else:
ret = get_local_dataset(**dataset_kwargs)
return ret

@staticmethod
Expand Down Expand Up @@ -658,6 +698,7 @@ def delete(
DatasetHandlerError: If a Dataset does not exist at the given path and ``force = False``.
UserNotLoggedInException: When user is not logged in.
NotImplementedError: When attempting to delete a managed view.
ValueError: If version is specified in the path
Warning:
This is an irreversible operation. Data once deleted cannot be recovered.
Expand Down Expand Up @@ -978,9 +1019,17 @@ def deepcopy(
report_params["Dest"] = dest
feature_report_path(src, "deepcopy", report_params, token=token)

src_ds = deeplake.load(
src, read_only=True, creds=src_creds, token=token, verbose=False
)
try:
src_ds = deeplake.load(
src, read_only=True, creds=src_creds, token=token, verbose=False
)
except DatasetCorruptError as e:
raise DatasetCorruptError(
"The source dataset is corrupted.",
"You can try to fix this by loading the dataset with `reset=True` "
"which will attempt to reset uncommitted HEAD changes and load the previous version.",
e.__cause__,
)
src_storage = get_base_storage(src_ds.storage)

dest_storage, cache_chain = get_storage_and_cache_chain(
Expand Down
38 changes: 27 additions & 11 deletions deeplake/api/tests/test_reset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from deeplake.util.exceptions import DatasetCorruptError
from deeplake.util.exceptions import DatasetCorruptError, ReadOnlyModeError

import numpy as np

Expand All @@ -22,22 +22,38 @@ def verify_reset_on_checkout(ds, branch, commit_id, old_head, data):
np.testing.assert_array_equal(ds[tensor].numpy(), data[tensor])


def test_load_corrupt_dataset(local_path):
ds = deeplake.empty(local_path, overwrite=True)
ds.create_tensor("abc")
ds.abc.append(1)
first = ds.commit()
@pytest.mark.parametrize("path", ["local_path", "s3_path"], indirect=True)
def test_load_corrupt_dataset(path):
ds = deeplake.empty(path, overwrite=True)

ds.abc.append(2)
second = ds.commit()
access_method = "local" if path.startswith("s3://") else "stream"

with ds:
ds.create_tensor("abc")
ds.abc.append(1)
first = ds.commit()

ds.abc.append(2)
second = ds.commit()

ds = deeplake.load(path, access_method=access_method)

corrupt_ds(ds, "abc", 3)
save_head = ds.pending_commit_id

with pytest.raises(DatasetCorruptError):
ds = deeplake.load(local_path)

ds = deeplake.load(local_path, reset=True)
ds = deeplake.load(path, access_method=access_method)

with pytest.raises(ReadOnlyModeError):
ds = deeplake.load(
path, read_only=True, access_method=access_method, reset=True
)

ds = deeplake.load(
path,
reset=True,
access_method=access_method,
)
verify_reset_on_checkout(ds, "main", second, save_head, {"abc": [[1], [2]]})


Expand Down
2 changes: 2 additions & 0 deletions deeplake/util/access_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def get_local_dataset(
ds_exists,
num_workers,
scheduler,
reset,
):
local_path = get_local_storage_path(path, os.environ["DEEPLAKE_DOWNLOAD_PATH"])
download = access_method == "download" or (
Expand Down Expand Up @@ -91,6 +92,7 @@ def get_local_dataset(
local_cache_size=local_cache_size,
token=token,
org_id=org_id,
reset=reset,
)
if download:
ds.storage.next_storage[TIMESTAMP_FILENAME] = time.ctime().encode("utf-8")
Expand Down
1 change: 0 additions & 1 deletion docs/source/Utility-Functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ General Functions
:toctree:
:nosignatures:

list
exists

Making Deep Lake Samples
Expand Down
1 change: 0 additions & 1 deletion docs/source/deeplake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ deeplake
.. autofunction:: copy
.. autofunction:: deepcopy
.. autofunction:: connect
.. autofunction:: list
.. autofunction:: exists
.. autofunction:: read
.. autofunction:: link
Expand Down

0 comments on commit 1f9db8f

Please sign in to comment.