Skip to content

Commit

Permalink
[AL-2028] Changes ENV creds handling for linked data (activeloopai#2166)
Browse files Browse the repository at this point in the history
* change how env creds are handled

* docs

* black fix

* update tests

* mypy fix

* another test fix

* better error

* better error handling

* Visualizer changes for new ENV creds. (activeloopai#2189)

---------

Co-authored-by: Sasun Hambardzumyan <xustup@gmail.com>
  • Loading branch information
AbhinavTuli and khustup authored Feb 23, 2023
1 parent 0d0bbd8 commit 412fe6f
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 49 deletions.
17 changes: 9 additions & 8 deletions deeplake/api/tests/test_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from deeplake.tests.common import is_opt_true
from deeplake.util.exceptions import (
ManagedCredentialsNotFoundError,
MissingCredsError,
TensorMetaInvalidHtype,
UnableToReadFromUrlError,
)
Expand Down Expand Up @@ -82,16 +83,15 @@ def test_link_creds(request):
link_creds.populate_creds("abc", {})
link_creds.populate_creds("def", {})

with pytest.raises(KeyError):
with pytest.raises(MissingCredsError):
link_creds.populate_creds("ghi", {})

assert link_creds.get_encoding("ENV") == 0
assert link_creds.get_encoding(None) == 0
with pytest.raises(ValueError):
link_creds.get_encoding(None, "s3://my_bucket/my_key")
assert link_creds.get_encoding("abc") == 1
assert link_creds.get_encoding("def") == 2
with pytest.raises(ValueError):
with pytest.raises(MissingCredsError):
link_creds.get_encoding("ghi")

assert link_creds.get_creds_key(0) is None
Expand All @@ -106,16 +106,16 @@ def test_link_creds(request):
link_creds.add_creds_key("ghi")
assert link_creds.missing_keys == ["ghi"]

with pytest.raises(KeyError):
with pytest.raises(MissingCredsError):
link_creds.get_storage_provider("xyz", "s3")

with pytest.raises(ValueError):
with pytest.raises(MissingCredsError):
link_creds.get_storage_provider("ghi", "s3")

if is_opt_true(request, GCS_OPT):
assert isinstance(link_creds.get_storage_provider("def", "gcs"), GCSProvider)
assert isinstance(link_creds.get_storage_provider("def", "gcs"), GCSProvider)
assert isinstance(link_creds.get_storage_provider("ENV", "gcs"), GCSProvider)
assert isinstance(link_creds.get_storage_provider(None, "gcs"), GCSProvider)
if is_opt_true(request, S3_OPT):
assert isinstance(link_creds.get_storage_provider("abc", "s3"), S3Provider)
assert isinstance(link_creds.get_storage_provider("abc", "s3"), S3Provider)
Expand Down Expand Up @@ -190,8 +190,6 @@ def test_none_used_key(local_ds_generator, cat_path):
ds.populate_creds("my_s3_key", {})
ds.xyz.append(deeplake.link(cat_path))
assert ds.link_creds.used_creds_keys == set()
ds.xyz.append(deeplake.link(cat_path, "ENV"))
assert ds.link_creds.used_creds_keys == set()
ds.xyz.append(deeplake.link(cat_path, "my_s3_key"))
assert ds.link_creds.used_creds_keys == {"my_s3_key"}

Expand Down Expand Up @@ -313,6 +311,8 @@ def test_jwt_link(local_ds):
def test_video(request, local_ds_generator, create_shape_tensor, verify):
local_ds = local_ds_generator()
with local_ds as ds:
ds.add_creds_key("ENV")
ds.populate_creds("ENV", from_environment=True)
ds.create_tensor(
"linked_videos",
htype="link[video]",
Expand Down Expand Up @@ -340,6 +340,7 @@ def test_video(request, local_ds_generator, create_shape_tensor, verify):
assert ds.linked_videos[3].shape == (361, 720, 1280, 3)
# checking persistence
ds = local_ds_generator()
ds.populate_creds("ENV", from_environment=True)
for i in range(3):
assert ds.linked_videos[i].shape == (361, 720, 1280, 3)

Expand Down
4 changes: 3 additions & 1 deletion deeplake/api/tests/test_nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ def test_nifti_raw_compress(memory_ds):

def test_nifti_cloud(memory_ds, s3_root_storage):
with memory_ds as ds:
ds.add_creds_key("ENV")
ds.populate_creds("ENV", from_environment=True)
nii_gz_4d = os.path.join(data_path, "example4d.nii.gz")
img = nib.load(nii_gz_4d)
with open(nii_gz_4d, "rb") as f:
Expand All @@ -108,7 +110,7 @@ def test_nifti_cloud(memory_ds, s3_root_storage):
deeplake.read(f"{s3_root_storage.root}/example4d.nii.gz", verify=True)
)
ds.nifti_linked.append(
deeplake.link(f"{s3_root_storage.root}/example4d.nii.gz")
deeplake.link(f"{s3_root_storage.root}/example4d.nii.gz", creds_key="ENV")
)

assert ds.abc[0].numpy().shape == img.shape
Expand Down
2 changes: 2 additions & 0 deletions deeplake/api/tests/test_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ def test_video_data(local_ds, video_paths):
)
def test_linked_video_timestamps(local_ds):
with local_ds as ds:
local_ds.add_creds_key("ENV")
local_ds.populate_creds("ENV", from_environment=True)
ds.create_tensor("videos", htype="link[video]", sample_compression="mp4")
ds.videos.append(
deeplake.link(
Expand Down
15 changes: 14 additions & 1 deletion deeplake/core/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3563,7 +3563,12 @@ def add_creds_key(self, creds_key: str, managed: bool = False):
self.link_creds.add_creds_key(creds_key)
save_link_creds(self.link_creds, self.storage)

def populate_creds(self, creds_key: str, creds: dict):
def populate_creds(
self,
creds_key: str,
creds: Optional[dict] = None,
from_environment: bool = False,
):
"""Populates the creds key added in add_creds_key with the given creds. These creds are used to fetch the external data.
This needs to be done every time the dataset is reloaded for datasets that contain links to external data.
Expand All @@ -3575,8 +3580,16 @@ def populate_creds(self, creds_key: str, creds: dict):
>>> ds.add_creds_key("my_s3_key")
>>> # populate the creds
>>> ds.populate_creds("my_s3_key", {"aws_access_key_id": "my_access_key", "aws_secret_access_key": "my_secret_key"})
>>> # or
>>> ds.populate_creds("my_s3_key", from_environment=True)
"""
if creds and from_environment:
raise ValueError(
"Only one of creds or from_environment can be provided. Both cannot be provided at the same time."
)
if from_environment:
creds = {}
self.link_creds.populate_creds(creds_key, creds)

def update_creds_key(self, old_creds_key: str, new_creds_key: str):
Expand Down
27 changes: 17 additions & 10 deletions deeplake/core/link_creds.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from deeplake.core.storage.deeplake_memory_object import DeepLakeMemoryObject
from deeplake.core.storage.provider import StorageProvider
from deeplake.core.storage.s3 import S3Provider
from deeplake.util.exceptions import MissingCredsError, MissingManagedCredsError
from deeplake.util.token import expires_in_to_expires_at, is_expired_token
from deeplake.client.log import logger

Expand All @@ -23,14 +24,19 @@ def __init__(self):
self.org_id = None

def get_creds(self, key: Optional[str]):
if key in {"ENV", None}:
if key is None:
return {}
if key not in self.creds_keys:
raise KeyError(f"Creds key {key} does not exist")
raise MissingCredsError(f"Creds key {key} does not exist")
if key not in self.creds_dict:
raise ValueError(
f"Creds key {key} hasn't been populated. Populate it using ds.populate_creds()"
)
if key not in self.managed_creds_keys:
raise MissingCredsError(
f"Creds key {key} hasn't been populated. Populate it using ds.populate_creds()"
)
else:
raise MissingManagedCredsError(
f"Managed creds key {key} hasn't been fetched."
)
if (
self.client is not None
and key in self.managed_creds_keys
Expand Down Expand Up @@ -60,7 +66,7 @@ def get_default_provider(self, provider_type: str):

def get_storage_provider(self, key: Optional[str], provider_type: str):
assert provider_type in {"s3", "gcs"}
if key in {"ENV", None}:
if key is None:
return self.get_default_provider(provider_type)

provider: StorageProvider
Expand Down Expand Up @@ -130,7 +136,7 @@ def replace_creds(self, old_creds_key: str, new_creds_key: str):

def populate_creds(self, creds_key: str, creds):
if creds_key not in self.creds_keys:
raise KeyError(f"Creds key {creds_key} does not exist")
raise MissingCredsError(f"Creds key {creds_key} does not exist")
expires_in_to_expires_at(creds)
self.creds_dict[creds_key] = creds
self.storage_providers.pop(creds_key, None)
Expand Down Expand Up @@ -158,19 +164,20 @@ def frombuffer(cls, buffer: bytes):
obj.creds_mapping = {k: i + 1 for i, k in enumerate(obj.creds_keys)}
obj.managed_creds_keys = set(d["managed_creds_keys"])
obj.used_creds_keys = set(d["used_creds_keys"])
if "ENV" in obj.used_creds_keys:
obj.creds_keys = ["ENV"] + obj.creds_keys
obj.creds_mapping["ENV"] = 0
obj.is_dirty = False
return obj

def get_encoding(self, key: Optional[str] = None, path: Optional[str] = None):
if key == "ENV":
return 0
if key is None:
if path and path.startswith(ALL_CLOUD_PREFIXES):
raise ValueError("Creds key must always be specified for cloud storage")
return 0

if key not in self.creds_keys:
raise ValueError(f"Creds key {key} does not exist")
raise MissingCredsError(f"Creds key {key} does not exist")
return self.creds_mapping[key]

def get_creds_key(self, encoding):
Expand Down
15 changes: 9 additions & 6 deletions deeplake/core/linked_sample.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
from typing import Optional
from deeplake.util.exceptions import MissingCredsError
from deeplake.util.path import get_path_type
import deeplake
import numpy as np

from deeplake.util.creds import convert_creds_key


class LinkedSample:
"""Represents a sample that is initialized using external links. See :meth:`deeplake.link`."""

def __init__(self, path: str, creds_key: Optional[str] = None):
self.path = path
self.creds_key = convert_creds_key(creds_key, path)
self.creds_key = creds_key

@property
def dtype(self) -> str:
return np.array("").dtype.str


def read_linked_sample(
sample_path: str, sample_creds_key: str, link_creds, verify: bool
sample_path: str, sample_creds_key: Optional[str], link_creds, verify: bool
):
provider_type = get_path_type(sample_path)
if provider_type == "local":
Expand All @@ -38,9 +37,13 @@ def retry_refresh_managed_creds(f):
def wrapper(linked_creds, sample_creds_key, *args, **kwargs):
try:
return f(linked_creds, sample_creds_key, *args, **kwargs)
except MissingCredsError:
raise
except Exception as e:
linked_creds.populate_all_managed_creds()
return f(linked_creds, sample_creds_key, *args, **kwargs)
if linked_creds.client is not None:
linked_creds.populate_all_managed_creds()
return f(linked_creds, sample_creds_key, *args, **kwargs)
raise e

return wrapper

Expand Down
3 changes: 1 addition & 2 deletions deeplake/core/linked_tiled_sample.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Optional
import numpy as np
from deeplake.core.tensor_link import read_linked_sample
from deeplake.util.creds import convert_creds_key
from deeplake.util.path import get_path_type


Expand All @@ -19,7 +18,7 @@ def __init__(
path_types = {get_path_type(path) for path in path_array.flat}
if len(path_types) > 1:
raise ValueError("Path array contains paths in different locations.")
self.creds_key = convert_creds_key(creds_key, self.path)
self.creds_key = creds_key
self._tile_shape = None
self._shape = None

Expand Down
10 changes: 0 additions & 10 deletions deeplake/util/creds.py

This file was deleted.

8 changes: 8 additions & 0 deletions deeplake/util/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,3 +864,11 @@ def __init__(self, extension, htype=""):

class DatasetCorruptError(Exception):
pass


class MissingCredsError(Exception):
pass


class MissingManagedCredsError(Exception):
pass
2 changes: 2 additions & 0 deletions deeplake/visualizer/tests/test_playback.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test_video_playback(local_ds_generator, video_paths):
def test_linked_video_playback(local_ds_generator, gcs_path):
with local_ds_generator() as ds:
ds.create_tensor("video_links", htype="link[video]", sample_compression="mp4")
ds.add_creds_key("ENV")
ds.populate_creds("ENV", from_environment=True)
ds.video_links.append(
deeplake.link(
"http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerJoyrides.mp4",
Expand Down
19 changes: 10 additions & 9 deletions deeplake/visualizer/visualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,17 @@ def access_creds(path: str):
paths = path.split("/", 1)
id = paths[0]
creds_key = paths[1]
if len(creds_key) == 0:
p = S3Provider("")
return {
"aws_access_key_id": p.aws_access_key_id,
"aws_secret_access_key": p.aws_secret_access_key,
"aws_session_token": p.aws_session_token,
"aws_region": p.aws_region,
}
if creds_key in visualizer.get_link_creds(id).creds_keys:
return visualizer.get_link_creds(id).get_creds(creds_key)
creds = visualizer.get_link_creds(id).get_creds(creds_key)
if len(creds) == 0:
p = S3Provider("")
creds = {
"aws_access_key_id": p.aws_access_key_id,
"aws_secret_access_key": p.aws_secret_access_key,
"aws_session_token": p.aws_session_token,
"aws_region": p.aws_region,
}
return creds

return Response("", 404)

Expand Down
2 changes: 0 additions & 2 deletions docs/source/Htypes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,6 @@ use them just by adding the keys to your dataset. For example if you have manage
>>> ds.img.append(deeplake.link("https://picsum.photos/200/300")) # http path doesn’t need creds
>>> ds.img.append(deeplake.link("./path/to/cat.jpeg")) # local path doesn’t need creds
>>> ds.img.append(deeplake.link("s3://abc/def.jpeg")) # this will throw an exception as cloud paths always need creds_key
>>> ds.img.append(deeplake.link("s3://abc/def.jpeg", creds_key="ENV")) # this will use creds from environment

:bluebold:`Accessing the data`

>>> for i in range(5):
Expand Down

0 comments on commit 412fe6f

Please sign in to comment.