From ce723eeb6d46e05066a3dca11ba452760c14874b Mon Sep 17 00:00:00 2001 From: Edward Grigoryan Date: Fri, 8 Jan 2021 12:14:15 +0400 Subject: [PATCH] Removed lmdb cache and test --- hub/store/cache.py | 213 ---------------------------------- hub/store/store.py | 1 - hub/store/tests/test_cache.py | 47 -------- 3 files changed, 261 deletions(-) delete mode 100644 hub/store/cache.py delete mode 100644 hub/store/tests/test_cache.py diff --git a/hub/store/cache.py b/hub/store/cache.py deleted file mode 100644 index ca87eb1820..0000000000 --- a/hub/store/cache.py +++ /dev/null @@ -1,213 +0,0 @@ -import os -import zarr -import json -from hub.log import logger - - -class CacheStore(zarr.LMDBStore): - def __init__( - self, - path, - buffers=True, - namespace="namespace", - lock=True, - cache_reset=True, - **kwargs, - ): - """ - Extends zarr.LMDB store to support Ordered Dictionary map - - Parameters - ---------- - path : string - Location of database file. - buffers : bool, optional - If True (default) use support for buffers, which should increase performance by - reducing memory copies. - cache_reset: bool, optional - cleans up the cach - lock: bool, optional - argument for lmdb cache to for multiprocessing - namespace: str, optional - For creating namespaces for keys - **kwargs - Keyword arguments passed through to the `lmdb.open` function. - - """ - kwargs = {} - super(CacheStore, self).__init__(path, buffers=buffers, lock=lock, **kwargs) - self.namespace = namespace - if cache_reset: - self.clear() - self._order = ["_order"] - - @property - def _order(self): - try: - order = json.loads(super().__getitem__("_order")) - return order - except KeyError: - self._order = ["_order"] - return [] - - @_order.setter - def _order(self, x): - super().__setitem__("_order", json.dumps(x).encode()) - - def _key_format(self, key): - """ Zarr sometimes inserts tuple, but lmbd can't have tuple key (".zgroup", "z.group") """ - if isinstance(key, tuple): - key = str(key[0]) - return key - - def move_to_end(self, key): - """Move key to the end""" - order = self._order - ind = order.index(key) - el = order.pop(ind) - order.append(el) - key = self._key_format(key) - self._order = order - - def popitem(self, last=False): - """Remove the first value from the cache, as this will be the least recently""" - order = self._order - key = order.pop(0) - self._ordere = order - return key, self.pop(key, key_removed=True) - - def pop(self, key, key_removed=False): - """Remove an element from the cache""" - key = self._key_format(key) - - if not key_removed: - order = self._order - if key in order: - order.remove(key) - self._order = order - - el = self[key] - del self[key] - return el - - def __setitem__(self, key, value): - """On each new add, remember the order""" - order = self._order - key = self._key_format(key) - if key in order: - order.remove(key) - order.append(key) - self._order = order - super().__setitem__(key, value) - - def __getitem__(self, key): - """On each new add, remember the order""" - key = self._key_format(key) - el = super().__getitem__(key) - return el - - def __delitem__(self, key): - """ Delete item """ - key = self._key_format(key) - order = self._order - if key in order: - order.remove(key) - self._order = order - super().__delitem__(key) - - def safety_wrapper(self, gen): - while True: - try: - yield next(gen) - except StopIteration: - break - except Exception as e: - logger.debug(e) - - def clear(self): - """ Clean up the cache """ - for k in self.safety_wrapper(self.keys()): - if k != "_order" and k != "_values_cache": - try: - del self[k] - except Exception as e: - logger.info(f"CacheStore: {k} could not be deleted: {e}") - - -class Cache(zarr.LRUStoreCache): - def __init__(self, store, max_size, path="~/.activeloop/cache", lock=True): - """ - Extends zarr.LRUStoreCache with LMBD Cache that could be shared across - - Storage class that implements a least-recently-used (LRU) cache layer over - some other store. Intended primarily for use with stores that can be slow to - access, e.g., remote stores that require network communication to store and - retrieve data. - - Parameters - ---------- - store : MutableMapping - The store containing the actual data to be cached. - max_size : int - The maximum size that the cache may grow to, in number of bytes. Provide `None` - if you would like the cache to have unlimited size. - lock: bool, optional - Lock the cache for avoiding multiprocessing issues - """ - super(Cache, self).__init__(store, max_size) - self.path = os.path.expanduser(path) - os.makedirs(self.path, exist_ok=True) - self._values_cache = CacheStore( - self.path, buffers=False, lock=lock, namespace=store.root - ) - self.cache_key = "_current_size" - self.root = store.root - - @property - def _current_size(self): - """ get size counter from the cache """ - if "_values_cache" not in dir(self) or self.cache_key not in self._values_cache: - return 0 - return int.from_bytes( - self._values_cache[self.cache_key], byteorder="big", signed=True - ) - - @_current_size.setter - def _current_size(self, x): - """ set size counter to the cache """ - if "_values_cache" not in dir(self): - return - self._values_cache[self.cache_key] = int.to_bytes( - x, length=32, byteorder="big", signed=True - ) - - def flush(self): - """ flushes the cache db """ - self._values_cache.flush() - - def commit(self): - """ Deprecated alias to flush()""" - self.flush() - - def close(self): - """ closes the cache db """ - self._values_cache.close() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - self.close() - - def __setitem__(self, key, value): - """On each new add, remember the order""" - super().__setitem__(key, value) - - def __getitem__(self, key): - """On each new add, remember the order""" - el = super().__getitem__(key) - return el - - def __delitem__(self, key): - """ Delete item """ - super().__delitem__(key) diff --git a/hub/store/store.py b/hub/store/store.py index 942dfcb8bb..85ed86a9bc 100644 --- a/hub/store/store.py +++ b/hub/store/store.py @@ -10,7 +10,6 @@ import gcsfs import zarr -from hub.store.cache import Cache from hub.store.lru_cache import LRUCache from hub.client.hub_control import HubControlClient from hub.store.azure_fs import AzureBlobFileSystem diff --git a/hub/store/tests/test_cache.py b/hub/store/tests/test_cache.py deleted file mode 100644 index 0a7d0dbae2..0000000000 --- a/hub/store/tests/test_cache.py +++ /dev/null @@ -1,47 +0,0 @@ -from hub.store.cache import Cache -import zarr -import time -import posixpath - - -class SlowStore(zarr.MemoryStore): - def __init__(self, **kwargs): - super(SlowStore, self).__init__(**kwargs) - - def __getitem__(self, key, **kwargs): - time.sleep(0.001) - return super(SlowStore, self).__getitem__(key, **kwargs) - - def __setitem__(self, key, value, **kwargs): - super(SlowStore, self).__setitem__(key, value, **kwargs) - - -def test_cache(): - store = SlowStore() - store = Cache(store, max_size=1000000) - store.flush() - for i in range(10): - z = zarr.zeros( - (1000, 1000), - chunks=(100, 100), - path=posixpath.realpath(f"./data/test/test_cache/first{i}"), - store=store, - overwrite=True, - ) - - z[...] = i - store.invalidate() - - t1 = time.time() - z[...] - t2 = time.time() - z[...] - t3 = time.time() - assert z[0, 0] == i - # print(t2 - t1, t3 - t2) - assert t2 - t1 > t3 - t2 - store.close() - - -if __name__ == "__main__": - test_cache()