Skip to content

Commit

Permalink
minor refactors and docstring updates
Browse files Browse the repository at this point in the history
  • Loading branch information
AbhinavTuli committed Jun 2, 2021
1 parent a7b6f9e commit d5fbd56
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 28 deletions.
4 changes: 2 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ def _get_s3_provider(request):
return _get_storage_provider(request, S3)


def _get_dataset(provider: StorageProvider):
return Dataset(storage=provider)
def _get_dataset(storage: StorageProvider):
return Dataset(storage=storage)


@pytest.fixture
Expand Down
25 changes: 18 additions & 7 deletions hub/api/dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from hub.util.cache_chain import generate_chain
from hub.constants import (
MB,
META_FILENAME,
DEFAULT_MEMORY_CACHE_SIZE,
DEFAULT_LOCAL_CACHE_SIZE,
Expand Down Expand Up @@ -57,8 +58,10 @@ def __init__(
"Dataset should not be constructed with both storage and path. Ignoring path and using storage."
)
base_storage = storage or storage_provider_from_path(path)
memory_cache_size_bytes = memory_cache_size * MB
local_cache_size_bytes = local_cache_size * MB
self.storage = generate_chain(
base_storage, memory_cache_size, local_cache_size, path
base_storage, memory_cache_size_bytes, local_cache_size_bytes, path
)
self.tensors: Dict[str, Tensor] = {}
if META_FILENAME in self.storage:
Expand Down Expand Up @@ -111,19 +114,27 @@ def __iter__(self):
yield self[i]

def flush(self):
"""Necessary operation after writes if a cache is being used. Clears the cache and sends the data to underlying storage"""
"""Necessary operation after writes if caches are being used.
Writes all the dirty data from the cache layers (if any) to the underlying storage.
Here dirty data corresponds to data that has been changed/assigned but hasn't yet been sent to the underlying storage.
"""
self.storage.flush()

def clear_cache(self):
"""Flushes the contents of the cache and deletes contents of all the layers of it.
This doesn't clear data from the actual storage.
This is useful if you have multiple dataset with memory caches open, taking up too much RAM.
Also useful when storage cache is no longer needed for certain datasets and is taking up storage space.
"""Flushes (see Dataset.flush documentation) the contents of the cache layers (if any) and then deletes contents of all the layers of it.
This doesn't delete data from the actual storage.
This is useful if you have multiple datasets with memory caches open, taking up too much RAM.
Also useful when local cache is no longer needed for certain datasets and is taking up storage space.
"""
self.flush()
if self.storage.hasattr("clear_cache"):
self.storage.clear_cache()

def delete(self):
"""Deletes the entire dataset from the cache layers (if any) and the underlying storage.
This is an IRREVERSIBLE operation. Data once deleted can not be recovered.
"""
self.storage.clear()

@staticmethod
def from_path(path: str):
"""Create a local hub dataset from unstructured data.
Expand Down
3 changes: 0 additions & 3 deletions hub/api/tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,3 @@ def numpy(self):
A numpy array containing the data represented by this tensor.
"""
return read_array(self.key, self.storage, self.slice)

def flush(self):
self.storage.flush()
20 changes: 15 additions & 5 deletions hub/core/storage/lru_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ def flush(self):
self.next_storage[key] = self.cache_storage[key]
self.dirty_keys.clear()

self.next_storage.flush()

def __getitem__(self, path: str):
"""If item is in cache_storage, retrieves from there and returns.
If item isn't in cache_storage, retrieves from next storage, stores in cache_storage (if possible) and returns.
Expand Down Expand Up @@ -108,14 +106,26 @@ def __delitem__(self, path: str):
raise

def clear_cache(self):
"""Delete the contents of all layers of the cache but not from the final storage."""
"""Flushes the content of the cache and and then deletes contents of all the layers of it.
This doesn't delete data from the actual storage.
"""
self.flush()
self.cache_used = 0
self.lru_sizes.clear()
self.dirty_keys.clear()
self.cache_storage.clear()

if self.next_storage.hasattr("clear_cache"):
self.next_storage.clear_cache()

def clear(self):
"""Deletes all the data from all the layers of the cache and the final storage."""
self.actual_storage.clear()
"""Deletes ALL the data from all the layers of the cache and the actual storage.
This is an IRREVERSIBLE operation. Data once deleted can not be recovered.
"""
self.cache_used = 0
self.lru_sizes.clear()
self.dirty_keys.clear()
self.cache_storage.clear()
self.next_storage.clear()

def __len__(self):
Expand Down
2 changes: 1 addition & 1 deletion hub/core/storage/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class MemoryProvider(StorageProvider):
"""Provider class for using the memory."""

def __init__(self, root):
def __init__(self, root=""):
self.dict = {}

def __getitem__(
Expand Down
18 changes: 8 additions & 10 deletions hub/util/cache_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def get_cache_chain(storage_list: List[StorageProvider], size_list: List[int]):
storage_list (List[StorageProvider]): The list of storage providers needed in a cache.
Should have at least one provider in the list.
If only one provider, LRU cache isn't created and the provider is returned.
size_list (List[int]): The list of sizes of the caches.
size_list (List[int]): The list of sizes of the caches in bytes.
Should have length one less than storage_list and specifies the size of the cache for all providers except the last one.
The last one is the primary storage and is assumed to have infinite space.
Expand All @@ -31,10 +31,8 @@ def get_cache_chain(storage_list: List[StorageProvider], size_list: List[int]):
return storage_list[0]
if len(size_list) + 1 != len(storage_list):
raise ProviderSizeListMismatch
storage_list.reverse()
size_list.reverse()
store = storage_list[0]
for size, cache in zip(size_list, storage_list[1:]):
store = storage_list[-1]
    # zip returns a one-shot iterator, which reversed() cannot consume; materialize it first.
    for size, cache in reversed(list(zip(size_list, storage_list[:-1]))):
store = LRUCache(cache, store, size)
return store

Expand All @@ -49,9 +47,9 @@ def generate_chain(
Args:
base_storage (StorageProvider): The underlying actual storage of the Dataset.
memory_cache_size (int): The size of the memory cache to be used in MB.
local_cache_size (int): The size of the local filesystem cache to be used in MB.
path (str): The location of the dataset.
memory_cache_size (int): The size of the memory cache to be used in bytes.
local_cache_size (int): The size of the local filesystem cache to be used in bytes.
path (str): The location of the dataset. If not None, it is used to figure out the folder name where the local cache is stored.
Returns:
StorageProvider: Returns a cache containing the base_storage along with memory and local cache if a positive size has been specified for them.
Expand All @@ -64,9 +62,9 @@ def generate_chain(
size_list: List[int] = []
if memory_cache_size > 0:
storage_list.append(MemoryProvider(f"cache/{dataset_id}"))
size_list.append(memory_cache_size * MB)
size_list.append(memory_cache_size)
if local_cache_size > 0:
storage_list.append(LocalProvider(f"~/.activeloop/cache/{dataset_id}"))
size_list.append(local_cache_size * MB)
size_list.append(local_cache_size)
storage_list.append(base_storage)
return get_cache_chain(storage_list, size_list)

0 comments on commit d5fbd56

Please sign in to comment.