From 467ae134757aed5782a98df818295cd1be19f548 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 14:04:28 +0400
Subject: [PATCH 01/83] Add GCS storage provider

---
 conftest.py                   |  3 ++
 hub/constants.py              |  2 +
 hub/core/storage/gcs.py       | 76 +++++++++++++++++++++++++++++++++++
 hub/tests/dataset_fixtures.py | 17 +++++++-
 hub/tests/path_fixtures.py    | 26 ++++++++++++
 hub/tests/storage_fixtures.py | 10 ++++-
 hub/util/keys.py              |  2 +-
 hub/util/storage.py           |  4 ++
 8 files changed, 135 insertions(+), 5 deletions(-)
 create mode 100644 hub/core/storage/gcs.py

diff --git a/conftest.py b/conftest.py
index dbcf0c9278..54b76f3367 100644
--- a/conftest.py
+++ b/conftest.py
@@ -28,6 +28,9 @@ def pytest_addoption(parser):
         LOCAL_OPT, action="store_true", help="Local tests will run if enabled."
     )
     parser.addoption(S3_OPT, action="store_true", help="S3 tests will run if enabled.")
+    parser.addoption(
+        GCS_OPT, action="store_true", help="GCS tests will run if enabled."
+    )
     parser.addoption(
         HUB_CLOUD_OPT, action="store_true", help="Hub cloud tests will run if enabled."
    )
diff --git a/hub/constants.py b/hub/constants.py
index 91c9ddbdca..ceb1ab2b9c 100644
--- a/hub/constants.py
+++ b/hub/constants.py
@@ -88,6 +88,7 @@
 PYTEST_MEMORY_PROVIDER_BASE_ROOT = "mem://hub_pytest"
 PYTEST_LOCAL_PROVIDER_BASE_ROOT = "/tmp/hub_pytest/"  # TODO: may fail for windows
 PYTEST_S3_PROVIDER_BASE_ROOT = "s3://hub-2.0-tests/"
+PYTEST_GCS_PROVIDER_BASE_ROOT = "gcs://snark-test/"
 PYTEST_HUB_CLOUD_PROVIDER_BASE_ROOT = f"hub://{HUB_CLOUD_DEV_USERNAME}/"
 
 # environment variables
@@ -99,6 +100,7 @@
 MEMORY_OPT = "--memory-skip"
 LOCAL_OPT = "--local"
 S3_OPT = "--s3"
+GCS_OPT = "--gcs"
 HUB_CLOUD_OPT = "--hub-cloud"
 S3_PATH_OPT = "--s3-path"
 KEEP_STORAGE_OPT = "--keep-storage"
diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
new file mode 100644
index 0000000000..2cb63c686d
--- /dev/null
+++ b/hub/core/storage/gcs.py
@@ -0,0 +1,76 @@
+import posixpath
+from typing import Optional
+from hub.core.storage.provider import StorageProvider
+import gcsfs
+
+
+class GCSProvider(StorageProvider):
+    """Provider class for using GC storage."""
+
+    def __init__(
+        self,
+        root: str,
+        token: Optional[str] = None,
+    ):
+        """Initializes the GCSProvider
+
+        Example:
+            s3_provider = GCSProvider("snark-test/benchmarks")
+
+        Args:
+            root (str): The root of the provider. All read/write request keys will be appended to root.
+            token (str, optional): GCP token, used for fetching credentials for storage.
+        """
+        self.root = root
+        self.token: Optional[str] = token
+        self.missing_exceptions = (
+            FileNotFoundError,
+            IsADirectoryError,
+            NotADirectoryError,
+        )
+        self.initialize_provider()
+
+    def initialize_provider(self):
+        self._set_bucket_and_path()
+        self.fs = gcsfs.GCSFileSystem(token=self.token)
+
+    def _set_bucket_and_path(self):
+        root = self.root.replace("gcp://", "").replace("gcs://", "")
+        self.bucket = root.split("/")[0]
+        self.path = root
+        if not self.path.endswith("/"):
+            self.path += "/"
+
+    def clear(self):
+        """Remove all keys below root - empties out mapping"""
+        self.fs.delete(self.path, True)
+
+    def __getitem__(self, key):
+        """Retrieve data"""
+        try:
+            with self.fs.open(posixpath.join(self.path, key), "rb") as f:
+                return f.read()
+        except self.missing_exceptions:
+            raise KeyError(key)
+
+    def __setitem__(self, key, value):
+        """Store value in key"""
+        with self.fs.open(posixpath.join(self.path, key), "wb") as f:
+            f.write(value)
+
+    def __iter__(self):
+        """iterating over the structure"""
+        yield from (x for x in self.fs.find(self.root))
+
+    def __len__(self):
+        """returns length of the structure"""
+        return len(self.fs.find(self.root))
+
+    def __delitem__(self, key):
+        """Remove key"""
+        self.fs.rm(posixpath.join(self.path, key))
+
+    def __contains__(self, key):
+        """Does key exist in mapping?"""
+        path = posixpath.join(self.path, key)
+        return self.fs.exists(path)
diff --git a/hub/tests/dataset_fixtures.py b/hub/tests/dataset_fixtures.py
index 9a23acb8be..31c420bf84 100644
--- a/hub/tests/dataset_fixtures.py
+++ b/hub/tests/dataset_fixtures.py
@@ -3,13 +3,13 @@
 enabled_datasets = pytest.mark.parametrize(
     "ds",
-    ["memory_ds", "local_ds", "s3_ds"],
+    ["memory_ds", "local_ds", "s3_ds", "gcs_ds"],
     indirect=True,
 )
 
 enabled_persistent_dataset_generators = pytest.mark.parametrize(
     "ds_generator",
-    ["local_ds_generator", "s3_ds_generator"],
+    ["local_ds_generator", "s3_ds_generator", "gcs_ds_generator"],
     indirect=True,
 )
@@ -45,6 +45,19 @@ def generate_s3_ds():
     return generate_s3_ds
 
 
+@pytest.fixture
+def gcs_ds(gcs_ds_generator):
+    return gcs_ds_generator()
+
+
+@pytest.fixture
+def gcs_ds_generator(gcs_path):
+    def generate_gcs_ds():
+        return hub.dataset(gcs_path)
+
+    return generate_gcs_ds
+
+
 @pytest.fixture
 def hub_cloud_ds(hub_cloud_ds_generator):
     return hub_cloud_ds_generator()
diff --git a/hub/tests/path_fixtures.py b/hub/tests/path_fixtures.py
index 8c85c60eef..ac420e420c 100644
--- a/hub/tests/path_fixtures.py
+++ b/hub/tests/path_fixtures.py
@@ -1,3 +1,4 @@
+from hub.core.storage.gcs import GCSProvider
 from hub.util.storage import storage_provider_from_hub_path
 from hub.core.storage.s3 import S3Provider
 from hub.core.storage.local import LocalProvider
@@ -8,10 +9,12 @@
     KEEP_STORAGE_OPT,
     LOCAL_OPT,
     MEMORY_OPT,
+    PYTEST_GCS_PROVIDER_BASE_ROOT,
     PYTEST_HUB_CLOUD_PROVIDER_BASE_ROOT,
     PYTEST_LOCAL_PROVIDER_BASE_ROOT,
     PYTEST_MEMORY_PROVIDER_BASE_ROOT,
     S3_OPT,
+    GCS_OPT,
 )
 import posixpath
 from hub.tests.common import (
@@ -25,6 +28,7 @@
 MEMORY = "memory"
 LOCAL = "local"
 S3 = "s3"
+GCS = "gcs"
 HUB_CLOUD = "hub_cloud"
 
 
@@ -49,6 +53,12 @@ def _get_path_composition_configs(request):
             "is_id_prefix": True,
             "use_underscores": False,
         },
+        GCS: {
+            "base_root": PYTEST_GCS_PROVIDER_BASE_ROOT,
+            "use_id": True,
+            "is_id_prefix": True,
+            "use_underscores": False,
+        },
         HUB_CLOUD: {
             "base_root": PYTEST_HUB_CLOUD_PROVIDER_BASE_ROOT,
             "use_id": True,
@@ -125,6 +135,22 @@ def s3_path(request):
         S3Provider(path).clear()
 
 
+@pytest.fixture
+def gcs_path(request):
+    if not is_opt_true(request, GCS_OPT):
+        pytest.skip()
+        return
+
+    path = _get_storage_path(request, GCS)
+    GCSProvider(path).clear()
+
+    yield path
+
+    # clear storage unless flagged otherwise
+    if not is_opt_true(request, KEEP_STORAGE_OPT):
+        S3Provider(path).clear()
+
+
 @pytest.fixture
 def hub_cloud_path(request, hub_cloud_dev_token):
     if not is_opt_true(request, HUB_CLOUD_OPT):
diff --git a/hub/tests/storage_fixtures.py b/hub/tests/storage_fixtures.py
index b363165cc2..8b47ff1eac 100644
--- a/hub/tests/storage_fixtures.py
+++ b/hub/tests/storage_fixtures.py
@@ -1,3 +1,4 @@
+from hub.core.storage.gcs import GCSProvider
 from hub.util.storage import storage_provider_from_hub_path
 from hub.core.storage.s3 import S3Provider
 from hub.core.storage.local import LocalProvider
@@ -7,13 +8,13 @@
 enabled_storages = pytest.mark.parametrize(
     "storage",
-    ["memory_storage", "local_storage", "s3_storage"],
+    ["memory_storage", "local_storage", "s3_storage", "gcs_storage"],
     indirect=True,
 )
 
 enabled_persistent_storages = pytest.mark.parametrize(
     "storage",
-    ["local_storage", "s3_storage"],
+    ["local_storage", "s3_storage", "gcs_storage"],
     indirect=True,
 )
@@ -33,6 +34,11 @@ def s3_storage(s3_path):
     return S3Provider(s3_path)
 
 
+@pytest.fixture
+def gcs_storage(gcs_path):
+    return GCSProvider(gcs_path)
+
+
 @pytest.fixture
 def hub_cloud_storage(hub_cloud_path, hub_cloud_dev_token):
     return storage_provider_from_hub_path(hub_cloud_path, token=hub_cloud_dev_token)
diff --git a/hub/util/keys.py b/hub/util/keys.py
index 0592a309c3..9c076a89b3 100644
--- a/hub/util/keys.py
+++ b/hub/util/keys.py
@@ -42,7 +42,7 @@ def dataset_exists(storage: StorageProvider) -> bool:
     try:
         storage[get_dataset_meta_key()]
         return True
-    except KeyError:
+    except (KeyError, FileNotFoundError):
         return False
diff --git a/hub/util/storage.py b/hub/util/storage.py
index cb106c5ae0..79657be300 100644
--- a/hub/util/storage.py
+++ b/hub/util/storage.py
@@ -1,3 +1,4 @@
+from hub.core.storage.gcs import GCSProvider
 from hub.util.cache_chain import generate_chain
 from hub.constants import MB
 from hub.util.tag import check_hub_path
@@ -25,6 +26,7 @@ def storage_provider_from_path(
     Returns:
         If given a path starting with s3:// returns the S3Provider.
+        If given a path starting with gcp:// orreturns the GCPProvider.
         If given a path starting with mem:// returns the MemoryProvider.
         If given a path starting with hub:// returns the underlying cloud Provider.
         If given a valid local path, returns the LocalProvider.
@@ -43,6 +45,8 @@
         storage: StorageProvider = S3Provider(
             path, key, secret, session_token, endpoint_url, region, token=token
         )
+    elif path.startswith("gcp://") or path.startswith("gcs://"):
+        return GCSProvider(path, creds)
     elif path.startswith("mem://"):
         storage = MemoryProvider(path)
     elif path.startswith("hub://"):
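The class above implements Python's mapping protocol over a GCS prefix, so dataset keys behave like dictionary entries backed by gcsfs. A minimal sketch of that contract, assuming a reachable bucket and whatever credentials gcsfs resolves by default (bucket and key names here are illustrative):

    from hub.core.storage.gcs import GCSProvider

    gcs = GCSProvider("gcs://my-bucket/my_dataset")  # illustrative bucket
    gcs["meta.json"] = b'{"version": 1}'  # __setitem__ writes bytes under the root prefix
    data = gcs["meta.json"]               # __getitem__ reads them back
    assert "meta.json" in gcs             # __contains__ checks object existence
    del gcs["meta.json"]                  # __delitem__ removes the object

Missing keys surface as KeyError rather than gcsfs's FileNotFoundError, which is the contract the upstream mapping consumers expect.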
From b1082e5ae40f6841017d076d20bb68f32848665b Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 14:08:11 +0400
Subject: [PATCH 02/83] Fix docstring

---
 hub/util/storage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hub/util/storage.py b/hub/util/storage.py
index 79657be300..218c7d7c3c 100644
--- a/hub/util/storage.py
+++ b/hub/util/storage.py
@@ -26,7 +26,7 @@ def storage_provider_from_path(
 
     Returns:
         If given a path starting with s3:// returns the S3Provider.
-        If given a path starting with gcp:// orreturns the GCPProvider.
+        If given a path starting with gcp:// or gcs:// returns the GCPProvider.
         If given a path starting with mem:// returns the MemoryProvider.
         If given a path starting with hub:// returns the underlying cloud Provider.
         If given a valid local path, returns the LocalProvider.

From 823057fa22d6169a7b15389a1188b015f586e07d Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 14:35:18 +0400
Subject: [PATCH 03/83] Add gcsfs to requirements

---
 hub/requirements/requirements.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hub/requirements/requirements.txt b/hub/requirements/requirements.txt
index 3acb88a2c9..0aeded270c 100644
--- a/hub/requirements/requirements.txt
+++ b/hub/requirements/requirements.txt
@@ -3,6 +3,7 @@ pytest~=6.2.3
 click~=7.1.2
 boto3~=1.17.43
 botocore~=1.20.78
+gcsfs~=0.6.2
 numcodecs~=0.7.3
 Pillow~=8.2.0
 lz4~=3.1.3

From 5a94c0fdb9f7198396943fd491b172b46c260e07 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 14:39:54 +0400
Subject: [PATCH 04/83] Move requirement to common

---
 hub/requirements/common.txt       | 1 +
 hub/requirements/requirements.txt | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt
index e8f7dd16fb..5d297c5443 100644
--- a/hub/requirements/common.txt
+++ b/hub/requirements/common.txt
@@ -2,6 +2,7 @@ numpy
 pillow==8.2.0
 boto3
 boto3-stubs[essential]
+gcsfs~=0.6.2
 pathos
 humbug>=0.2.6
 types-requests
diff --git a/hub/requirements/requirements.txt b/hub/requirements/requirements.txt
index 0aeded270c..3acb88a2c9 100644
--- a/hub/requirements/requirements.txt
+++ b/hub/requirements/requirements.txt
@@ -3,7 +3,6 @@ pytest~=6.2.3
 click~=7.1.2
 boto3~=1.17.43
 botocore~=1.20.78
-gcsfs~=0.6.2
 numcodecs~=0.7.3
 Pillow~=8.2.0
 lz4~=3.1.3

From 2d63fabbc8a8fa5ce9afc7c15062546cbb78cb1f Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 14:48:14 +0400
Subject: [PATCH 05/83] Change circleci config

---
 .circleci/config.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index a5a6602cd9..c938bb0a44 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -176,7 +176,7 @@ commands:
           command: |
             $Env:GOOGLE_APPLICATION_CREDENTIALS = $Env:CI_GCS_PATH
             setx /m GOOGLE_APPLICATION_CREDENTIALS "$Env:GOOGLE_APPLICATION_CREDENTIALS"
-            python3 -m pytest --cov-report=xml --cov=./ --local --s3 --hub-cloud --kaggle --ignore-glob=buH/*
+            python3 -m pytest --cov-report=xml --cov=./ --local --s3 --gcs --hub-cloud --kaggle --ignore-glob=buH/*
     - when:
         condition: << parameters.unix-like >>
         steps:
@@ -186,7 +186,7 @@ commands:
             BUGGER_OFF: "true"
           command: |
             export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json
-            python3 -m pytest --cov-report=xml --cov=./ --local --s3 --hub-cloud --kaggle --ignore-glob=buH/*
+            python3 -m pytest --cov-report=xml --cov=./ --local --s3 --gcs --hub-cloud --kaggle --ignore-glob=buH/*
 
   parallelism: 10
   run-backwards-compatibility-tests:
     steps:
       - run:

From 7b2d1f0ec2ef8a5a55ed2c0b09135ead3f6f9846 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 14:52:52 +0400
Subject: [PATCH 06/83] Fix lint

---
 hub/core/storage/gcs.py | 2 +-
 hub/util/storage.py     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 2cb63c686d..465405d07a 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -1,7 +1,7 @@
 import posixpath
 from typing import Optional
 from hub.core.storage.provider import StorageProvider
-import gcsfs
+import gcsfs  # type: ignore
 
 
 class GCSProvider(StorageProvider):
diff --git a/hub/util/storage.py b/hub/util/storage.py
index 218c7d7c3c..e65de3c935 100644
--- a/hub/util/storage.py
+++ b/hub/util/storage.py
@@ -46,7 +46,7 @@ def storage_provider_from_path(
             path, key, secret, session_token, endpoint_url, region, token=token
         )
     elif path.startswith("gcp://") or path.startswith("gcs://"):
-        return GCSProvider(path, creds)
+        storage = GCSProvider(path, creds)
     elif path.startswith("mem://"):
         storage = MemoryProvider(path)
     elif path.startswith("hub://"):

From 77e998bba306690e59f66ca3926c48cc4ead1263 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 15:07:00 +0400
Subject: [PATCH 07/83] Fix gcs_ds fixture

---
 hub/core/storage/gcs.py    | 4 ++--
 hub/tests/path_fixtures.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 465405d07a..d924085eca 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -1,5 +1,5 @@
 import posixpath
-from typing import Optional
+from typing import Dict, Optional, Union
 from hub.core.storage.provider import StorageProvider
 import gcsfs  # type: ignore
 
@@ -10,7 +10,7 @@ class GCSProvider(StorageProvider):
     def __init__(
         self,
         root: str,
-        token: Optional[str] = None,
+        token: Union[str, Dict] = None,
     ):
         """Initializes the GCSProvider
 
diff --git a/hub/tests/path_fixtures.py b/hub/tests/path_fixtures.py
index ac420e420c..5c8a3ccc4e 100644
--- a/hub/tests/path_fixtures.py
+++ b/hub/tests/path_fixtures.py
@@ -148,7 +148,7 @@ def gcs_path(request):
 
     # clear storage unless flagged otherwise
     if not is_opt_true(request, KEEP_STORAGE_OPT):
-        S3Provider(path).clear()
+        GCSProvider(path).clear()
 
 
 @pytest.fixture

From a8c46f09798b7a3c59810bf97d32dcdb59a63b88 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 15:26:12 +0400
Subject: [PATCH 08/83] Add token file input

---
 hub/core/storage/gcs.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index d924085eca..2c5a07987a 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -2,6 +2,7 @@
 from typing import Dict, Optional, Union
 from hub.core.storage.provider import StorageProvider
 import gcsfs  # type: ignore
+import json
 
 
 class GCSProvider(StorageProvider):
@@ -15,14 +16,14 @@ def __init__(
         """Initializes the GCSProvider
 
         Example:
-            s3_provider = GCSProvider("snark-test/benchmarks")
+            gcs_provider = GCSProvider("snark-test/gcs_ds")
 
         Args:
             root (str): The root of the provider. All read/write request keys will be appended to root.
-            token (str, optional): GCP token, used for fetching credentials for storage.
+            token (str/Dict): GCP token, used for fetching credentials for storage.
         """
         self.root = root
-        self.token: Optional[str] = token
+        self.token: Union[str, Dict] = token
         self.missing_exceptions = (
             FileNotFoundError,
             IsADirectoryError,
@@ -32,6 +33,9 @@ def __init__(
 
     def initialize_provider(self):
         self._set_bucket_and_path()
+        if isinstance(self.token, str):
+            with open(self.token) as token_file:
+                self.token = json.load(token_file)
         self.fs = gcsfs.GCSFileSystem(token=self.token)
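After this commit the token argument accepts two shapes: a path to a credentials JSON file, which is loaded from disk before reaching gcsfs, or an already-parsed dictionary, which is passed through unchanged. A sketch with illustrative values:

    from hub.core.storage.gcs import GCSProvider

    # String tokens are treated as file paths and json-loaded.
    gcs = GCSProvider("gcs://my-bucket/ds", token="/path/to/service_account.json")

    # Dict tokens go to gcsfs.GCSFileSystem(token=...) as-is.
    gcs = GCSProvider("gcs://my-bucket/ds", token={"client_email": "...", "private_key": "..."})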
From bc834e1d7106ad35773475af8640020ef185b11c Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 16:01:00 +0400
Subject: [PATCH 09/83] Add print for test

---
 hub/core/storage/gcs.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 2c5a07987a..f1f956c263 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -23,7 +23,7 @@ def __init__(
             token (str/Dict): GCP token, used for fetching credentials for storage.
         """
         self.root = root
-        self.token: Union[str, Dict] = token
+        self.token: Union[str, Dict, None] = token
         self.missing_exceptions = (
             FileNotFoundError,
             IsADirectoryError,
@@ -33,9 +33,7 @@ def __init__(
 
     def initialize_provider(self):
         self._set_bucket_and_path()
-        if isinstance(self.token, str):
-            with open(self.token) as token_file:
-                self.token = json.load(token_file)
+        print(self.token)
         self.fs = gcsfs.GCSFileSystem(token=self.token)

From b1c63a30c24870edb1473c7abb5fff2f089b4c5b Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 16:36:03 +0400
Subject: [PATCH 10/83] Add creds from env var

---
 hub/constants.py              |  1 +
 hub/core/storage/gcs.py       |  1 -
 hub/tests/dataset_fixtures.py |  4 ++--
 hub/tests/path_fixtures.py    | 12 +++++++++---
 4 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/hub/constants.py b/hub/constants.py
index ceb1ab2b9c..7b13e68a0a 100644
--- a/hub/constants.py
+++ b/hub/constants.py
@@ -95,6 +95,7 @@
 ENV_HUB_DEV_PASSWORD = "ACTIVELOOP_HUB_PASSWORD"
 ENV_KAGGLE_USERNAME = "KAGGLE_USERNAME"
 ENV_KAGGLE_KEY = "KAGGLE_KEY"
+ENV_GOOGLE_APPLICATION_CREDENTIALS = "GOOGLE_APPLICATION_CREDENTIALS"
 
 # pytest options
 MEMORY_OPT = "--memory-skip"
diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index f1f956c263..21f096f4c1 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -33,7 +33,6 @@ def __init__(
 
     def initialize_provider(self):
         self._set_bucket_and_path()
-        print(self.token)
         self.fs = gcsfs.GCSFileSystem(token=self.token)
 
     def _set_bucket_and_path(self):
diff --git a/hub/tests/dataset_fixtures.py b/hub/tests/dataset_fixtures.py
index 31c420bf84..198915e178 100644
--- a/hub/tests/dataset_fixtures.py
+++ b/hub/tests/dataset_fixtures.py
@@ -51,9 +51,9 @@ def gcs_ds(gcs_ds_generator):
 
 
 @pytest.fixture
-def gcs_ds_generator(gcs_path):
+def gcs_ds_generator(gcs_path, gcs_creds):
     def generate_gcs_ds():
-        return hub.dataset(gcs_path)
+        return hub.dataset(gcs_path, creds=gcs_creds)
 
     return generate_gcs_ds
 
diff --git a/hub/tests/path_fixtures.py b/hub/tests/path_fixtures.py
index 5c8a3ccc4e..0eea5e238c 100644
--- a/hub/tests/path_fixtures.py
+++ b/hub/tests/path_fixtures.py
@@ -15,6 +15,7 @@
     PYTEST_MEMORY_PROVIDER_BASE_ROOT,
     S3_OPT,
     GCS_OPT,
+    ENV_GOOGLE_APPLICATION_CREDENTIALS,
 )
 import posixpath
 from hub.tests.common import (
@@ -135,20 +136,25 @@ def s3_path(request):
         S3Provider(path).clear()
 
 
+@pytest.fixture(scope="session")
+def gcs_creds():
+    return os.environ[ENV_GOOGLE_APPLICATION_CREDENTIALS]
+
+
 @pytest.fixture
-def gcs_path(request):
+def gcs_path(request, gcs_creds):
     if not is_opt_true(request, GCS_OPT):
         pytest.skip()
         return
 
     path = _get_storage_path(request, GCS)
-    GCSProvider(path).clear()
+    GCSProvider(path, token=gcs_creds).clear()
 
     yield path
 
     # clear storage unless flagged otherwise
     if not is_opt_true(request, KEEP_STORAGE_OPT):
-        GCSProvider(path).clear()
+        GCSProvider(path, token=gcs_creds).clear()
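With gcs_ds and gcs_storage registered in the shared parametrized lists, any test decorated with those markers now also runs against GCS when --gcs is passed. A hypothetical test, assuming hub's usual create_tensor/append API:

    import numpy as np
    from hub.tests.dataset_fixtures import enabled_datasets

    @enabled_datasets
    def test_append(ds):  # ds iterates over memory_ds, local_ds, s3_ds, gcs_ds
        ds.create_tensor("images")
        ds.images.append(np.ones((8, 8)))
        assert len(ds.images) == 1

The session-scoped gcs_creds fixture reads GOOGLE_APPLICATION_CREDENTIALS, so the key file only needs to be exported once per test session.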
From 55387e403cea8f1580d318fedeb1b855c82295a3 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 16:43:14 +0400
Subject: [PATCH 11/83] Modify CONTRIBUTING.md

---
 CONTRIBUTING.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 141353eb11..2135f1a68d 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -31,6 +31,7 @@ pip3 install -r hub/requirements/tests.txt
 - `pytest .`: Run all tests with memory only.
 - `pytest . --local`: Run all tests with memory and local.
 - `pytest . --s3`: Run all tests with memory and s3.
+- `pytest . --gcs`: Run all tests with memory and GCS.
 - `pytest . --kaggle`: Run all tests that use the kaggle API.
 - `pytest . --memory-skip --hub-cloud`: Run all tests with hub cloud only.
 #### Backwards Compatibility Tests
@@ -41,6 +42,7 @@ Combine any of the following options to suit your test cases.
 
 - `--local`: Enable local tests.
 - `--s3`: Enable S3 tests.
+- `--gcs`: Enable GCS tests.
 - `--hub-cloud`: Enable hub cloud tests.
 - `--memory-skip`: Disable memory tests.
 - `--s3-path`: Specify an s3 path if you don't have access to our internal testing bucket.

From 1361620d6c314f489d830009c5f1d06fd94f206a Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 17:19:57 +0400
Subject: [PATCH 12/83] Check readonly

---
 hub/core/storage/gcs.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 21f096f4c1..3a657aac70 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -44,6 +44,7 @@ def _set_bucket_and_path(self):
 
     def clear(self):
         """Remove all keys below root - empties out mapping"""
+        self.check_readonly()
         self.fs.delete(self.path, True)
 
     def __getitem__(self, key):
@@ -56,6 +57,7 @@ def __getitem__(self, key):
 
     def __setitem__(self, key, value):
         """Store value in key"""
+        self.check_readonly()
         with self.fs.open(posixpath.join(self.path, key), "wb") as f:
             f.write(value)
 
@@ -69,6 +71,7 @@ def __len__(self):
 
     def __delitem__(self, key):
         """Remove key"""
+        self.check_readonly()
         self.fs.rm(posixpath.join(self.path, key))
From 32f4f707fad6a2ed625a2e18bf1fb8705d0bd953 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 20 Aug 2021 17:22:11 +0400
Subject: [PATCH 13/83] Format docstrings

---
 hub/core/storage/gcs.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 3a657aac70..00fe3d5dd6 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -29,9 +29,9 @@ def __init__(
             IsADirectoryError,
             NotADirectoryError,
         )
-        self.initialize_provider()
+        self._initialize_provider()
 
-    def initialize_provider(self):
+    def _initialize_provider(self):
         self._set_bucket_and_path()
         self.fs = gcsfs.GCSFileSystem(token=self.token)
 
@@ -62,11 +62,11 @@ def __setitem__(self, key, value):
             f.write(value)
 
     def __iter__(self):
-        """iterating over the structure"""
+        """Iterating over the structure"""
         yield from (x for x in self.fs.find(self.root))
 
     def __len__(self):
-        """returns length of the structure"""
+        """Returns length of the structure"""
         return len(self.fs.find(self.root))
 
     def __delitem__(self, key):

From b8a62ade9e0eff7295728331cf5097db29b6bd93 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 23 Aug 2021 16:43:58 +0400
Subject: [PATCH 14/83] Modify circleci timeout

---
 .circleci/config.yml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index c938bb0a44..b1a5e1658f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -188,7 +188,9 @@ commands:
             export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json
             python3 -m pytest --cov-report=xml --cov=./ --local --s3 --gcs --hub-cloud --kaggle --ignore-glob=buH/*
 
-  parallelism: 10
+  parallelism: 10
+  no_output_timeout: 30m
+
   run-backwards-compatibility-tests:
     steps:
       - run:

From d7f2d66a0d5181675ecdb8c8553db26c93a4fb12 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 23 Aug 2021 16:47:47 +0400
Subject: [PATCH 15/83] Fix config

---
 .circleci/config.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index b1a5e1658f..5bf28556b4 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -295,6 +295,8 @@ jobs:
         condition: << parameters.unix-like >>
         steps:
           - slack-status
+    parallelism: 10
+    no_output_timeout: 30m
   deploy:
     executor: linux
     environment:

From b4e8e5070ab6cfa0755b6c8c2e56938b96443f86 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 23 Aug 2021 16:49:10 +0400
Subject: [PATCH 16/83] Remove duplicate

---
 .circleci/config.yml | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 5bf28556b4..f3215ccdc9 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -187,9 +187,6 @@ commands:
           command: |
             export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json
             python3 -m pytest --cov-report=xml --cov=./ --local --s3 --gcs --hub-cloud --kaggle --ignore-glob=buH/*
-
-  parallelism: 10
-  no_output_timeout: 30m
   run-backwards-compatibility-tests:
     steps:
       - run:

From d750e7ccc8c1b7b4c79f56c8c8dc424417cee82a Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 23 Aug 2021 16:50:53 +0400
Subject: [PATCH 17/83] Fix tab

---
 .circleci/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index f3215ccdc9..23b2a07618 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -293,7 +293,7 @@ jobs:
         steps:
           - slack-status
     parallelism: 10
-	no_output_timeout: 30m
+    no_output_timeout: 30m
   deploy:
     executor: linux
     environment:

From 48c54acb488e4bf98b6711ebd51339e43d911131 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 23 Aug 2021 16:58:07 +0400
Subject: [PATCH 18/83] Remove parallelism

---
 .circleci/config.yml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 23b2a07618..fbb0cc132c 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -292,7 +292,6 @@ jobs:
         condition: << parameters.unix-like >>
         steps:
           - slack-status
-    parallelism: 10
     no_output_timeout: 30m
   deploy:
     executor: linux

From c68a63ac760f4b4015dfda491d4576dc7d0e510b Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 23 Aug 2021 17:08:16 +0400
Subject: [PATCH 19/83] Replace join for path

---
 hub/core/storage/gcs.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 00fe3d5dd6..0809153b4d 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -47,10 +47,13 @@ def clear(self):
         self.check_readonly()
         self.fs.delete(self.path, True)
 
+    def _get_full_key_path(self, key):
+        return posixpath.join(self.path, key)
+
     def __getitem__(self, key):
         """Retrieve data"""
         try:
-            with self.fs.open(posixpath.join(self.path, key), "rb") as f:
+            with self.fs.open(self._get_full_key_path(key), "rb") as f:
                 return f.read()
         except self.missing_exceptions:
             raise KeyError(key)
@@ -58,7 +61,7 @@ def __getitem__(self, key):
     def __setitem__(self, key, value):
         """Store value in key"""
         self.check_readonly()
-        with self.fs.open(posixpath.join(self.path, key), "wb") as f:
+        with self.fs.open(self._get_full_key_path(key), "wb") as f:
             f.write(value)
 
     def __iter__(self):
@@ -72,9 +75,9 @@ def __len__(self):
     def __delitem__(self, key):
         """Remove key"""
         self.check_readonly()
-        self.fs.rm(posixpath.join(self.path, key))
+        self.fs.rm(self._get_full_key_path(key))
 
     def __contains__(self, key):
         """Does key exist in mapping?"""
-        path = posixpath.join(self.path, key)
+        path = self._get_full_key_path(key)
         return self.fs.exists(path)

From 2bf9a89ce2270c9540fcf007b93554fa5a9d102d Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Wed, 25 Aug 2021 13:15:59 +0400
Subject: [PATCH 20/83] Change to gcloud

---
 hub/core/storage/gcs.py     | 64 +++++++++++++++++++++++++++----------
 hub/requirements/common.txt |  3 ++-
 2 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 0809153b4d..dcca275e28 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -1,8 +1,11 @@
 import posixpath
-from typing import Dict, Optional, Union
-from hub.core.storage.provider import StorageProvider
-import gcsfs  # type: ignore
 import json
+import os
+from typing import Dict, Union
+
+from google.auth import credentials
+from hub.core.storage.provider import StorageProvider
+from google.cloud import storage  # type: ignore
 
 
 class GCSProvider(StorageProvider):
@@ -28,12 +31,27 @@ def __init__(
             FileNotFoundError,
             IsADirectoryError,
             NotADirectoryError,
+            AttributeError,
         )
         self._initialize_provider()
 
     def _initialize_provider(self):
         self._set_bucket_and_path()
-        self.fs = gcsfs.GCSFileSystem(token=self.token)
+        from google.oauth2 import service_account
+
+        if isinstance(self.token, dict):
+            token_path = posixpath.expanduser("gcs.json")
+            with open(token_path, "wb") as f:
+                json.dump(self.token, f)
+            self.token = token_path
+        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.token
+        credentials = service_account.Credentials.from_service_account_file(self.token)
+
+        scoped_credentials = credentials.with_scopes(
+            ["https://www.googleapis.com/auth/cloud-platform"]
+        )
+        self.client = storage.Client(credentials=scoped_credentials)
+        self.client_bucket = self.client.get_bucket(self.bucket)
 
     def _set_bucket_and_path(self):
         root = self.root.replace("gcp://", "").replace("gcs://", "")
@@ -42,42 +60,54 @@ def _set_bucket_and_path(self):
         if not self.path.endswith("/"):
             self.path += "/"
 
+    def _get_path_from_key(self, key):
+        return posixpath.join(self.path, key)
+
+    def _list_keys(self):
+        self._blob_objects = self.client_bucket.list_blobs(prefix=self.path)
+        return [obj.name for obj in self._blob_objects]
+
     def clear(self):
         """Remove all keys below root - empties out mapping"""
         self.check_readonly()
-        self.fs.delete(self.path, True)
-
-    def _get_full_key_path(self, key):
-        return posixpath.join(self.path, key)
+        blob_objects = self.client_bucket.list_blobs(prefix=self.path)
+        self.client_bucket.delete_blobs(blob_objects)
 
     def __getitem__(self, key):
         """Retrieve data"""
         try:
-            with self.fs.open(self._get_full_key_path(key), "rb") as f:
-                return f.read()
+            blob = self.client_bucket.get_blob(self._get_path_from_key(key))
+            return blob.download_as_bytes()
         except self.missing_exceptions:
             raise KeyError(key)
 
     def __setitem__(self, key, value):
         """Store value in key"""
         self.check_readonly()
-        with self.fs.open(self._get_full_key_path(key), "wb") as f:
-            f.write(value)
+        blob = self.client_bucket.blob(self._get_path_from_key(key))
+        if isinstance(value, memoryview):
+            value = value.tobytes()
+        blob.upload_from_string(
+            value,
+        )
 
     def __iter__(self):
         """Iterating over the structure"""
-        yield from (x for x in self.fs.find(self.root))
+        yield from [f for f in self._list_keys() if not f.endswith("/")]
 
     def __len__(self):
         """Returns length of the structure"""
-        return len(self.fs.find(self.root))
+        return len(self._list_keys())
 
     def __delitem__(self, key):
         """Remove key"""
         self.check_readonly()
-        self.fs.rm(self._get_full_key_path(key))
+        blob = self.client_bucket.blob(self._get_path_from_key(key))
+        blob.delete()
 
     def __contains__(self, key):
         """Does key exist in mapping?"""
-        path = self._get_full_key_path(key)
-        return self.fs.exists(path)
+        stats = storage.Blob(
+            bucket=self.client_bucket, name=self._get_path_from_key(key)
+        ).exists(self.client)
+        return stats
diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt
index 5d297c5443..6b00cdfe91 100644
--- a/hub/requirements/common.txt
+++ b/hub/requirements/common.txt
@@ -2,7 +2,8 @@ numpy
 pillow==8.2.0
 boto3
 boto3-stubs[essential]
-gcsfs~=0.6.2
+google-cloud-storage~=1.42.0
+google-auth~=2.0.1
 pathos
 humbug>=0.2.6
 types-requests
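This commit swaps gcsfs for the google-cloud-storage client: keys map to Blob objects under a bucket handle, and authentication goes through scoped service-account credentials. The same calls in isolation, with an illustrative key file and bucket:

    from google.cloud import storage
    from google.oauth2 import service_account

    creds = service_account.Credentials.from_service_account_file(
        "creds.json",  # illustrative path
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    client = storage.Client(credentials=creds)
    bucket = client.get_bucket("my-bucket")  # illustrative bucket

    bucket.blob("ds/chunk0").upload_from_string(b"\x00\x01")   # __setitem__
    data = bucket.get_blob("ds/chunk0").download_as_bytes()    # __getitem__
    names = [b.name for b in bucket.list_blobs(prefix="ds/")]  # _list_keys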
From b9b73491427cc7b24b9eb9b3300d4ab640d0397b Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Wed, 25 Aug 2021 13:18:33 +0400
Subject: [PATCH 21/83] Fix delete

---
 hub/core/storage/gcs.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index dcca275e28..e3a281dd23 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -71,7 +71,8 @@ def clear(self):
         """Remove all keys below root - empties out mapping"""
         self.check_readonly()
         blob_objects = self.client_bucket.list_blobs(prefix=self.path)
-        self.client_bucket.delete_blobs(blob_objects)
+        for blob in blob_objects:
+            blob.delete()

From 83a7d0b908970f5c95979aabaacbd23a12b822c9 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Wed, 25 Aug 2021 13:26:57 +0400
Subject: [PATCH 22/83] Move import

---
 hub/core/storage/gcs.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index e3a281dd23..ba3c3b549b 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -3,9 +3,9 @@
 import os
 from typing import Dict, Union
 
-from google.auth import credentials
 from hub.core.storage.provider import StorageProvider
 from google.cloud import storage  # type: ignore
+from google.oauth2 import service_account  # type: ignore

From fceb0b80ec7ded6d01ee10ab03ba97a22054aa13 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Wed, 25 Aug 2021 15:19:54 +0400
Subject: [PATCH 23/83] Fix pickle

---
 hub/core/storage/gcs.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index ba3c3b549b..2449a3e65b 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -49,8 +49,8 @@ def _initialize_provider(self):
         scoped_credentials = credentials.with_scopes(
             ["https://www.googleapis.com/auth/cloud-platform"]
         )
-        self.client = storage.Client(credentials=scoped_credentials)
-        self.client_bucket = self.client.get_bucket(self.bucket)
+        client = storage.Client(credentials=scoped_credentials)
+        self.client_bucket = client.get_bucket(self.bucket)
 
     def _set_bucket_and_path(self):
         root = self.root.replace("gcp://", "").replace("gcs://", "")
@@ -87,9 +87,7 @@ def __setitem__(self, key, value):
         blob = self.client_bucket.blob(self._get_path_from_key(key))
         if isinstance(value, memoryview):
             value = value.tobytes()
-        blob.upload_from_string(
-            value,
-        )
+        blob.upload_from_string(value)
 
     def __iter__(self):
         """Iterating over the structure"""
@@ -109,5 +107,14 @@ def __contains__(self, key):
         """Does key exist in mapping?"""
         stats = storage.Blob(
             bucket=self.client_bucket, name=self._get_path_from_key(key)
-        ).exists(self.client)
+        ).exists(self.client_bucket.client)
         return stats
+
+    def __getstate__(self):
+        return (self.root, self.token, self.missing_exceptions)
+
+    def __setstate__(self, state):
+        self.root = state[0]
+        self.token = state[1]
+        self.missing_exceptions = state[2]
+        self._initialize_provider()
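__getstate__/__setstate__ keep the network client out of the pickled state: only root, token and the exception tuple are serialized, and _initialize_provider() rebuilds the client on the other side, which is what lets the provider cross process boundaries. A round-trip sketch, assuming a reachable bucket:

    import pickle
    from hub.core.storage.gcs import GCSProvider

    provider = GCSProvider("gcs://my-bucket/ds")     # illustrative bucket
    restored = pickle.loads(pickle.dumps(provider))  # client is rebuilt, not serialized
    assert restored.root == provider.root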
From 5afb3bda72d543c173e86d68dcaf2e7d59871f4b Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Wed, 25 Aug 2021 16:12:45 +0400
Subject: [PATCH 24/83] Fix creds

---
 hub/core/storage/gcs.py | 31 ++++++++++++++++++-------------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 2449a3e65b..1a944b41e4 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -37,19 +37,24 @@ def __init__(
 
     def _initialize_provider(self):
         self._set_bucket_and_path()
-
-        if isinstance(self.token, dict):
-            token_path = posixpath.expanduser("gcs.json")
-            with open(token_path, "wb") as f:
-                json.dump(self.token, f)
-            self.token = token_path
-        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.token
-        credentials = service_account.Credentials.from_service_account_file(self.token)
-
-        scoped_credentials = credentials.with_scopes(
-            ["https://www.googleapis.com/auth/cloud-platform"]
-        )
-        client = storage.Client(credentials=scoped_credentials)
+        if self.token:
+            if isinstance(self.token, dict):
+                token_path = posixpath.expanduser("gcs.json")
+                with open(token_path, "wb") as f:
+                    json.dump(self.token, f)
+                self.token = token_path
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.token
+            credentials = service_account.Credentials.from_service_account_file(
+                self.token
+            )
+
+            scoped_credentials = credentials.with_scopes(
+                ["https://www.googleapis.com/auth/cloud-platform"]
+            )
+
+            client = storage.Client(credentials=scoped_credentials)
+        else:
+            client = storage.Client(credentials=self.token)
         self.client_bucket = client.get_bucket(self.bucket)

From b3ca9d6529ad3c459c55135eea11e9dc748d803d Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Wed, 25 Aug 2021 16:16:19 +0400
Subject: [PATCH 25/83] Convert bytearrays

---
 hub/core/storage/gcs.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 1a944b41e4..23f06711a5 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -92,6 +92,8 @@ def __setitem__(self, key, value):
         blob = self.client_bucket.blob(self._get_path_from_key(key))
         if isinstance(value, memoryview):
             value = value.tobytes()
+        elif isinstance(value, bytearray):
+            value = bytes(value)
         blob.upload_from_string(value)
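upload_from_string accepts str or bytes, while callers hand the provider memoryview and bytearray chunks as well, hence the normalization before upload. The conversion on its own:

    value = memoryview(b"abc")
    if isinstance(value, memoryview):
        value = value.tobytes()   # copy out the viewed bytes
    elif isinstance(value, bytearray):
        value = bytes(value)      # immutable copy for upload_from_string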
From 6ebe0f027dbb207a1fdd72133ae62f5b3311e8fb Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Wed, 25 Aug 2021 18:08:30 +0400
Subject: [PATCH 26/83] Creds formats

---
 hub/core/storage/gcs.py | 142 ++++++++++++++++++++++++++++++++++------
 1 file changed, 123 insertions(+), 19 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 23f06711a5..79d492a45f 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -2,10 +2,128 @@
 import json
 import os
 from typing import Dict, Union
+import textwrap
 
-from hub.core.storage.provider import StorageProvider
 from google.cloud import storage  # type: ignore
 from google.oauth2 import service_account  # type: ignore
+import google.auth as gauth  # type: ignore
+import google.auth.compute_engine  # type: ignore
+import google.auth.credentials  # type: ignore
+import google.auth.exceptions  # type: ignore
+from google.oauth2.credentials import Credentials  # type: ignore
+
+from hub.core.storage.provider import StorageProvider
+
+
+class GoogleCredentials:
+    def __init__(self, token, project=None):
+        self.scope = "https://www.googleapis.com/auth/cloud-platform"
+        self.project = project
+        self.access = "full-access"
+        self.heads = {}
+        self.credentials = None
+        self.method = None
+        self.token = token
+        self.connect(method=token)
+
+    def _connect_google_default(self):
+        credentials, project = gauth.default(scopes=[self.scope])
+        msg = textwrap.dedent(
+            """\
+            User-provided project '{}' does not match the google default project '{}'. Either
+            1. Accept the google-default project by not passing a `project` to GCSFileSystem
+            2. Configure the default project to match the user-provided project (gcloud config set project)
+            3. Use an authorization method other than 'google_default' by providing 'token=...'
+            """
+        )
+        if self.project and self.project != project:
+            raise ValueError(msg.format(self.project, project))
+        self.project = project
+        self.credentials = credentials
+
+    def _connect_cloud(self):
+        self.credentials = gauth.compute_engine.Credentials()
+
+    def _connect_cache(self):
+        project, access = self.project, self.access
+        if (project, access) in self.tokens:
+            credentials = self.tokens[(project, access)]
+            self.credentials = credentials
+
+    def _dict_to_credentials(self, token):
+        """
+        Convert old dict-style token.
+        Does not preserve access token itself, assumes refresh required.
+        """
+        token_path = posixpath.expanduser("gcs.json")
+        with open(token_path, "w") as f:
+            json.dump(token, f)
+        return token_path
+
+    def _connect_token(self, token):
+        """
+        Connect using a concrete token
+        Parameters
+        ----------
+        token: str, dict or Credentials
+            If a str, try to load as a Service file, or next as a JSON; if
+            dict, try to interpret as credentials; if Credentials, use directly.
+        """
+        if isinstance(token, str):
+            if not os.path.exists(token):
+                raise FileNotFoundError(token)
+            try:
+                self._connect_service(token)
+                return
+            except:
+                token = json.load(open(token))
+        if isinstance(token, dict):
+            token = self._dict_to_credentials(token)
+            self._connect_service(token)
+            return
+        elif isinstance(token, google.auth.credentials.Credentials):
+            credentials = token
+        else:
+            raise ValueError("Token format not understood")
+        self.credentials = credentials
+        if self.credentials.valid:
+            self.credentials.apply(self.heads)
+
+    def _connect_service(self, fn):
+        credentials = service_account.Credentials.from_service_account_file(
+            fn, scopes=[self.scope]
+        )
+        self.credentials = credentials
+
+    def _connect_anon(self):
+        self.credentials = None
+
+    def connect(self, method=None):
+        """
+        Establish session token. A new token will be requested if the current
+        one is within 100s of expiry.
+        Parameters
+        ----------
+        method: str (google_default|cache|cloud|token|anon|browser) or None
+            Type of authorisation to implement - calls `_connect_*` methods.
+            If None, will try sequence of methods.
+        """
+        if method not in [
+            "google_default",
+            "cache",
+            "cloud",
+            "token",
+            "anon",
+            None,
+        ]:
+            self._connect_token(method)
+        elif method is None:
+            for meth in ["google_default", "cache", "cloud", "anon"]:
+                self.connect(method=meth)
+                break
+        else:
+            self.__getattribute__("_connect_" + method)()
+        self.method = method
 
 
 class GCSProvider(StorageProvider):
@@ -37,24 +155,10 @@ def __init__(
 
     def _initialize_provider(self):
         self._set_bucket_and_path()
-        if self.token:
-            if isinstance(self.token, dict):
-                token_path = posixpath.expanduser("gcs.json")
-                with open(token_path, "wb") as f:
-                    json.dump(self.token, f)
-                self.token = token_path
-            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.token
-            credentials = service_account.Credentials.from_service_account_file(
-                self.token
-            )
-
-            scoped_credentials = credentials.with_scopes(
-                ["https://www.googleapis.com/auth/cloud-platform"]
-            )
-
-            client = storage.Client(credentials=scoped_credentials)
-        else:
-            client = storage.Client(credentials=self.token)
+        if not self.token:
+            self.token = None
+        scoped_credentials = GoogleCredentials(self.token)
+        client = storage.Client(credentials=scoped_credentials.credentials)
         self.client_bucket = client.get_bucket(self.bucket)

From 0169f503a273d6d00eb9b4a84a1cea2957455304 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Thu, 26 Aug 2021 12:10:47 +0400
Subject: [PATCH 27/83] Add browser token generation

---
 hub/core/storage/gcs.py | 125 ++++++++++++++++++++++++++--------------
 1 file changed, 82 insertions(+), 43 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 79d492a45f..af83e05bac 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -1,4 +1,5 @@
 import posixpath
+import pickle
 import json
 import os
 from typing import Dict, Union
@@ -10,34 +11,40 @@
 import google.auth.compute_engine  # type: ignore
 import google.auth.credentials  # type: ignore
 import google.auth.exceptions  # type: ignore
-from google.oauth2.credentials import Credentials  # type: ignore
-
-from hub.core.storage.provider import StorageProvider
+from google_auth_oauthlib.flow import InstalledAppFlow  # type: ignore
+from hub.core.storage.provider import StorageProvider  # type: ignore
+from hub.util.exceptions import GCSDefaultCredsNotFoundError
 
 
 class GoogleCredentials:
-    def __init__(self, token, project=None):
+    def __init__(self, token: Union[str, Dict] = None, project: str = None):
         self.scope = "https://www.googleapis.com/auth/cloud-platform"
         self.project = project
-        self.access = "full-access"
-        self.heads = {}
         self.credentials = None
-        self.method = None
         self.token = token
+        self.tokens = {}
         self.connect(method=token)
 
+    @classmethod
+    def load_tokens(cls):
+        """Get "browser" tokens from disc"""
+        try:
+            with open(".gcstoken", "rb") as f:
+                tokens = pickle.load(f)
+        except Exception:
+            tokens = {}
+        GoogleCredentials.tokens = tokens
+
+    def _save_tokens(self):
+        with open(".gcstoken", "wb") as f:
+            pickle.dump(self.tokens, f, 2)
+
     def _connect_google_default(self):
         credentials, project = gauth.default(scopes=[self.scope])
-        msg = textwrap.dedent(
-            """\
-            User-provided project '{}' does not match the google default project '{}'. Either
-            1. Accept the google-default project by not passing a `project` to GCSFileSystem
-            2. Configure the default project to match the user-provided project (gcloud config set project)
-            3. Use an authorization method other than 'google_default' by providing 'token=...'
-            """
-        )
         if self.project and self.project != project:
-            raise ValueError(msg.format(self.project, project))
+            raise ValueError(
+                "Project name does not match the google default project name."
+            )
         self.project = project
         self.credentials = credentials
@@ -52,29 +59,38 @@ def _connect_cloud(self):
 
     def _connect_cache(self):
-        project, access = self.project, self.access
-        if (project, access) in self.tokens:
-            credentials = self.tokens[(project, access)]
+        credentials = self.project.get(self.project, None)
+        if credentials:
             self.credentials = credentials
 
-    def _dict_to_credentials(self, token):
+    def _dict_to_credentials(self, token: Dict):
         """
         Convert old dict-style token.
         Does not preserve access token itself, assumes refresh required.
+
+        Args:
+            token (dict): dictionary with token to be stored in .json file.
+
+        Returns:
+            Path to stored .json file.
         """
-        token_path = posixpath.expanduser("gcs.json")
+        token_path = posixpath.expanduser("~/.config/gcloud/gcs.json")
         with open(token_path, "w") as f:
             json.dump(token, f)
         return token_path
 
-    def _connect_token(self, token):
+    def _connect_token(self, token: Union[str, Dict]):
         """
-        Connect using a concrete token
-        Parameters
-        ----------
-        token: str, dict or Credentials
-            If a str, try to load as a Service file, or next as a JSON; if
-            dict, try to interpret as credentials; if Credentials, use directly.
+        Connect using a concrete token.
+
+        Args:
+            token (str/dict/Credentials):
+                If a str, try to load as a Service file, or next as a JSON; if
+                dict, try to interpret as credentials; if Credentials, use directly.
+
+        Raises:
+            FileNotFoundError: If token file doesn't exist.
+            ValueError: If token format isn't supported by gauth.
         """
@@ -101,6 +117,34 @@ def _connect_service(self, fn):
         credentials = service_account.Credentials.from_service_account_file(
             fn, scopes=[self.scope]
         )
         self.credentials = credentials
 
+    def _connect_browser(self):
+        try:
+            import pdb
+
+            pdb.set_trace()
+            with open(
+                posixpath.expanduser(
+                    "~/.config/gcloud/application_default_credentials.json"
+                ),
+                "r",
+            ) as f:
+                default_config = json.load(f)
+        except:
+            raise GCSDefaultCredsNotFoundError()
+        client_config = {
+            "installed": {
+                "client_id": default_config["client_id"],
+                "client_secret": default_config["client_secret"],
+                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+                "token_uri": "https://accounts.google.com/o/oauth2/token",
+            }
+        }
+        flow = InstalledAppFlow.from_client_config(client_config, [self.scope])
+        credentials = flow.run_console()
+        self.tokens[(self.project, self.access)] = credentials
+        self._save_tokens()
+        self.credentials = credentials
+
     def _connect_anon(self):
         self.credentials = None
 
@@ -110,14 +154,15 @@ def connect(self, method=None):
         """
         Establish session token. A new token will be requested if the current
         one is within 100s of expiry.
-        Parameters
-        ----------
-        method: str (google_default|cache|cloud|token|anon|browser) or None
-            Type of authorisation to implement - calls `_connect_*` methods.
-            If None, will try sequence of methods.
+
+        Args:
+            method (str): Supported methods: google_default|cache|cloud|token|anon|browser|None.
+                Type of authorisation to implement - calls `_connect_*` methods.
+                If None, will try sequence of methods.
         """
         if method not in [
             "google_default",
             "cache",
             "cloud",
             "token",
             "anon",
+            "browser",
             None,
         ]:
             self._connect_token(method)
@@ -128,21 +173,15 @@ def connect(self, method=None):
         elif method is None:
             for meth in ["google_default", "cache", "cloud", "anon"]:
                 self.connect(method=meth)
                 break
         else:
             self.__getattribute__("_connect_" + method)()
-        self.method = method
 
 
 class GCSProvider(StorageProvider):
     """Provider class for using GC storage."""
 
-    def __init__(
-        self,
-        root: str,
-        token: Union[str, Dict] = None,
-    ):
+    def __init__(self, root: str, token: Union[str, Dict] = None, project: str = None):
         """Initializes the GCSProvider
 
         Example:
-            gcs_provider = GCSProvider("snark-test/gcs_ds")
+            gcs_provider = GCSProvider("gcs://my-bucket/gcs_ds")
 
         Args:
             root (str): The root of the provider. All read/write request keys will be appended to root.
             token (str/Dict): GCP token, used for fetching credentials for storage.
+                Can be a path to the credentials file, actual credential dictionary or one of the following:
+                `google_default`: Tries to load default credentials for the specified project.
+                `cache`: Retrieves the previously used credentials from cache if exist.
+                `anon`: Sets credentials=None
+                `browser`: Generates and stores new token file using cli.
         """
         self.root = root
         self.token: Union[str, Dict, None] = token
+        self.project = project
         self.missing_exceptions = (
             FileNotFoundError,
             IsADirectoryError,
@@ -196,7 +235,7 @@ def __init__(
 
     def _initialize_provider(self):
         self._set_bucket_and_path()
         if not self.token:
             self.token = None
-        scoped_credentials = GoogleCredentials(self.token)
+        scoped_credentials = GoogleCredentials(self.token, project=self.project)
         client = storage.Client(credentials=scoped_credentials.credentials)
         self.client_bucket = client.get_bucket(self.bucket)
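connect() now dispatches on the token value: recognized method names select a _connect_* strategy, anything else is treated as a concrete token. The resulting call patterns, with illustrative buckets and paths:

    from hub.core.storage.gcs import GCSProvider

    GCSProvider("gcs://my-bucket/ds")                           # None: google_default, then cache, then anon
    GCSProvider("gcs://my-bucket/ds", token="anon")             # anonymous access, credentials=None
    GCSProvider("gcs://my-bucket/ds", token="browser")          # interactive OAuth via InstalledAppFlow
    GCSProvider("gcs://my-bucket/ds", token="/path/key.json")   # unrecognized string: _connect_token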
Dict] = None, project: str = None): self.scope = "https://www.googleapis.com/auth/cloud-platform" self.project = project - self.access = "full-access" - self.heads = {} self.credentials = None - self.method = None self.token = token + self.tokens = {} self.connect(method=token) + @classmethod + def load_tokens(cls): + """Get "browser" tokens from disc""" + try: + with open(".gcstoken", "rb") as f: + tokens = pickle.load(f) + except Exception: + tokens = {} + GoogleCredentials.tokens = tokens + + def _save_tokens(self): + with open(".gcstoken", "wb") as f: + pickle.dump(self.tokens, f, 2) + def _connect_google_default(self): credentials, project = gauth.default(scopes=[self.scope]) - msg = textwrap.dedent( - """\ - User-provided project '{}' does not match the google default project '{}'. Either - 1. Accept the google-default project by not passing a `project` to GCSFileSystem - 2. Configure the default project to match the user-provided project (gcloud config set project) - 3. Use an authorization method other than 'google_default' by providing 'token=...' - """ - ) if self.project and self.project != project: - raise ValueError(msg.format(self.project, project)) + raise ValueError( + "Project name does not match the google default project name." + ) self.project = project self.credentials = credentials @@ -45,29 +52,38 @@ def _connect_cloud(self): self.credentials = gauth.compute_engine.Credentials() def _connect_cache(self): - project, access = self.project, self.access - if (project, access) in self.tokens: - credentials = self.tokens[(project, access)] + credentials = self.project.get(self.project, None) + if credentials: self.credentials = credentials - def _dict_to_credentials(self, token): + def _dict_to_credentials(self, token: Dict): """ Convert old dict-style token. Does not preserve access token itself, assumes refresh required. + + Args: + token (dict): dictionary with token to be stored in .json file. + + Returns: + Path to stored .json file. """ - token_path = posixpath.expanduser("gcs.json") + token_path = posixpath.expanduser("~/.config/gcloud/gcs.json") with open(token_path, "w") as f: json.dump(token, f) return token_path - def _connect_token(self, token): + def _connect_token(self, token: Union[str, Dict]): """ - Connect using a concrete token - Parameters - ---------- - token: str, dict or Credentials - If a str, try to load as a Service file, or next as a JSON; if - dict, try to interpret as credentials; if Credentials, use directly. + Connect using a concrete token. + + Args: + token (str/dict/Credentials): + If a str, try to load as a Service file, or next as a JSON; if + dict, try to interpret as credentials; if Credentials, use directly. + + Raises: + FileNotFoundError: If token file doesn't exist. + ValueError: If token format isn't supported by gauth. 
""" if isinstance(token, str): if not os.path.exists(token): @@ -86,8 +102,6 @@ def _connect_token(self, token): else: raise ValueError("Token format not understood") self.credentials = credentials - if self.credentials.valid: - self.credentials.apply(self.heads) def _connect_service(self, fn): credentials = service_account.Credentials.from_service_account_file( @@ -95,6 +109,34 @@ def _connect_service(self, fn): ) self.credentials = credentials + def _connect_browser(self): + try: + import pdb + + pdb.set_trace() + with open( + posixpath.expanduser( + "~/.config/gcloud/application_default_credentials.json" + ), + "r", + ) as f: + default_config = json.load(f) + except: + raise GCSDefaultCredsNotFoundError() + client_config = { + "installed": { + "client_id": default_config["client_id"], + "client_secret": default_config["client_secret"], + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://accounts.google.com/o/oauth2/token", + } + } + flow = InstalledAppFlow.from_client_config(client_config, [self.scope]) + credentials = flow.run_console() + self.tokens[(self.project, self.access)] = credentials + self._save_tokens() + self.credentials = credentials + def _connect_anon(self): self.credentials = None @@ -102,11 +144,11 @@ def connect(self, method=None): """ Establish session token. A new token will be requested if the current one is within 100s of expiry. - Parameters - ---------- - method: str (google_default|cache|cloud|token|anon|browser) or None - Type of authorisation to implement - calls `_connect_*` methods. - If None, will try sequence of methods. + + Args: + method (str): Supported methods: google_default|cache|cloud|token|anon|browser|None. + Type of authorisation to implement - calls `_connect_*` methods. + If None, will try sequence of methods. """ if method not in [ "google_default", @@ -114,6 +156,7 @@ def connect(self, method=None): "cloud", "token", "anon", + "browser", None, ]: self._connect_token(method) @@ -123,21 +166,16 @@ def connect(self, method=None): break else: self.__getattribute__("_connect_" + method)() - self.method = method class GCSProvider(StorageProvider): """Provider class for using GC storage.""" - def __init__( - self, - root: str, - token: Union[str, Dict] = None, - ): + def __init__(self, root: str, token: Union[str, Dict] = None, project: str = None): """Initializes the GCSProvider Example: - gcs_provider = GCSProvider("snark-test/gcs_ds") + gcs_provider = GCSProvider("gcs://my-bucket/gcs_ds") Args: root (str): The root of the provider. All read/write request keys will be appended to root. 
@@ -145,6 +183,7 @@ def __init__( """ self.root = root self.token: Union[str, Dict, None] = token + self.project = project self.missing_exceptions = ( FileNotFoundError, IsADirectoryError, @@ -157,7 +196,7 @@ def _initialize_provider(self): self._set_bucket_and_path() if not self.token: self.token = None - scoped_credentials = GoogleCredentials(self.token) + scoped_credentials = GoogleCredentials(self.token, project=self.project) client = storage.Client(credentials=scoped_credentials.credentials) self.client_bucket = client.get_bucket(self.bucket) From fb7164b7e993b7b832244da342a3b92ca6082b35 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 12:23:19 +0400 Subject: [PATCH 28/83] Add exception --- hub/core/storage/gcs.py | 16 ++++++++-------- hub/util/exceptions.py | 8 ++++++++ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index af83e05bac..f297388a6c 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -48,9 +48,6 @@ def _connect_google_default(self): self.project = project self.credentials = credentials - def _connect_cloud(self): - self.credentials = gauth.compute_engine.Credentials() - def _connect_cache(self): credentials = self.project.get(self.project, None) if credentials: @@ -140,28 +137,26 @@ def _connect_browser(self): def _connect_anon(self): self.credentials = None - def connect(self, method=None): + def connect(self, method: str = None): """ Establish session token. A new token will be requested if the current one is within 100s of expiry. Args: - method (str): Supported methods: google_default|cache|cloud|token|anon|browser|None. + method (str): Supported methods: google_default|cache|anon|browser|None. Type of authorisation to implement - calls `_connect_*` methods. If None, will try sequence of methods. """ if method not in [ "google_default", "cache", - "cloud", - "token", "anon", "browser", None, ]: self._connect_token(method) elif method is None: - for meth in ["google_default", "cache", "cloud", "anon"]: + for meth in ["google_default", "cache", "anon"]: self.connect(method=meth) break else: @@ -180,6 +175,11 @@ def __init__(self, root: str, token: Union[str, Dict] = None, project: str = Non Args: root (str): The root of the provider. All read/write request keys will be appended to root. token (str/Dict): GCP token, used for fetching credentials for storage). + Can be a path to the credentials file, actual credential dictionary or one of the folowing: + `google_default`: Tries to load default credentials for the specified project. + `cache`: Retrieves the previously used credentials from cache if exist. + `anon`: Sets credentials=None + `browser`: Generates and stores new token file using cli. """ self.root = root self.token: Union[str, Dict, None] = token diff --git a/hub/util/exceptions.py b/hub/util/exceptions.py index d7e0db1d8f..652bf275db 100644 --- a/hub/util/exceptions.py +++ b/hub/util/exceptions.py @@ -535,3 +535,11 @@ def __init__(self): class CorruptedSampleError(Exception): def __init__(self, compression): super().__init__(f"Invalid {compression} file.") + + +class GCSDefaultCredsNotFoundError(Exception): + def __init__(self): + super().__init__( + "Unable to find default google application credentials at ~/.config/gcloud/application_default_credentials.json. " + "Please make sure you initialized gcloud service earlier." 
+ ) From d2a865cdc1506520a531a17701966f2caeeec9ec Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 12:47:08 +0400 Subject: [PATCH 29/83] Fix lint --- hub/core/storage/gcs.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index f297388a6c..d9daa27fe1 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -3,7 +3,6 @@ import json import os from typing import Dict, Union -import textwrap from google.cloud import storage # type: ignore from google.oauth2 import service_account # type: ignore @@ -12,7 +11,7 @@ import google.auth.credentials # type: ignore import google.auth.exceptions # type: ignore from google_auth_oauthlib.flow import InstalledAppFlow # type: ignore -from hub.core.storage.provider import StorageProvider # type: ignore +from hub.core.storage.provider import StorageProvider from hub.util.exceptions import GCSDefaultCredsNotFoundError @@ -22,7 +21,7 @@ def __init__(self, token: Union[str, Dict] = None, project: str = None): self.project = project self.credentials = None self.token = token - self.tokens = {} + self.tokens: Dict[str] = {} self.connect(method=token) @classmethod @@ -59,7 +58,7 @@ def _dict_to_credentials(self, token: Dict): Does not preserve access token itself, assumes refresh required. Args: - token (dict): dictionary with token to be stored in .json file. + token (Dict): dictionary with token to be stored in .json file. Returns: Path to stored .json file. @@ -69,7 +68,7 @@ def _dict_to_credentials(self, token: Dict): json.dump(token, f) return token_path - def _connect_token(self, token: Union[str, Dict]): + def _connect_token(self, token: Union[str, Dict] = None): """ Connect using a concrete token. @@ -137,13 +136,13 @@ def _connect_browser(self): def _connect_anon(self): self.credentials = None - def connect(self, method: str = None): + def connect(self, method: Union[str, Dict] = None): """ Establish session token. A new token will be requested if the current one is within 100s of expiry. Args: - method (str): Supported methods: google_default|cache|anon|browser|None. + method (str/Dict): Supported methods: google_default|cache|anon|browser|None. Type of authorisation to implement - calls `_connect_*` methods. If None, will try sequence of methods. """ @@ -180,6 +179,7 @@ def __init__(self, root: str, token: Union[str, Dict] = None, project: str = Non `cache`: Retrieves the previously used credentials from cache if exist. `anon`: Sets credentials=None `browser`: Generates and stores new token file using cli. + project (str): Name of the project from gcloud. 
""" self.root = root self.token: Union[str, Dict, None] = token From f9cf3dd96154d292ce432399b0857a2dbf491354 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 13:01:38 +0400 Subject: [PATCH 30/83] Add oauth to requirements --- hub/core/storage/gcs.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index d9daa27fe1..145be1ab8b 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -21,7 +21,7 @@ def __init__(self, token: Union[str, Dict] = None, project: str = None): self.project = project self.credentials = None self.token = token - self.tokens: Dict[str] = {} + self.tokens: Dict[str, Dict] = {} self.connect(method=token) @classmethod @@ -158,8 +158,10 @@ def connect(self, method: Union[str, Dict] = None): for meth in ["google_default", "cache", "anon"]: self.connect(method=meth) break - else: + elif isinstance(method, str): self.__getattribute__("_connect_" + method)() + else: + raise AttributeError(f"Invalid method: {method}") class GCSProvider(StorageProvider): From 3272e384ec36cc8cd6bbaa3c69eb248406dd2aea Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 13:15:10 +0400 Subject: [PATCH 31/83] Fix docstrings --- hub/core/storage/gcs.py | 3 +++ hub/requirements/common.txt | 1 + 2 files changed, 4 insertions(+) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index 145be1ab8b..51bda39816 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -145,6 +145,9 @@ def connect(self, method: Union[str, Dict] = None): method (str/Dict): Supported methods: google_default|cache|anon|browser|None. Type of authorisation to implement - calls `_connect_*` methods. If None, will try sequence of methods. + + Raises: + AttributeError: If method is invalid. 
""" if method not in [ "google_default", diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt index 6b00cdfe91..cd0cd52f5a 100644 --- a/hub/requirements/common.txt +++ b/hub/requirements/common.txt @@ -4,6 +4,7 @@ boto3 boto3-stubs[essential] google-cloud-storage~=1.42.0 google-auth~=2.0.1 +google-auth-oauthlib~=0.4.5 pathos humbug>=0.2.6 types-requests From 0280028b16e2798b85af621b34b4313b2717e502 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 15:02:39 +0400 Subject: [PATCH 32/83] Change no_output_timeout in config --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index fbb0cc132c..c541087ed3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -177,6 +177,7 @@ commands: $Env:GOOGLE_APPLICATION_CREDENTIALS = $Env:CI_GCS_PATH setx /m GOOGLE_APPLICATION_CREDENTIALS "$Env:GOOGLE_APPLICATION_CREDENTIALS" python3 -m pytest --cov-report=xml --cov=./ --local --s3 --gcs --hub-cloud --kaggle --ignore-glob=buH/* + no_output_timeout: 30m - when: condition: << parameters.unix-like >> steps: @@ -187,6 +188,7 @@ commands: command: | export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json python3 -m pytest --cov-report=xml --cov=./ --local --s3 --gcs --hub-cloud --kaggle --ignore-glob=buH/* + no_output_timeout: 30m run-backwards-compatibility-tests: steps: @@ -292,7 +294,6 @@ jobs: condition: << parameters.unix-like >> steps: - slack-status - no_output_timeout: 30m deploy: executor: linux environment: From b19824597f736046339965b5b83f6bc8721a2b52 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 15:17:17 +0400 Subject: [PATCH 33/83] Add project to state --- hub/core/storage/gcs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index 51bda39816..c51379fb6c 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -48,7 +48,7 @@ def _connect_google_default(self): self.credentials = credentials def _connect_cache(self): - credentials = self.project.get(self.project, None) + credentials = self.tokens.get(self.project, None) if credentials: self.credentials = credentials @@ -266,10 +266,11 @@ def __contains__(self, key): return stats def __getstate__(self): - return (self.root, self.token, self.missing_exceptions) + return (self.root, self.token, self.missing_exceptions, self.project) def __setstate__(self, state): self.root = state[0] self.token = state[1] self.missing_exceptions = state[2] + self.project = state[3] self._initialize_provider() From 1ad25143500124153537f414c25ba0c2ba852d60 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 15:35:01 +0400 Subject: [PATCH 34/83] Add multiprocessing import to test_transform --- hub/core/transform/test_transform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index 28c568406e..8b0b5909bc 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -1,6 +1,7 @@ import hub import pytest import numpy as np +import multiprocessing from click.testing import CliRunner from hub.core.storage.memory import MemoryProvider from hub.util.remove_cache import remove_memory_cache From 96f928d1f5ee0e459c7010841c33353bbc7b1e7c Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 26 Aug 2021 16:52:16 +0400 Subject: [PATCH 35/83] Rename class --- hub/core/storage/gcs.py | 13 +++++-------- 1 file 
changed, 5 insertions(+), 8 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index c51379fb6c..c59383c88d 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -15,7 +15,7 @@
 from hub.util.exceptions import GCSDefaultCredsNotFoundError
 
 
-class GoogleCredentials:
+class GCloudCredentials:
     def __init__(self, token: Union[str, Dict] = None, project: str = None):
         self.scope = "https://www.googleapis.com/auth/cloud-platform"
         self.project = project
@@ -28,14 +28,14 @@ def __init__(self, token: Union[str, Dict] = None, project: str = None):
     def load_tokens(cls):
         """Get "browser" tokens from disc"""
         try:
-            with open(".gcstoken", "rb") as f:
+            with open(".gcs_token", "rb") as f:
                 tokens = pickle.load(f)
         except Exception:
             tokens = {}
-        GoogleCredentials.tokens = tokens
+        GCloudCredentials.tokens = tokens
 
     def _save_tokens(self):
-        with open(".gcstoken", "wb") as f:
+        with open(".gcs_token", "wb") as f:
             pickle.dump(self.tokens, f, 2)
 
     def _connect_google_default(self):
@@ -107,9 +107,6 @@ def _connect_service(self, fn):
 
     def _connect_browser(self):
         try:
-            import pdb
-
-            pdb.set_trace()
             with open(
                 posixpath.expanduser(
                     "~/.config/gcloud/application_default_credentials.json"
@@ -201,7 +198,7 @@ def _initialize_provider(self):
         self._set_bucket_and_path()
         if not self.token:
             self.token = None
-        scoped_credentials = GoogleCredentials(self.token, project=self.project)
+        scoped_credentials = GCloudCredentials(self.token, project=self.project)
         client = storage.Client(credentials=scoped_credentials.credentials)
         self.client_bucket = client.get_bucket(self.bucket)

From a1a3fc519f68f8ca304f3019c4cc0aee10ec9b01 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 27 Aug 2021 10:22:50 +0400
Subject: [PATCH 36/83] Downgrade gcloud storage

---
 hub/requirements/common.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt
index c220796555..1e9869855f 100644
--- a/hub/requirements/common.txt
+++ b/hub/requirements/common.txt
@@ -2,7 +2,7 @@ numpy
 pillow==8.2.0
 boto3
 boto3-stubs[essential]
-google-cloud-storage~=1.42.0
+google-cloud-storage~=1.13.0
 google-auth~=2.0.1
 google-auth-oauthlib~=0.4.5
 pathos

From 1aaffafb7bf29acdcd6f96ec48a6f812b1c361f2 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 27 Aug 2021 11:09:43 +0400
Subject: [PATCH 37/83] Revert "Downgrade gcloud storage"

This reverts commit a1a3fc519f68f8ca304f3019c4cc0aee10ec9b01.
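The downgrade and immediate revert above turn on pip's compatible-release operator: `~=` pins to a release series rather than an exact version. For instance:

    google-cloud-storage~=1.42.0   # means >=1.42.0, <1.43.0
    google-cloud-storage~=1.13.0   # means >=1.13.0, <1.14.0

so the revert restores access to the newer 1.42.x client line.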
---
 hub/requirements/common.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt
index 1e9869855f..c220796555 100644
--- a/hub/requirements/common.txt
+++ b/hub/requirements/common.txt
@@ -2,7 +2,7 @@ numpy
 pillow==8.2.0
 boto3
 boto3-stubs[essential]
-google-cloud-storage~=1.13.0
+google-cloud-storage~=1.42.0
 google-auth~=2.0.1
 google-auth-oauthlib~=0.4.5
 pathos

From 3f36ab22743c700e9de66a3563c48d249ec0d573 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 27 Aug 2021 11:47:34 +0400
Subject: [PATCH 38/83] Skip some transform tests for gcs

---
 hub/core/transform/test_transform.py | 32 +++++++++++++++-------------
 hub/tests/dataset_fixtures.py        | 12 +++++++++++
 2 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py
index 8b0b5909bc..aa080326c4 100644
--- a/hub/core/transform/test_transform.py
+++ b/hub/core/transform/test_transform.py
@@ -6,7 +6,7 @@
 from hub.core.storage.memory import MemoryProvider
 from hub.util.remove_cache import remove_memory_cache
 from hub.tests.common import parametrize_num_workers
-from hub.tests.dataset_fixtures import enabled_datasets
+from hub.tests.dataset_fixtures import enabled_datasets, enabled_non_gcs_datasets
 from hub.util.exceptions import InvalidOutputDatasetError
 
 all_compressions = pytest.mark.parametrize("sample_compression", [None, "png", "jpeg"])
@@ -44,8 +44,9 @@ def crop_image(sample_in, samples_out, copy=1):
         samples_out.image.append(sample_in.image.numpy()[:100, :100, :])
 
 
-@enabled_datasets
-def test_single_transform_hub_dataset(ds):
+@enabled_non_gcs_datasets
+def test_single_transform_hub_dataset(non_gcs_ds):
+    ds = non_gcs_ds
     with CliRunner().isolated_filesystem():
         with hub.dataset("./test/transform_hub_in_generic") as data_in:
             data_in.create_tensor("image")
@@ -119,9 +120,10 @@ def test_chain_transform_list_small(ds):
     )
 
 
-@enabled_datasets
+@enabled_non_gcs_datasets
 @pytest.mark.xfail(raises=NotImplementedError, strict=True)
-def test_chain_transform_list_big(ds):
+def test_chain_transform_list_big(non_gcs_ds):
+    ds = non_gcs_ds
     ls = [i for i in range(2)]
     ds_out = ds
     ds_out.create_tensor("image")
@@ -164,10 +166,10 @@ def test_chain_transform_list_small_processed(ds):
 
 
 @all_compressions
-@enabled_datasets
-def test_transform_hub_read(ds, cat_path, sample_compression):
+@enabled_non_gcs_datasets
+def test_transform_hub_read(non_gcs_ds, cat_path, sample_compression):
     data_in = [cat_path] * 10
-    ds_out = ds
+    ds_out = non_gcs_ds
     ds_out.create_tensor("image", htype="image", sample_compression=sample_compression)
 
     read_image().eval(data_in, ds_out, num_workers=8)
@@ -178,10 +180,10 @@ def test_transform_hub_read(ds, cat_path, sample_compression):
 
 
 @all_compressions
-@enabled_datasets
-def test_transform_hub_read_pipeline(ds, cat_path, sample_compression):
+@enabled_non_gcs_datasets
+def test_transform_hub_read_pipeline(non_gcs_ds, cat_path, sample_compression):
     data_in = [cat_path] * 10
-    ds_out = ds
+    ds_out = non_gcs_ds
     ds_out.create_tensor("image", htype="image", sample_compression=sample_compression)
     pipeline = hub.compose([read_image(), crop_image(copy=2)])
     pipeline.eval(data_in, ds_out, num_workers=8)
@@ -191,16 +193,16 @@ def test_transform_hub_read_pipeline(ds, cat_path, sample_compression):
         np.testing.assert_array_equal(ds_out.image[i].numpy(), ds_out.image[0].numpy())
 
 
-@enabled_datasets
-def test_hub_like(ds):
+@enabled_non_gcs_datasets
+def test_hub_like(non_gcs_ds):
     with
CliRunner().isolated_filesystem(): - data_in = ds + data_in = non_gcs_ds data_in.create_tensor("image", htype="image", sample_compression="png") data_in.create_tensor("label", htype="class_label") for i in range(1, 100): data_in.image.append(i * np.ones((i, i), dtype="uint8")) data_in.label.append(i * np.ones((1,), dtype="uint32")) - ds_out = hub.like("test/transform_hub_like", ds) + ds_out = hub.like("test/transform_hub_like", non_gcs_ds) fn2(copy=1, mul=2).eval(data_in, ds_out, num_workers=5) assert len(ds_out) == 99 for index in range(1, 100): diff --git a/hub/tests/dataset_fixtures.py b/hub/tests/dataset_fixtures.py index 6fa36289c7..59aa7ab4ba 100644 --- a/hub/tests/dataset_fixtures.py +++ b/hub/tests/dataset_fixtures.py @@ -7,6 +7,12 @@ indirect=True, ) +enabled_non_gcs_datasets = pytest.mark.parametrize( + "non_gcs_ds", + ["memory_ds", "local_ds", "s3_ds"], + indirect=True, +) + enabled_persistent_dataset_generators = pytest.mark.parametrize( "ds_generator", ["local_ds_generator", "s3_ds_generator", "gcs_ds_generator"], @@ -77,6 +83,12 @@ def ds(request): return request.getfixturevalue(request.param) +@pytest.fixture +def non_gcs_ds(request): + """Used with parametrize to use all enabled dataset fixtures except gcs_ds.""" + return request.getfixturevalue(request.param) + + @pytest.fixture def ds_generator(request): """Used with parametrize to use all enabled persistent dataset generator fixtures.""" From bac553222c0c8c75f24621879e385858ce9c1ec9 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 27 Aug 2021 12:53:13 +0400 Subject: [PATCH 39/83] Add retry --- hub/core/storage/gcs.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index c59383c88d..9c91dcb6a3 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -5,6 +5,7 @@ from typing import Dict, Union from google.cloud import storage # type: ignore +from google.api_core import retry # type: ignore from google.oauth2 import service_account # type: ignore import google.auth as gauth # type: ignore import google.auth.compute_engine # type: ignore @@ -199,6 +200,7 @@ def _initialize_provider(self): if not self.token: self.token = None scoped_credentials = GCloudCredentials(self.token, project=self.project) + self.retry = retry.Retry(deadline=60) client = storage.Client(credentials=scoped_credentials.credentials) self.client_bucket = client.get_bucket(self.bucket) @@ -227,7 +229,7 @@ def __getitem__(self, key): """Retrieve data""" try: blob = self.client_bucket.get_blob(self._get_path_from_key(key)) - return blob.download_as_bytes() + return blob.download_as_bytes(retry=self.retry) except self.missing_exceptions: raise KeyError(key) @@ -239,7 +241,7 @@ def __setitem__(self, key, value): value = value.tobytes() elif isinstance(value, bytearray): value = bytes(value) - blob.upload_from_string(value) + blob.upload_from_string(value, retry=self.retry) def __iter__(self): """Iterating over the structure""" From 4c26979a5e76ae763a39236e585a7bd8467f1f6d Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 27 Aug 2021 14:36:44 +0400 Subject: [PATCH 40/83] Add token tests --- .../storage/tests/test_storage_provider.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/hub/core/storage/tests/test_storage_provider.py b/hub/core/storage/tests/test_storage_provider.py index 23fe2e943d..ea3f374b6b 100644 --- a/hub/core/storage/tests/test_storage_provider.py +++ b/hub/core/storage/tests/test_storage_provider.py @@ -1,5 +1,10 @@ 
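The `retry.Retry(deadline=60)` object introduced in the patch above retries transient GCS failures with exponential backoff for up to 60 seconds per call. A self-contained sketch of the same pattern outside the provider (bucket and key names are placeholders):

    from google.api_core import retry as gcs_retry
    from google.cloud import storage

    client = storage.Client()
    bucket = client.get_bucket("my-bucket")    # placeholder bucket
    blob = bucket.get_blob("path/to/chunk")    # placeholder key
    data = blob.download_as_bytes(retry=gcs_retry.Retry(deadline=60))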
+import json +from google.auth.environment_vars import GCE_METADATA_HOST +from hub.tests.path_fixtures import gcs_creds from hub.tests.storage_fixtures import enabled_storages, enabled_persistent_storages from hub.tests.cache_fixtures import enabled_cache_chains +from hub.core.storage.gcs import GCloudCredentials +import os import pytest from hub.constants import MB import pickle @@ -132,3 +137,20 @@ def test_pickling(storage): pickled_storage = pickle.dumps(storage) unpickled_storage = pickle.loads(pickled_storage) assert unpickled_storage[FILE_1] == b"hello world" + + +def test_gcs_tokens(): + gcreds = GCloudCredentials() + assert gcreds.credentials + token_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + gcreds = GCloudCredentials(token=token_path) + assert gcreds.credentials + + with open(token_path, "rb") as f: + token = json.load(f) + gcreds = GCloudCredentials(token=token) + assert gcreds.credentials + gcreds = GCloudCredentials(token="google_default") + assert gcreds.credentials + gcreds = GCloudCredentials(token="anon") + assert not gcreds.credentials From 0858e98fe11e30c4ecd9882217c78de27f12e683 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 27 Aug 2021 14:40:33 +0400 Subject: [PATCH 41/83] Modify cache token --- hub/core/storage/gcs.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index 9c91dcb6a3..ca7349cf0a 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -25,19 +25,17 @@ def __init__(self, token: Union[str, Dict] = None, project: str = None): self.tokens: Dict[str, Dict] = {} self.connect(method=token) - @classmethod - def load_tokens(cls): + def load_tokens(self): """Get "browser" tokens from disc""" try: with open(".gcs_token", "rb") as f: - tokens = pickle.load(f) + return pickle.load(f) except Exception: - tokens = {} - GCloudCredentials.tokens = tokens + return None - def _save_tokens(self): + def _save_tokens(self, token): with open(".gcs_token", "wb") as f: - pickle.dump(self.tokens, f, 2) + pickle.dump(token, f, 2) def _connect_google_default(self): credentials, project = gauth.default(scopes=[self.scope]) @@ -49,7 +47,7 @@ def _connect_google_default(self): self.credentials = credentials def _connect_cache(self): - credentials = self.tokens.get(self.project, None) + credentials = self.load_tokens() if credentials: self.credentials = credentials @@ -127,8 +125,7 @@ def _connect_browser(self): } flow = InstalledAppFlow.from_client_config(client_config, [self.scope]) credentials = flow.run_console() - self.tokens[(self.project, self.access)] = credentials - self._save_tokens() + self._save_tokens(credentials) self.credentials = credentials def _connect_anon(self): From fa2bc3d2fb2f0ad14820f21d16c131ed2f33e221 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 27 Aug 2021 14:44:53 +0400 Subject: [PATCH 42/83] Remove import --- hub/core/storage/tests/test_storage_provider.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hub/core/storage/tests/test_storage_provider.py b/hub/core/storage/tests/test_storage_provider.py index ea3f374b6b..9a9f556016 100644 --- a/hub/core/storage/tests/test_storage_provider.py +++ b/hub/core/storage/tests/test_storage_provider.py @@ -1,5 +1,4 @@ import json -from google.auth.environment_vars import GCE_METADATA_HOST from hub.tests.path_fixtures import gcs_creds from hub.tests.storage_fixtures import enabled_storages, enabled_persistent_storages from hub.tests.cache_fixtures import enabled_cache_chains From 
908479fdbaa4377d491e0a3e73b8202093eee5a6 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 27 Aug 2021 15:20:14 +0400
Subject: [PATCH 43/83] Create tempfile for dicts

---
 hub/core/storage/gcs.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index ca7349cf0a..5a134f0f5e 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -2,6 +2,7 @@
 import pickle
 import json
 import os
+import tempfile
 from typing import Dict, Union
 
 from google.cloud import storage  # type: ignore
@@ -62,10 +63,10 @@ def _dict_to_credentials(self, token: Dict):
         Returns:
             Path to stored .json file.
         """
-        token_path = posixpath.expanduser("~/.config/gcloud/gcs.json")
-        with open(token_path, "w") as f:
+        token_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
+        with open(token_file.name, "w") as f:
             json.dump(token, f)
-        return token_path
+        return token_file.name

From a37186d0630e1568db29b691622b4b6330e43ed1 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Fri, 27 Aug 2021 16:53:54 +0400
Subject: [PATCH 44/83] Add exception test

---
 hub/core/storage/tests/test_storage_provider.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/hub/core/storage/tests/test_storage_provider.py b/hub/core/storage/tests/test_storage_provider.py
index 9a9f556016..9435abde4e 100644
--- a/hub/core/storage/tests/test_storage_provider.py
+++ b/hub/core/storage/tests/test_storage_provider.py
@@ -3,6 +3,7 @@
 from hub.tests.storage_fixtures import enabled_storages, enabled_persistent_storages
 from hub.tests.cache_fixtures import enabled_cache_chains
 from hub.core.storage.gcs import GCloudCredentials
+from hub.util.exceptions import GCSDefaultCredsNotFoundError
 import os
 import pytest
 from hub.constants import MB
@@ -153,3 +154,5 @@ def test_gcs_tokens():
     assert gcreds.credentials
     gcreds = GCloudCredentials(token="anon")
     assert not gcreds.credentials
+    with pytest.raises(GCSDefaultCredsNotFoundError) as e:
+        gcreds = GCloudCredentials(token="browser")

From 0b14999f3c30b4870e8215b54de56a69eebe4fbb Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 30 Aug 2021 13:53:21 +0400
Subject: [PATCH 45/83] Add client reinitializing

---
 hub/core/storage/gcs.py         | 8 ++++++--
 hub/core/transform/transform.py | 4 +++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index 5a134f0f5e..4457751cbc 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -197,9 +197,13 @@ def _initialize_provider(self):
         self._set_bucket_and_path()
         if not self.token:
             self.token = None
-        scoped_credentials = GCloudCredentials(self.token, project=self.project)
+        self.scoped_credentials = GCloudCredentials(self.token, project=self.project)
         self.retry = retry.Retry(deadline=60)
-        client = storage.Client(credentials=scoped_credentials.credentials)
+        client = storage.Client(credentials=self.scoped_credentials.credentials)
+        self.client_bucket = client.get_bucket(self.bucket)
+
+    def reinitialize_provider(self):
+        client = storage.Client(credentials=self.scoped_credentials.credentials)
         self.client_bucket = client.get_bucket(self.bucket)

diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py
index 6abf3f09ee..d29b9dd42d 100644
--- a/hub/core/transform/transform.py
+++ b/hub/core/transform/transform.py
@@ -1,3 +1,4 @@
+from hub.core.storage.gcs import GCSProvider
 import hub
 import math
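# Aside: the tempfile change above exists because the downstream credential
# loaders expect a file path, not a dict. A standalone sketch of the pattern;
# the helper name dict_to_token_file is illustrative, not from this series:

    import json
    import tempfile

    def dict_to_token_file(token: dict) -> str:
        # delete=False keeps the file alive after this scope, so the
        # credentials loader can re-open it by path later.
        token_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
        with open(token_file.name, "w") as f:
            json.dump(token, f)
        return token_file.name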
from typing import List @@ -112,7 +113,8 @@ def eval( ds_out[tensor].chunk_engine.chunk_id_encoder compute_provider = get_compute_provider(scheduler, num_workers) - + if isinstance(ds_out.storage.next_storage, GCSProvider): + ds_out.storage.next_storage.reinitialize_provider() self.run(data_in, ds_out, tensors, compute_provider, num_workers) ds_out.storage.autoflush = initial_autoflush From 33f81979b99d9ba4049649ebc1773ed3c5c3d2a5 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 30 Aug 2021 14:03:16 +0400 Subject: [PATCH 46/83] Disable some api tests for gcs --- hub/api/tests/test_api.py | 7 ++++--- hub/core/transform/test_transform.py | 20 +++++++++----------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/hub/api/tests/test_api.py b/hub/api/tests/test_api.py index 10b2c0f125..48715e4258 100644 --- a/hub/api/tests/test_api.py +++ b/hub/api/tests/test_api.py @@ -17,6 +17,7 @@ from hub.tests.dataset_fixtures import ( enabled_datasets, enabled_persistent_dataset_generators, + enabled_non_gcs_datasets, ) @@ -142,7 +143,7 @@ def test_stringify_with_path(local_ds): assert str(ds) == f"Dataset(path='{local_ds.path}', tensors=[])" -@enabled_datasets +@enabled_non_gcs_datasets def test_compute_fixed_tensor(ds): ds.create_tensor("image") ds.image.extend(np.ones((32, 28, 28))) @@ -150,7 +151,7 @@ def test_compute_fixed_tensor(ds): np.testing.assert_array_equal(ds.image.numpy(), np.ones((32, 28, 28))) -@enabled_datasets +@enabled_non_gcs_datasets def test_compute_dynamic_tensor(ds): ds.create_tensor("image") @@ -213,7 +214,7 @@ def test_empty_samples(ds: Dataset): np.testing.assert_array_equal(actual, expected) -@enabled_datasets +@enabled_non_gcs_datasets def test_safe_downcasting(ds: Dataset): int_tensor = ds.create_tensor("int", dtype="uint8") int_tensor.append(0) diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index aa080326c4..9b34c91d1a 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -45,8 +45,7 @@ def crop_image(sample_in, samples_out, copy=1): @enabled_non_gcs_datasets -def test_single_transform_hub_dataset(non_gcs_ds): - ds = non_gcs_ds +def test_single_transform_hub_dataset(ds): with CliRunner().isolated_filesystem(): with hub.dataset("./test/transform_hub_in_generic") as data_in: data_in.create_tensor("image") @@ -122,8 +121,7 @@ def test_chain_transform_list_small(ds): @enabled_non_gcs_datasets @pytest.mark.xfail(raises=NotImplementedError, strict=True) -def test_chain_transform_list_big(non_gcs_ds): - ds = non_gcs_ds +def test_chain_transform_list_big(ds): ls = [i for i in range(2)] ds_out = ds ds_out.create_tensor("image") @@ -167,9 +165,9 @@ def test_chain_transform_list_small_processed(ds): @all_compressions @enabled_non_gcs_datasets -def test_transform_hub_read(non_gcs_ds, cat_path, sample_compression): +def test_transform_hub_read(ds, cat_path, sample_compression): data_in = [cat_path] * 10 - ds_out = non_gcs_ds + ds_out = ds ds_out.create_tensor("image", htype="image", sample_compression=sample_compression) read_image().eval(data_in, ds_out, num_workers=8) @@ -181,9 +179,9 @@ def test_transform_hub_read(non_gcs_ds, cat_path, sample_compression): @all_compressions @enabled_non_gcs_datasets -def test_transform_hub_read_pipeline(non_gcs_ds, cat_path, sample_compression): +def test_transform_hub_read_pipeline(ds, cat_path, sample_compression): data_in = [cat_path] * 10 - ds_out = non_gcs_ds + ds_out = ds ds_out.create_tensor("image", htype="image", 
sample_compression=sample_compression) pipeline = hub.compose([read_image(), crop_image(copy=2)]) pipeline.eval(data_in, ds_out, num_workers=8) @@ -194,15 +192,15 @@ def test_transform_hub_read_pipeline(non_gcs_ds, cat_path, sample_compression): @enabled_non_gcs_datasets -def test_hub_like(non_gcs_ds): +def test_hub_like(ds): with CliRunner().isolated_filesystem(): - data_in = non_gcs_ds + data_in = ds data_in.create_tensor("image", htype="image", sample_compression="png") data_in.create_tensor("label", htype="class_label") for i in range(1, 100): data_in.image.append(i * np.ones((i, i), dtype="uint8")) data_in.label.append(i * np.ones((1,), dtype="uint32")) - ds_out = hub.like("test/transform_hub_like", non_gcs_ds) + ds_out = hub.like("test/transform_hub_like", ds) fn2(copy=1, mul=2).eval(data_in, ds_out, num_workers=5) assert len(ds_out) == 99 for index in range(1, 100): From cd0490b6c359ed1e4012b7363c13ef48de22bb01 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 30 Aug 2021 14:08:54 +0400 Subject: [PATCH 47/83] Change fixture name --- hub/tests/dataset_fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hub/tests/dataset_fixtures.py b/hub/tests/dataset_fixtures.py index 59aa7ab4ba..3f2fd754b0 100644 --- a/hub/tests/dataset_fixtures.py +++ b/hub/tests/dataset_fixtures.py @@ -8,7 +8,7 @@ ) enabled_non_gcs_datasets = pytest.mark.parametrize( - "non_gcs_ds", + "ds", ["memory_ds", "local_ds", "s3_ds"], indirect=True, ) From ef1156e3df8bf18731d88f529bb640eb85bc8257 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 30 Aug 2021 14:11:32 +0400 Subject: [PATCH 48/83] Check cache --- hub/core/transform/transform.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index d29b9dd42d..45a8d4c0d7 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -1,3 +1,5 @@ +from hub.core.storage.lru_cache import LRUCache +from hub.core import storage from hub.core.storage.gcs import GCSProvider import hub import math @@ -113,7 +115,9 @@ def eval( ds_out[tensor].chunk_engine.chunk_id_encoder compute_provider = get_compute_provider(scheduler, num_workers) - if isinstance(ds_out.storage.next_storage, GCSProvider): + if isinstance(ds_out, storage, LRUCache) and isinstance( + ds_out.storage.next_storage, GCSProvider + ): ds_out.storage.next_storage.reinitialize_provider() self.run(data_in, ds_out, tensors, compute_provider, num_workers) ds_out.storage.autoflush = initial_autoflush From 539f333e37471e36bdff91d2839333ce0d862372 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 30 Aug 2021 14:28:05 +0400 Subject: [PATCH 49/83] Fix check --- hub/core/transform/transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 45a8d4c0d7..53c5c75cde 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -115,7 +115,7 @@ def eval( ds_out[tensor].chunk_engine.chunk_id_encoder compute_provider = get_compute_provider(scheduler, num_workers) - if isinstance(ds_out, storage, LRUCache) and isinstance( + if isinstance(ds_out.storage, LRUCache) and isinstance( ds_out.storage.next_storage, GCSProvider ): ds_out.storage.next_storage.reinitialize_provider() From 8ac9a157b07e94b234f452e55c222a6efb4daf30 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 30 Aug 2021 14:46:00 +0400 Subject: [PATCH 50/83] Change method name --- hub/core/storage/gcs.py | 
2 +- hub/core/transform/transform.py | 2 +- hub/requirements/tests.txt | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index 4457751cbc..d3e513c1db 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -202,7 +202,7 @@ def _initialize_provider(self): client = storage.Client(credentials=self.scoped_credentials.credentials) self.client_bucket = client.get_bucket(self.bucket) - def reinitialize_provider(self): + def _reinitialize_provider(self): client = storage.Client(credentials=self.scoped_credentials.credentials) self.client_bucket = client.get_bucket(self.bucket) diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 53c5c75cde..1120f23638 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -118,7 +118,7 @@ def eval( if isinstance(ds_out.storage, LRUCache) and isinstance( ds_out.storage.next_storage, GCSProvider ): - ds_out.storage.next_storage.reinitialize_provider() + ds_out.storage.next_storage._reinitialize_provider() self.run(data_in, ds_out, tensors, compute_provider, num_workers) ds_out.storage.autoflush = initial_autoflush diff --git a/hub/requirements/tests.txt b/hub/requirements/tests.txt index 88984c55b4..1d04648410 100644 --- a/hub/requirements/tests.txt +++ b/hub/requirements/tests.txt @@ -3,6 +3,7 @@ pytest-cases pytest-benchmark pytest-cov mypy +typing_extensions black darglint kaggle \ No newline at end of file From 190da7fb59ed2ba7a744bfe1b435af8d96ba6c90 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 30 Aug 2021 15:23:39 +0400 Subject: [PATCH 51/83] Ignore check --- hub/requirements/tests.txt | 4 ++-- hub/util/storage.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hub/requirements/tests.txt b/hub/requirements/tests.txt index 1d04648410..17715fb6ff 100644 --- a/hub/requirements/tests.txt +++ b/hub/requirements/tests.txt @@ -3,7 +3,7 @@ pytest-cases pytest-benchmark pytest-cov mypy -typing_extensions black darglint -kaggle \ No newline at end of file +kaggle +typing_extensions \ No newline at end of file diff --git a/hub/util/storage.py b/hub/util/storage.py index e65de3c935..0c5a5af798 100644 --- a/hub/util/storage.py +++ b/hub/util/storage.py @@ -46,7 +46,7 @@ def storage_provider_from_path( path, key, secret, session_token, endpoint_url, region, token=token ) elif path.startswith("gcp://") or path.startswith("gcs://"): - storage = GCSProvider(path, creds) + storage = GCSProvider(path, creds) # type: ignore elif path.startswith("mem://"): storage = MemoryProvider(path) elif path.startswith("hub://"): From 5cb0be062c1797efddaa67a328f0a4ce120724a0 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 30 Aug 2021 15:31:24 +0400 Subject: [PATCH 52/83] Move typing_extensions requirement --- hub/requirements/common.txt | 1 + hub/requirements/tests.txt | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt index c220796555..e1b6d8f8c3 100644 --- a/hub/requirements/common.txt +++ b/hub/requirements/common.txt @@ -11,3 +11,4 @@ types-requests types-click tqdm lz4 +typing_extensions \ No newline at end of file diff --git a/hub/requirements/tests.txt b/hub/requirements/tests.txt index 17715fb6ff..88984c55b4 100644 --- a/hub/requirements/tests.txt +++ b/hub/requirements/tests.txt @@ -5,5 +5,4 @@ pytest-cov mypy black darglint -kaggle -typing_extensions \ No newline at end of file +kaggle \ No newline at end of file From 
8b8a92363b621cb6571d536db87ec3a63413a211 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 30 Aug 2021 15:47:49 +0400
Subject: [PATCH 53/83] Specify version

---
 hub/requirements/common.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hub/requirements/common.txt b/hub/requirements/common.txt
index e1b6d8f8c3..c84073c166 100644
--- a/hub/requirements/common.txt
+++ b/hub/requirements/common.txt
@@ -11,4 +11,4 @@ types-requests
 types-click
 tqdm
 lz4
-typing_extensions
\ No newline at end of file
+typing_extensions>=3.10.0.0
\ No newline at end of file

From edbc2fd55655ca4d4f6d98b3c5c9e2a8326761a1 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 30 Aug 2021 15:54:47 +0400
Subject: [PATCH 54/83] Remove unused fixture

---
 hub/tests/dataset_fixtures.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/hub/tests/dataset_fixtures.py b/hub/tests/dataset_fixtures.py
index 3f2fd754b0..2bb84c1c87 100644
--- a/hub/tests/dataset_fixtures.py
+++ b/hub/tests/dataset_fixtures.py
@@ -80,12 +80,7 @@
 @pytest.fixture
 def ds(request):
     """Used with parametrize to use all enabled dataset fixtures."""
-    return request.getfixturevalue(request.param)
-
-
-@pytest.fixture
-def non_gcs_ds(request):
-    """Used with parametrize to use all enabled dataset fixtures except gcs_ds."""
+    print(request.param)
     return request.getfixturevalue(request.param)

From d81ba97982d40c37708a45056ef0ee21e6b518dd Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Mon, 30 Aug 2021 16:06:21 +0400
Subject: [PATCH 55/83] Remove print

---
 hub/tests/dataset_fixtures.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hub/tests/dataset_fixtures.py b/hub/tests/dataset_fixtures.py
index 2bb84c1c87..1297bb63cd 100644
--- a/hub/tests/dataset_fixtures.py
+++ b/hub/tests/dataset_fixtures.py
@@ -80,7 +80,6 @@
 @pytest.fixture
 def ds(request):
     """Used with parametrize to use all enabled dataset fixtures."""
-    print(request.param)
     return request.getfixturevalue(request.param)

From 8dacd6b62adae01a49d2a8b5c212493833367258 Mon Sep 17 00:00:00 2001
From: kristinagrig06
Date: Tue, 31 Aug 2021 16:02:33 +0400
Subject: [PATCH 56/83] Remove reinitialization, add docstrings, modify setitem

---
 hub/core/storage/gcs.py              | 40 +++++++++++++++++++---------
 hub/core/transform/test_transform.py |  1 -
 hub/util/keys.py                     |  2 +-
 hub/util/storage.py                  |  2 +-
 4 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py
index d3e513c1db..b8fc8569dd 100644
--- a/hub/core/storage/gcs.py
+++ b/hub/core/storage/gcs.py
@@ -26,7 +26,7 @@
         self.tokens: Dict[str, Dict] = {}
         self.connect(method=token)
 
-    def load_tokens(self):
+    def _load_tokens(self):
         """Get "browser" tokens from disc"""
         try:
             with open(".gcs_token", "rb") as f:
@@ -39,6 +39,14 @@ def _save_tokens(self, token):
             pickle.dump(token, f, 2)
 
     def _connect_google_default(self):
+        """Attempts to get credentials from google default configurations:
+        environment variable GOOGLE_APPLICATION_CREDENTIALS, Google Cloud SDK default credentials or default project.
+        For more details see: https://google-auth.readthedocs.io/en/master/reference/google.auth.html#google.auth.default
+
+        Raises:
+            ValueError: If the name of the default project doesn't match the GCSProvider project name.
+            DefaultCredentialsError: If no credentials are found.
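The documented lookup order comes from `google.auth.default`, which checks the GOOGLE_APPLICATION_CREDENTIALS environment variable, then gcloud SDK user credentials, then the compute metadata server. Sketched on its own:

    import google.auth

    credentials, project = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    # project is the resolved default project id; the provider raises
    # ValueError when it disagrees with an explicitly requested project.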
+ """ credentials, project = gauth.default(scopes=[self.scope]) if self.project and self.project != project: raise ValueError( @@ -48,13 +56,14 @@ def _connect_google_default(self): self.credentials = credentials def _connect_cache(self): - credentials = self.load_tokens() + """Load token stored after using _connect_browser() method.""" + credentials = self._load_tokens() if credentials: self.credentials = credentials def _dict_to_credentials(self, token: Dict): """ - Convert old dict-style token. + Convert dict-style token. Does not preserve access token itself, assumes refresh required. Args: @@ -106,6 +115,13 @@ def _connect_service(self, fn): self.credentials = credentials def _connect_browser(self): + """Create and store new credentials using OAuth authentication method. + Requires having default client configuration file in ~/.config/gcloud/application_default_credentials.json + (default location after initializing gcloud). + + Raises: + GCSDefaultCredsNotFoundError: if application deafault credentials can't be found. + """ try: with open( posixpath.expanduser( @@ -130,6 +146,7 @@ def _connect_browser(self): self.credentials = credentials def _connect_anon(self): + """Use provider without specific credentials. Applicable for public projects/buckets.""" self.credentials = None def connect(self, method: Union[str, Dict] = None): @@ -202,10 +219,6 @@ def _initialize_provider(self): client = storage.Client(credentials=self.scoped_credentials.credentials) self.client_bucket = client.get_bucket(self.bucket) - def _reinitialize_provider(self): - client = storage.Client(credentials=self.scoped_credentials.credentials) - self.client_bucket = client.get_bucket(self.bucket) - def _set_bucket_and_path(self): root = self.root.replace("gcp://", "").replace("gcs://", "") self.bucket = root.split("/")[0] @@ -216,9 +229,9 @@ def _set_bucket_and_path(self): def _get_path_from_key(self, key): return posixpath.join(self.path, key) - def _list_keys(self): + def _all_keys(self): self._blob_objects = self.client_bucket.list_blobs(prefix=self.path) - return [obj.name for obj in self._blob_objects] + return {obj.name for obj in self._blob_objects} def clear(self): """Remove all keys below root - empties out mapping""" @@ -239,10 +252,11 @@ def __setitem__(self, key, value): """Store value in key""" self.check_readonly() blob = self.client_bucket.blob(self._get_path_from_key(key)) - if isinstance(value, memoryview): - value = value.tobytes() - elif isinstance(value, bytearray): - value = bytes(value) + if isinstance(value, memoryview) and ( + value.strides == (1,) and value.shape == (len(value.obj),) + ): + value = value.obj + value = bytes(value) blob.upload_from_string(value, retry=self.retry) def __iter__(self): diff --git a/hub/core/transform/test_transform.py b/hub/core/transform/test_transform.py index 9b34c91d1a..18e73d2e7f 100644 --- a/hub/core/transform/test_transform.py +++ b/hub/core/transform/test_transform.py @@ -1,7 +1,6 @@ import hub import pytest import numpy as np -import multiprocessing from click.testing import CliRunner from hub.core.storage.memory import MemoryProvider from hub.util.remove_cache import remove_memory_cache diff --git a/hub/util/keys.py b/hub/util/keys.py index 9c076a89b3..0592a309c3 100644 --- a/hub/util/keys.py +++ b/hub/util/keys.py @@ -42,7 +42,7 @@ def dataset_exists(storage: StorageProvider) -> bool: try: storage[get_dataset_meta_key()] return True - except (KeyError, FileNotFoundError): + except KeyError: return False diff --git a/hub/util/storage.py 
b/hub/util/storage.py index f4b705bfbf..c07294ec1a 100644 --- a/hub/util/storage.py +++ b/hub/util/storage.py @@ -46,7 +46,7 @@ def storage_provider_from_path( path, key, secret, session_token, endpoint_url, region, token=token ) elif path.startswith("gcp://") or path.startswith("gcs://"): - storage = GCSProvider(path, creds) # type: ignore + storage = GCSProvider(path, creds) elif path.startswith("mem://"): storage = MemoryProvider(path) elif path.startswith("hub://"): From a2af6d8f611188acede566aede1aa3f3ae2ae33e Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Tue, 31 Aug 2021 16:12:02 +0400 Subject: [PATCH 57/83] Remove reinitialize from transform --- hub/core/transform/transform.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 1120f23638..972b0b352b 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -1,6 +1,3 @@ -from hub.core.storage.lru_cache import LRUCache -from hub.core import storage -from hub.core.storage.gcs import GCSProvider import hub import math from typing import List @@ -115,10 +112,6 @@ def eval( ds_out[tensor].chunk_engine.chunk_id_encoder compute_provider = get_compute_provider(scheduler, num_workers) - if isinstance(ds_out.storage, LRUCache) and isinstance( - ds_out.storage.next_storage, GCSProvider - ): - ds_out.storage.next_storage._reinitialize_provider() self.run(data_in, ds_out, tensors, compute_provider, num_workers) ds_out.storage.autoflush = initial_autoflush From 183989165dbd16e721e88ec1731f24c09b3e34c7 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Tue, 31 Aug 2021 16:34:29 +0400 Subject: [PATCH 58/83] Fix __all_keys in iter --- hub/core/storage/gcs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index b8fc8569dd..4da175b881 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -261,11 +261,11 @@ def __setitem__(self, key, value): def __iter__(self): """Iterating over the structure""" - yield from [f for f in self._list_keys() if not f.endswith("/")] + yield from [f for f in self._all_keys() if not f.endswith("/")] def __len__(self): """Returns length of the structure""" - return len(self._list_keys()) + return len(self._all_keys()) def __delitem__(self, key): """Remove key""" From 0a5a872266b655ec5b56c9beaa4d469c003f9282 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 2 Sep 2021 16:57:00 +0400 Subject: [PATCH 59/83] Increase maxfiles open --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c541087ed3..a2e3d3fdce 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -106,7 +106,8 @@ commands: - run: name: "Install dependencies" command: | - brew install zlib libjpeg webp + brew install zlib libjpeg webp + sudo launchctl limit maxfiles 65536 200000 sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ info: steps: From ccc1952f21eafbd66fccf33e9f519346da7416e9 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 09:44:20 +0400 Subject: [PATCH 60/83] Add limit --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a2e3d3fdce..090789374c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -107,7 +107,7 @@ commands: name: "Install dependencies" command: | brew install zlib libjpeg webp - 
sudo launchctl limit maxfiles 65536 200000 + sudo launchctl limit maxfiles 65536 300000 sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ info: steps: From 82212b28c7d9a9984e96a91b904cc1a1f863414c Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 12:25:19 +0400 Subject: [PATCH 61/83] Revert reinit --- hub/core/storage/gcs.py | 4 ++++ hub/core/transform/transform.py | 3 +++ 2 files changed, 7 insertions(+) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index 4da175b881..41166acd1d 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -219,6 +219,10 @@ def _initialize_provider(self): client = storage.Client(credentials=self.scoped_credentials.credentials) self.client_bucket = client.get_bucket(self.bucket) + def reinitialize_provider(self): + client = storage.Client(credentials=self.scoped_credentials.credentials) + self.client_bucket = client.get_bucket(self.bucket) + def _set_bucket_and_path(self): root = self.root.replace("gcp://", "").replace("gcs://", "") self.bucket = root.split("/")[0] diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 972b0b352b..697d0a80f6 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -2,6 +2,7 @@ import math from typing import List from itertools import repeat +from hub.core.storage.gcs import GCSProvider from hub.core.compute.provider import ComputeProvider from hub.util.bugout_reporter import hub_reporter from hub.util.compute import get_compute_provider @@ -112,6 +113,8 @@ def eval( ds_out[tensor].chunk_engine.chunk_id_encoder compute_provider = get_compute_provider(scheduler, num_workers) + if isinstance(ds_out.storage.next_storage, GCSProvider): + ds_out.storage.next_storage.reinitialize_provider() self.run(data_in, ds_out, tensors, compute_provider, num_workers) ds_out.storage.autoflush = initial_autoflush From 59379368a4461556bbfa7d4249317019cf774182 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 14:20:48 +0400 Subject: [PATCH 62/83] Modify maxfiles per proc --- .circleci/config.yml | 5 +++-- hub/core/storage/gcs.py | 4 ---- hub/core/transform/transform.py | 3 --- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 090789374c..1f45535486 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -106,8 +106,9 @@ commands: - run: name: "Install dependencies" command: | - brew install zlib libjpeg webp - sudo launchctl limit maxfiles 65536 300000 + brew install zlib libjpeg webp + sudo sysctl -w kern.maxfiles=300000 + sudo sysctl -w kern.maxfilesperproc=300000 sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ info: steps: diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index 41166acd1d..4da175b881 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -219,10 +219,6 @@ def _initialize_provider(self): client = storage.Client(credentials=self.scoped_credentials.credentials) self.client_bucket = client.get_bucket(self.bucket) - def reinitialize_provider(self): - client = storage.Client(credentials=self.scoped_credentials.credentials) - self.client_bucket = client.get_bucket(self.bucket) - def _set_bucket_and_path(self): root = self.root.replace("gcp://", "").replace("gcs://", "") self.bucket = root.split("/")[0] diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 697d0a80f6..972b0b352b 100644 --- 
a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -2,7 +2,6 @@ import math from typing import List from itertools import repeat -from hub.core.storage.gcs import GCSProvider from hub.core.compute.provider import ComputeProvider from hub.util.bugout_reporter import hub_reporter from hub.util.compute import get_compute_provider @@ -113,8 +112,6 @@ def eval( ds_out[tensor].chunk_engine.chunk_id_encoder compute_provider = get_compute_provider(scheduler, num_workers) - if isinstance(ds_out.storage.next_storage, GCSProvider): - ds_out.storage.next_storage.reinitialize_provider() self.run(data_in, ds_out, tensors, compute_provider, num_workers) ds_out.storage.autoflush = initial_autoflush From dea7758ff02673697b416d26ed16cd67a850ab5e Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 15:20:29 +0400 Subject: [PATCH 63/83] Close pools --- hub/core/transform/transform.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 972b0b352b..56ce2f7f9b 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -3,6 +3,8 @@ from typing import List from itertools import repeat from hub.core.compute.provider import ComputeProvider +from hub.core.compute.thread import ThreadProvider +from hub.core.compute.process import ProcessProvider from hub.util.bugout_reporter import hub_reporter from hub.util.compute import get_compute_provider from hub.util.remove_cache import get_base_storage, get_dataset_with_zero_size_cache @@ -139,6 +141,8 @@ def run( all_tensor_metas, all_chunk_id_encoders = zip(*metas_and_encoders) merge_all_tensor_metas(all_tensor_metas, ds_out) merge_all_chunk_id_encoders(all_chunk_id_encoders, ds_out) + if isinstance(compute, (ThreadProvider, ProcessProvider): + compute.pool.close() def compose(functions: List[TransformFunction]): From 94292de7cc2fdc4ccd2859faff7da7db5cffff66 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 15:24:00 +0400 Subject: [PATCH 64/83] Fix --- hub/core/transform/transform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 56ce2f7f9b..c3bb7a8aa3 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -141,8 +141,8 @@ def run( all_tensor_metas, all_chunk_id_encoders = zip(*metas_and_encoders) merge_all_tensor_metas(all_tensor_metas, ds_out) merge_all_chunk_id_encoders(all_chunk_id_encoders, ds_out) - if isinstance(compute, (ThreadProvider, ProcessProvider): - compute.pool.close() + if isinstance(compute, (ThreadProvider, ProcessProvider)): + compute.pool.close() def compose(functions: List[TransformFunction]): From a40fe17791c7a29ea3817228fc1f23e63963c471 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 15:38:59 +0400 Subject: [PATCH 65/83] Terminate pool --- hub/core/compute/process.py | 4 +++- hub/core/transform/transform.py | 3 --- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/hub/core/compute/process.py b/hub/core/compute/process.py index a5f5916771..9b7b0f3773 100644 --- a/hub/core/compute/process.py +++ b/hub/core/compute/process.py @@ -8,4 +8,6 @@ def __init__(self, workers): self.pool = ProcessPool(nodes=workers) def map(self, func, iterable): - return self.pool.map(func, iterable) + res = self.pool.map(func, iterable) + self.pool.terminate() + return res diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 
c3bb7a8aa3..328ef9422f 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -137,12 +137,9 @@ def run( store_data_slice, zip(slices, repeat(output_base_storage), repeat(tensors), repeat(self)), ) - all_tensor_metas, all_chunk_id_encoders = zip(*metas_and_encoders) merge_all_tensor_metas(all_tensor_metas, ds_out) merge_all_chunk_id_encoders(all_chunk_id_encoders, ds_out) - if isinstance(compute, (ThreadProvider, ProcessProvider)): - compute.pool.close() def compose(functions: List[TransformFunction]): From 4679216f7318fdc82b9e677f9c8e7e324e5e9076 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 16:07:13 +0400 Subject: [PATCH 66/83] Change to closing --- hub/core/compute/process.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hub/core/compute/process.py b/hub/core/compute/process.py index 9b7b0f3773..8e7212c093 100644 --- a/hub/core/compute/process.py +++ b/hub/core/compute/process.py @@ -1,5 +1,6 @@ from hub.core.compute.provider import ComputeProvider from pathos.pools import ProcessPool # type: ignore +from contextlib import closing class ProcessProvider(ComputeProvider): @@ -8,6 +9,6 @@ def __init__(self, workers): self.pool = ProcessPool(nodes=workers) def map(self, func, iterable): - res = self.pool.map(func, iterable) - self.pool.terminate() + with closing(self.pool) as p: + res = p.map(func, iterable) return res From 83e4a542ca7f8318a3f6211429a06252052902d5 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 16:55:33 +0400 Subject: [PATCH 67/83] Use with --- hub/core/compute/process.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hub/core/compute/process.py b/hub/core/compute/process.py index 8e7212c093..d342f1a330 100644 --- a/hub/core/compute/process.py +++ b/hub/core/compute/process.py @@ -1,6 +1,5 @@ from hub.core.compute.provider import ComputeProvider from pathos.pools import ProcessPool # type: ignore -from contextlib import closing class ProcessProvider(ComputeProvider): @@ -9,6 +8,5 @@ def __init__(self, workers): self.pool = ProcessPool(nodes=workers) def map(self, func, iterable): - with closing(self.pool) as p: - res = p.map(func, iterable) - return res + with self.pool as p: + return p.map(func, iterable) From 77d400a092c4c590c66820ef61b274549d758682 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 16:59:15 +0400 Subject: [PATCH 68/83] Set sharing for pytorch --- hub/integrations/pytorch/pytorch.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hub/integrations/pytorch/pytorch.py b/hub/integrations/pytorch/pytorch.py index a12819968f..21a3c993d6 100644 --- a/hub/integrations/pytorch/pytorch.py +++ b/hub/integrations/pytorch/pytorch.py @@ -15,6 +15,9 @@ pytorch_installed = True try: import torch + import torch.multiprocessing + + torch.multiprocessing.set_sharing_strategy("file_system") except ModuleNotFoundError: pytorch_installed = False From 1389c5ec4ba93b72fbdc6921277966101436f280 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 17:53:47 +0400 Subject: [PATCH 69/83] worker_init_fn in Dataloader --- hub/core/compute/process.py | 3 +-- hub/integrations/pytorch/pytorch.py | 8 +++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/hub/core/compute/process.py b/hub/core/compute/process.py index d342f1a330..a5f5916771 100644 --- a/hub/core/compute/process.py +++ b/hub/core/compute/process.py @@ -8,5 +8,4 @@ def __init__(self, workers): self.pool = ProcessPool(nodes=workers) def map(self, func, 
iterable): - with self.pool as p: - return p.map(func, iterable) + return self.pool.map(func, iterable) diff --git a/hub/integrations/pytorch/pytorch.py b/hub/integrations/pytorch/pytorch.py index 21a3c993d6..acbff4fdaf 100644 --- a/hub/integrations/pytorch/pytorch.py +++ b/hub/integrations/pytorch/pytorch.py @@ -15,13 +15,14 @@ pytorch_installed = True try: import torch - import torch.multiprocessing - - torch.multiprocessing.set_sharing_strategy("file_system") except ModuleNotFoundError: pytorch_installed = False +def set_worker_sharing_strategy(worker_id: int) -> None: + torch.multiprocessing.set_sharing_strategy("file_system") + + def dataset_to_pytorch( dataset, transform: Optional[Callable] = None, @@ -104,4 +105,5 @@ def __iter__(self): drop_last=drop_last, collate_fn=collate_fn, pin_memory=pin_memory, + worker_init_fn=set_worker_sharing_strategy, ) From f8e852f52eafdcc70e72ab9e9bfcade8e204c69c Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 20:31:02 +0400 Subject: [PATCH 70/83] Add ulimit --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1f45535486..0c5751f520 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -108,7 +108,8 @@ commands: command: | brew install zlib libjpeg webp sudo sysctl -w kern.maxfiles=300000 - sudo sysctl -w kern.maxfilesperproc=300000 + sudo sysctl -w kern.maxfilesperproc=300000 + sudo ulimit -n unlimited sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ info: steps: From dd010e442f218b5db2092f979e6f40eb3cefafe7 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 20:52:03 +0400 Subject: [PATCH 71/83] Remove ulimit --- .circleci/config.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 0c5751f520..69a9ff46f1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -108,8 +108,7 @@ commands: command: | brew install zlib libjpeg webp sudo sysctl -w kern.maxfiles=300000 - sudo sysctl -w kern.maxfilesperproc=300000 - sudo ulimit -n unlimited + sudo sysctl -w kern.maxfilesperproc=300000 sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ info: steps: From 06f7ecbdafd4f462829b55e142d4f6f93f465de2 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 21:24:27 +0400 Subject: [PATCH 72/83] Change n --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 69a9ff46f1..6f54257f42 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -108,7 +108,8 @@ commands: command: | brew install zlib libjpeg webp sudo sysctl -w kern.maxfiles=300000 - sudo sysctl -w kern.maxfilesperproc=300000 + sudo sysctl -w kern.maxfilesperproc=300000 + ulimit -n 10000 sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ info: steps: From cd8a90a7e9a4159ac2feff6ae2b0a65e7de813fa Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 22:46:34 +0400 Subject: [PATCH 73/83] Comment pytorch read test --- hub/integrations/tests/test_pytorch.py | 46 +++++++++++++------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/hub/integrations/tests/test_pytorch.py b/hub/integrations/tests/test_pytorch.py index 0ca6848016..a1451b2c1e 100644 --- a/hub/integrations/tests/test_pytorch.py +++ 
b/hub/integrations/tests/test_pytorch.py @@ -320,29 +320,29 @@ def test_readonly(local_ds): pass -@requires_torch -def test_corrupt_dataset(local_ds, corrupt_image_paths, compressed_image_paths): - if isinstance(get_base_storage(local_ds.storage), MemoryProvider): - with pytest.raises(DatasetUnsupportedPytorch): - dl = local_ds.pytorch(num_workers=2) - return - img_good = hub.read(compressed_image_paths["jpeg"][0]) - img_bad = hub.read(corrupt_image_paths["jpeg"]) - with local_ds: - local_ds.create_tensor("image", htype="image", sample_compression="jpeg") - for i in range(3): - for i in range(10): - local_ds.image.append(img_good) - local_ds.image.append(img_bad) - num_samples = 0 - num_batches = 0 - with pytest.warns(UserWarning): - dl = local_ds.pytorch(num_workers=2, batch_size=2) - for (batch,) in dl: - num_batches += 1 - num_samples += len(batch) - assert num_samples == 30 - assert num_batches == 15 +# @requires_torch +# def test_corrupt_dataset(local_ds, corrupt_image_paths, compressed_image_paths): +# if isinstance(get_base_storage(local_ds.storage), MemoryProvider): +# with pytest.raises(DatasetUnsupportedPytorch): +# dl = local_ds.pytorch(num_workers=2) +# return +# img_good = hub.read(compressed_image_paths["jpeg"][0]) +# img_bad = hub.read(corrupt_image_paths["jpeg"]) +# with local_ds: +# local_ds.create_tensor("image", htype="image", sample_compression="jpeg") +# for i in range(3): +# for i in range(10): +# local_ds.image.append(img_good) +# local_ds.image.append(img_bad) +# num_samples = 0 +# num_batches = 0 +# with pytest.warns(UserWarning): +# dl = local_ds.pytorch(num_workers=2, batch_size=2) +# for (batch,) in dl: +# num_batches += 1 +# num_samples += len(batch) +# assert num_samples == 30 +# assert num_batches == 15 @requires_torch From 90ab53e2f0b37e05d4b504527bf1131191fde5af Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Fri, 3 Sep 2021 23:45:24 +0400 Subject: [PATCH 74/83] Use deepcopy of batches --- hub/integrations/tests/test_pytorch.py | 55 ++++++++++++++------------ 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/hub/integrations/tests/test_pytorch.py b/hub/integrations/tests/test_pytorch.py index a1451b2c1e..13da464661 100644 --- a/hub/integrations/tests/test_pytorch.py +++ b/hub/integrations/tests/test_pytorch.py @@ -2,6 +2,7 @@ import numpy as np import pickle import pytest +from copy import deepcopy from hub.util.remove_cache import get_base_storage from hub.util.exceptions import DatasetUnsupportedPytorch, TensorDoesNotExistError @@ -257,7 +258,9 @@ def test_custom_tensor_order(ds): ds, num_workers=2, tensors=["c", "d", "a"], python_version_warning=False ) for dl in [dl_new, dl_old]: - for i, batch in enumerate(dl): + for i, batch_i in enumerate(dl): + batch = deepcopy(batch_i) + del batch_i c1, d1, a1 = batch a2 = batch["a"] c2 = batch["c"] @@ -282,7 +285,9 @@ def test_custom_tensor_order(ds): np.testing.assert_array_equal(d1[0], ds.d.numpy()[i]) dls = ds.pytorch(num_workers=2, tensors=["c", "d", "a"]) - for i, batch in enumerate(dls): + for i, batch_i in enumerate(dls): + batch = deepcopy(batch_i) + del batch_i c1, d1, a1 = batch a2 = batch["a"] c2 = batch["c"] @@ -320,29 +325,29 @@ def test_readonly(local_ds): pass -# @requires_torch -# def test_corrupt_dataset(local_ds, corrupt_image_paths, compressed_image_paths): -# if isinstance(get_base_storage(local_ds.storage), MemoryProvider): -# with pytest.raises(DatasetUnsupportedPytorch): -# dl = local_ds.pytorch(num_workers=2) -# return -# img_good = 
hub.read(compressed_image_paths["jpeg"][0]) -# img_bad = hub.read(corrupt_image_paths["jpeg"]) -# with local_ds: -# local_ds.create_tensor("image", htype="image", sample_compression="jpeg") -# for i in range(3): -# for i in range(10): -# local_ds.image.append(img_good) -# local_ds.image.append(img_bad) -# num_samples = 0 -# num_batches = 0 -# with pytest.warns(UserWarning): -# dl = local_ds.pytorch(num_workers=2, batch_size=2) -# for (batch,) in dl: -# num_batches += 1 -# num_samples += len(batch) -# assert num_samples == 30 -# assert num_batches == 15 +@requires_torch +def test_corrupt_dataset(local_ds, corrupt_image_paths, compressed_image_paths): + if isinstance(get_base_storage(local_ds.storage), MemoryProvider): + with pytest.raises(DatasetUnsupportedPytorch): + dl = local_ds.pytorch(num_workers=2) + return + img_good = hub.read(compressed_image_paths["jpeg"][0]) + img_bad = hub.read(corrupt_image_paths["jpeg"]) + with local_ds: + local_ds.create_tensor("image", htype="image", sample_compression="jpeg") + for i in range(3): + for i in range(10): + local_ds.image.append(img_good) + local_ds.image.append(img_bad) + num_samples = 0 + num_batches = 0 + with pytest.warns(UserWarning): + dl = local_ds.pytorch(num_workers=2, batch_size=2) + for (batch,) in dl: + num_batches += 1 + num_samples += len(batch) + assert num_samples == 30 + assert num_batches == 15 @requires_torch From e71a38d12e5abb0f268557a3d6d2a72016630766 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Sat, 4 Sep 2021 00:41:16 +0400 Subject: [PATCH 75/83] Change setitem --- hub/core/storage/gcs.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index 4da175b881..f4fa3e2f7c 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -252,11 +252,15 @@ def __setitem__(self, key, value): """Store value in key""" self.check_readonly() blob = self.client_bucket.blob(self._get_path_from_key(key)) - if isinstance(value, memoryview) and ( - value.strides == (1,) and value.shape == (len(value.obj),) - ): - value = value.obj - value = bytes(value) + if isinstance(value, memoryview): + value = value.tobytes() + elif isinstance(value, bytearray): + value = bytes(value) + # if isinstance(value, memoryview) and ( + # value.strides == (1,) and value.shape == (len(value.obj),) + # ): + # value = value.obj + # value = bytes(value) blob.upload_from_string(value, retry=self.retry) def __iter__(self): From 5515f525bec542f79bbf893d398d49365eb18396 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 9 Sep 2021 22:08:25 +0400 Subject: [PATCH 76/83] Remove del --- hub/integrations/tests/test_pytorch.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/hub/integrations/tests/test_pytorch.py b/hub/integrations/tests/test_pytorch.py index 13da464661..0ca6848016 100644 --- a/hub/integrations/tests/test_pytorch.py +++ b/hub/integrations/tests/test_pytorch.py @@ -2,7 +2,6 @@ import numpy as np import pickle import pytest -from copy import deepcopy from hub.util.remove_cache import get_base_storage from hub.util.exceptions import DatasetUnsupportedPytorch, TensorDoesNotExistError @@ -258,9 +257,7 @@ def test_custom_tensor_order(ds): ds, num_workers=2, tensors=["c", "d", "a"], python_version_warning=False ) for dl in [dl_new, dl_old]: - for i, batch_i in enumerate(dl): - batch = deepcopy(batch_i) - del batch_i + for i, batch in enumerate(dl): c1, d1, a1 = batch a2 = batch["a"] c2 = batch["c"] @@ -285,9 +282,7 @@ def 
test_custom_tensor_order(ds): np.testing.assert_array_equal(d1[0], ds.d.numpy()[i]) dls = ds.pytorch(num_workers=2, tensors=["c", "d", "a"]) - for i, batch_i in enumerate(dls): - batch = deepcopy(batch_i) - del batch_i + for i, batch in enumerate(dls): c1, d1, a1 = batch a2 = batch["a"] c2 = batch["c"] From 094e13118e286b075edc0a2b4ec41e259841e99e Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Thu, 9 Sep 2021 22:14:37 +0400 Subject: [PATCH 77/83] Unset limits --- .circleci/config.yml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 6f54257f42..7722eb020c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -106,10 +106,7 @@ commands: - run: name: "Install dependencies" command: | - brew install zlib libjpeg webp - sudo sysctl -w kern.maxfiles=300000 - sudo sysctl -w kern.maxfilesperproc=300000 - ulimit -n 10000 + brew install zlib libjpeg webp sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ info: steps: From ae886b24971517b13865e829d9e12614ba83b149 Mon Sep 17 00:00:00 2001 From: AbhinavTuli Date: Sun, 12 Sep 2021 19:20:46 +0530 Subject: [PATCH 78/83] fix mac test --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 7722eb020c..38ce9ae0c4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -108,6 +108,7 @@ commands: command: | brew install zlib libjpeg webp sudo ln -s /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include/* /usr/local/include/ + sudo launchctl limit maxfiles 65536 200000 info: steps: - run: From 7b8585a7304c561019c1ac25caec36104972faac Mon Sep 17 00:00:00 2001 From: AbhinavTuli Date: Sun, 12 Sep 2021 22:28:05 +0530 Subject: [PATCH 79/83] add back parallelism --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 38ce9ae0c4..adf148dd8e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -190,6 +190,7 @@ commands: export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json python3 -m pytest --cov-report=xml --cov=./ --local --s3 --gcs --hub-cloud --kaggle --ignore-glob=buH/* no_output_timeout: 30m + parallelism: 10 run-backwards-compatibility-tests: steps: From 3dd6815d9858394c245022fd16b4fee1e9ae384c Mon Sep 17 00:00:00 2001 From: AbhinavTuli Date: Mon, 13 Sep 2021 17:31:39 +0530 Subject: [PATCH 80/83] fix mac tests by closing --- hub/core/compute/process.py | 5 +++++ hub/core/compute/provider.py | 4 ++++ hub/core/compute/serial.py | 3 +++ hub/core/compute/thread.py | 5 +++++ hub/core/transform/transform.py | 9 ++++++++- 5 files changed, 25 insertions(+), 1 deletion(-) diff --git a/hub/core/compute/process.py b/hub/core/compute/process.py index a5f5916771..3fe9771c71 100644 --- a/hub/core/compute/process.py +++ b/hub/core/compute/process.py @@ -9,3 +9,8 @@ def __init__(self, workers): def map(self, func, iterable): return self.pool.map(func, iterable) + + def close(self): + self.pool.close() + self.pool.join() + self.pool.clear() diff --git a/hub/core/compute/provider.py b/hub/core/compute/provider.py index d5c1dc05bc..bd6bf90d45 100644 --- a/hub/core/compute/provider.py +++ b/hub/core/compute/provider.py @@ -13,3 +13,7 @@ def map(self, func, iterable): """Apply 'func' to each element in 'iterable', collecting the results in a list that is returned. 
""" + + @abstractmethod + def close(self): + """Closes the provider.""" diff --git a/hub/core/compute/serial.py b/hub/core/compute/serial.py index 0fc1a81765..51bb28bbfe 100644 --- a/hub/core/compute/serial.py +++ b/hub/core/compute/serial.py @@ -7,3 +7,6 @@ def __init__(self): def map(self, func, iterable): return map(func, iterable) + + def close(self): + return diff --git a/hub/core/compute/thread.py b/hub/core/compute/thread.py index d0983afca2..9e10e1a6de 100644 --- a/hub/core/compute/thread.py +++ b/hub/core/compute/thread.py @@ -9,3 +9,8 @@ def __init__(self, workers): def map(self, func, iterable): return self.pool.map(func, iterable) + + def close(self): + self.pool.close() + self.pool.join() + self.pool.clear() diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 328ef9422f..47b92b2e90 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -114,8 +114,15 @@ def eval( ds_out[tensor].chunk_engine.chunk_id_encoder compute_provider = get_compute_provider(scheduler, num_workers) - self.run(data_in, ds_out, tensors, compute_provider, num_workers) + + try: + self.run(data_in, ds_out, tensors, compute_provider, num_workers) + except Exception: + compute_provider.close() + raise + ds_out.storage.autoflush = initial_autoflush + compute_provider.close() def run( self, From a92c777cf821c7bf309006b9e4d25bd69cf6d69f Mon Sep 17 00:00:00 2001 From: AbhinavTuli Date: Mon, 13 Sep 2021 17:45:57 +0530 Subject: [PATCH 81/83] lint fix --- hub/core/transform/transform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hub/core/transform/transform.py b/hub/core/transform/transform.py index 47b92b2e90..42aac75dfe 100644 --- a/hub/core/transform/transform.py +++ b/hub/core/transform/transform.py @@ -87,6 +87,7 @@ def eval( InvalidOutputDatasetError: If all the tensors of ds_out passed to transform don't have the same length. Using scheduler other than "threaded" with hub dataset having base storage as memory as ds_out will also raise this. TensorMismatchError: If one or more of the outputs generated during transform contain different tensors than the ones present in 'ds_out' provided to transform. UnsupportedSchedulerError: If the scheduler passed is not recognized. Supported values include: "serial", 'threaded' and 'processed'. + Exception: If any other exception is raised during the transform. """ num_workers = max(num_workers, 0) if num_workers == 0: From d793a48a2c4d79de4078df259a3ed72239c7f168 Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 13 Sep 2021 17:19:27 +0400 Subject: [PATCH 82/83] Add windows path --- hub/core/storage/gcs.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index f4fa3e2f7c..c989c54b00 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -3,6 +3,7 @@ import json import os import tempfile +import platform from typing import Dict, Union from google.cloud import storage # type: ignore @@ -123,12 +124,15 @@ def _connect_browser(self): GCSDefaultCredsNotFoundError: if application deafault credentials can't be found. 
""" try: - with open( - posixpath.expanduser( + if platform.system() == "Windows": + path = os.path.join( + os.getenv("APPDATA"), "gcloud/application_default_credentials.json" + ) + else: + path = posixpath.expanduser( "~/.config/gcloud/application_default_credentials.json" - ), - "r", - ) as f: + ) + with open(path, "r") as f: default_config = json.load(f) except: raise GCSDefaultCredsNotFoundError() From 1f92d2a9b39ff9a1c0547e4426c67b05cb45ae5b Mon Sep 17 00:00:00 2001 From: kristinagrig06 Date: Mon, 13 Sep 2021 17:45:01 +0400 Subject: [PATCH 83/83] Change platform to os.name --- hub/core/storage/gcs.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hub/core/storage/gcs.py b/hub/core/storage/gcs.py index c989c54b00..4942d65bcc 100644 --- a/hub/core/storage/gcs.py +++ b/hub/core/storage/gcs.py @@ -3,7 +3,6 @@ import json import os import tempfile -import platform from typing import Dict, Union from google.cloud import storage # type: ignore @@ -124,7 +123,7 @@ def _connect_browser(self): GCSDefaultCredsNotFoundError: if application deafault credentials can't be found. """ try: - if platform.system() == "Windows": + if os.name == "nt": path = os.path.join( os.getenv("APPDATA"), "gcloud/application_default_credentials.json" )