From f319967fd1ab3131d1e71cbd6fe65bb07980f42a Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Tue, 5 Apr 2022 13:49:43 +0300 Subject: [PATCH 01/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (WIP) --- pipe-cli/src/model/data_storage_wrapper.py | 3 ++ .../src/utilities/datastorage_operations.py | 29 +++++++++++++-- pipe-cli/src/utilities/storage/azure.py | 3 ++ pipe-cli/src/utilities/storage/common.py | 4 ++ pipe-cli/src/utilities/storage/gs.py | 3 ++ pipe-cli/src/utilities/storage/s3.py | 37 +++++++++++++++++++ 6 files changed, 76 insertions(+), 3 deletions(-) diff --git a/pipe-cli/src/model/data_storage_wrapper.py b/pipe-cli/src/model/data_storage_wrapper.py index 1c2c99328e..25e4c10e2d 100644 --- a/pipe-cli/src/model/data_storage_wrapper.py +++ b/pipe-cli/src/model/data_storage_wrapper.py @@ -233,6 +233,9 @@ def exists(self): def get_items(self): return self.get_list_manager().get_items(self.path) + def get_paging_items(self, next_token, page_size): + return self.get_list_manager().get_paging_items(self.path, next_token, page_size) + def is_empty(self, relative=None): if not self.exists(): return True diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 0ef0f560b3..cb0dd973b0 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -36,6 +36,7 @@ from src.utilities.storage.umount import Umount FOLDER_MARKER = '.DS_Store' +BATCH_SIZE = 2 class DataStorageOperations(object): @@ -95,8 +96,26 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags command = 'mv' if clean else 'cp' permission_to_check = os.R_OK if command == 'cp' else os.W_OK manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper, command) - items = files_to_copy if file_list else source_wrapper.get_items() - items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check, + + if verify_destination or file_list or not cls._is_cloud_source(source_wrapper.get_type()): + items = files_to_copy if file_list else source_wrapper.get_items() + cls._transfer(source_wrapper, destination_wrapper, items, manager, permission_to_check, include, exclude, + force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads) + else: + items_batch, next_token = source_wrapper.get_paging_items(next_token=None, page_size=BATCH_SIZE) + while True: + cls._transfer(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, include, + exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, + io_threads) + if not next_token: + return + items_batch, next_token = source_wrapper.get_paging_items(next_token=next_token, page_size=BATCH_SIZE) + + @classmethod + def _transfer(cls, source_wrapper, destination_wrapper, items_part, manager, permission_to_check, + include, exclude, force, quiet, skip_existing, verify_destination, threads, + clean, tags, io_threads): + items = cls._filter_items(items_part, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, verify_destination) sorted_items = list() transfer_results = [] @@ -122,6 +141,10 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags cls._flush_transfer_results(source_wrapper, destination_wrapper, transfer_results, clean=clean, flush_size=1) + @classmethod + def 
_is_cloud_source(cls, source_wrapper_type): + return source_wrapper_type in [WrapperType.S3, WrapperType.AZURE, WrapperType.GS] + @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, verify_destination): @@ -140,7 +163,7 @@ def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, perm continue if source_wrapper.is_file() and not source_wrapper.path == full_path: continue - if not include and not exclude: + if not include and not exclude and not skip_existing and not verify_destination: if not source_wrapper.is_file(): possible_folder_name = source_wrapper.path_with_trailing_separator() # if operation from source root diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 798de93dd3..0f0274a7e1 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -88,6 +88,9 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera absolute_items = [self._to_storage_item(blob) for blob in blobs_generator] return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items] + def get_paging_items(self, relative_path, next_token, page_size): + return self.get_items(relative_path), None + def get_summary(self, relative_path=None): prefix = StorageOperations.get_prefix(relative_path) blobs_generator = self.service.list_blobs(self.bucket.path, diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index f7287bf8ea..0144364404 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -329,6 +329,10 @@ def get_items(self, relative_path): item_relative_path = StorageOperations.get_item_name(item.name, prefix + StorageOperations.PATH_SEPARATOR) yield ('File', item.name, item_relative_path, item.size) + @abstractmethod + def get_paging_items(self, relative_path, next_token, page_size): + pass + def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR): prefix = StorageOperations.get_prefix(relative_path).rstrip(delimiter) + delimiter for item in self.list_items(prefix, show_all=True): diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py index 40b3c6d172..ed17eedb82 100644 --- a/pipe-cli/src/utilities/storage/gs.py +++ b/pipe-cli/src/utilities/storage/gs.py @@ -437,6 +437,9 @@ def get_summary(self, relative_path=None): count += 1 return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size] + def get_paging_items(self, relative_path, next_token, page_size): + return self.get_items(relative_path), None + def get_summary_with_depth(self, max_depth, relative_path=None): prefix = StorageOperations.get_prefix(relative_path) bucket = self.client.bucket(self.bucket.path) diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index d6951706e0..eb14a38cd0 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -789,6 +789,10 @@ def get_folder_object(self, name): def get_items(self, relative_path): return S3BucketOperations.get_items(self.bucket, session=self.session) + def get_paging_items(self, relative_path, next_token, page_size): + return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, next_token=next_token, + session=self.session) + def get_file_tags(self, relative_path): 
return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager( self.session, self.bucket, self.region_name), relative_path) @@ -932,6 +936,39 @@ def get_items(cls, storage_wrapper, session=None): continue yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size']) + @classmethod + def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None): + if session is None: + session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp') + + delimiter = S3BucketOperations.S3_PATH_SEPARATOR + client = cls._get_client(session, storage_wrapper.bucket.region) + paginator = client.get_paginator('list_objects_v2') + prefix = cls.get_prefix(delimiter, storage_wrapper.path) + operation_parameters = { + 'Bucket': storage_wrapper.bucket.path, + 'Prefix': prefix, + 'PaginationConfig': + { + 'PageSize': page_size, + 'MaxKeys': page_size + } + } + + if next_token: + operation_parameters['ContinuationToken'] = next_token + + page_iterator = paginator.paginate(**operation_parameters) + res = [] + for page in page_iterator: + if 'Contents' in page: + for file in page['Contents']: + name = cls.get_item_name(file['Key'], prefix=prefix) + if name.endswith(delimiter): + continue + res.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'])) + return res, page.get('NextContinuationToken', None) if page else None + @classmethod def path_exists(cls, storage_wrapper, relative_path, session=None): delimiter = S3BucketOperations.S3_PATH_SEPARATOR From e8aad3ced8ca3e2219acf3181d57e74263aea76a Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Wed, 6 Apr 2022 19:40:16 +0300 Subject: [PATCH 02/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy - s3 ls (WIP) --- .../src/utilities/datastorage_operations.py | 73 ++++++++++++++----- pipe-cli/src/utilities/storage/s3.py | 46 +++++++++++- 2 files changed, 97 insertions(+), 22 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index cb0dd973b0..fcecd78358 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -36,7 +36,8 @@ from src.utilities.storage.umount import Umount FOLDER_MARKER = '.DS_Store' -BATCH_SIZE = 2 +DEFAULT_BATCH_SIZE = 2 +BATCH_SIZE = os.getenv('CP_CLI_STORAGE_LIST', DEFAULT_BATCH_SIZE) class DataStorageOperations(object): @@ -97,7 +98,7 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags permission_to_check = os.R_OK if command == 'cp' else os.W_OK manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper, command) - if verify_destination or file_list or not cls._is_cloud_source(source_wrapper.get_type()): + if verify_destination or file_list or not source_wrapper.get_type() == WrapperType.S3: items = files_to_copy if file_list else source_wrapper.get_items() cls._transfer(source_wrapper, destination_wrapper, items, manager, permission_to_check, include, exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads) @@ -234,7 +235,8 @@ def storage_remove_item(cls, path, yes, version, hard_delete, recursive, exclude manager = source_wrapper.get_delete_manager(versioning=version or hard_delete) manager.delete_items(source_wrapper.path, version=version, hard_delete=hard_delete, - exclude=exclude, include=include, recursive=recursive and not source_wrapper.is_file()) + exclude=exclude, include=include, 
recursive=recursive and not source_wrapper.is_file(), + page_size=BATCH_SIZE) click.echo(' done.') @classmethod @@ -435,13 +437,19 @@ def create_table(cls, tags): @classmethod def __print_data_storage_contents(cls, bucket_model, relative_path, show_details, recursive, page_size=None, show_versions=False, show_all=False): - items = [] - header = None + next_page_token = None + manager = None + paging_allowed = recursive and not page_size and not show_versions and bucket_model is not None \ + and bucket_model.type == WrapperType.S3 if bucket_model is not None: wrapper = DataStorageWrapper.get_cloud_wrapper_for_bucket(bucket_model, relative_path) manager = wrapper.get_list_manager(show_versions=show_versions) - items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all) + if paging_allowed: + items, next_page_token = manager.list_paging_items(relative_path=relative_path, recursive=recursive, + page_size=BATCH_SIZE, next_token=None) + else: + items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all) else: hidden_object_manager = HiddenObjectManager() # If no argument is specified - list brief details of all buckets @@ -451,19 +459,31 @@ def __print_data_storage_contents(cls, bucket_model, relative_path, show_details click.echo("No datastorages available.") sys.exit(0) - if recursive and header is not None: - click.echo(header) + items_table = cls.__init_items_table(show_versions) + cls.__print_items(bucket_model, items, show_details, items_table, show_versions) + + if not next_page_token: + click.echo() + return + + cls.__print_paging_storage_contents(manager, bucket_model, items_table, relative_path, + recursive, show_details, next_page_token, show_versions) + @classmethod + def __print_paging_storage_contents(cls, manager, bucket_model, items_table, relative_path, + recursive, show_details, next_page_token, show_versions): + items_table.header = False + while True: + items, next_page_token = manager.list_paging_items(relative_path=relative_path, recursive=recursive, + page_size=BATCH_SIZE, next_token=next_page_token) + cls.__print_items(bucket_model, items, show_details, items_table, show_versions) + if not next_page_token: + click.echo() + return + + @classmethod + def __print_items(cls, bucket_model, items, show_details, items_table, show_versions=False): if show_details: - items_table = prettytable.PrettyTable() - fields = ["Type", "Labels", "Modified", "Size", "Name"] - if show_versions: - fields.append("Version") - items_table.field_names = fields - items_table.align = "l" - items_table.border = False - items_table.padding_width = 2 - items_table.align['Size'] = 'r' for item in items: name = item.name changed = '' @@ -498,15 +518,28 @@ def __print_data_storage_contents(cls, bucket_model, relative_path, show_details version_label = "{} (latest)".format(version.version) if version.latest else version.version labels = ', '.join(map(lambda i: i.value, version.labels)) size = '' if version.size is None else version.size - row = [version_type, labels, version.changed.strftime('%Y-%m-%d %H:%M:%S'), size, name, version_label] + row = [version_type, labels, version.changed.strftime('%Y-%m-%d %H:%M:%S'), size, name, + version_label] items_table.add_row(row) click.echo(items_table) - click.echo() + items_table.clear_rows() else: for item in items: click.echo('{}\t\t'.format(item.path), nl=False) - click.echo() + + @classmethod + def __init_items_table(cls, show_versions): + items_table = 
prettytable.PrettyTable() + fields = ["Type", "Labels", "Modified", "Size", "Name"] + if show_versions: + fields.append("Version") + items_table.field_names = fields + items_table.align = "l" + items_table.border = False + items_table.padding_width = 2 + items_table.align['Size'] = 'r' + return items_table @classmethod def __get_file_to_copy(cls, file_path, source_path): diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index eb14a38cd0..9272f97fb0 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -44,6 +44,8 @@ AbstractRestoreManager, AbstractTransferManager, TransferResult, UploadResult from src.config import Config +S3_DEFAULT_BATCH_SIZE = 1000 + class UploadedObjectsContainer: @@ -517,7 +519,8 @@ def __init__(self, bucket, session, region_name=None): super(DeleteManager, self).__init__(session, region_name=region_name) self.bucket = bucket - def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False): + def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False, + page_size=S3_DEFAULT_BATCH_SIZE): client = self._get_client() delimiter = S3BucketOperations.S3_PATH_SEPARATOR bucket = self.bucket.bucket.path @@ -552,7 +555,10 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v else: operation_parameters = { 'Bucket': bucket, - 'Prefix': prefix + 'Prefix': prefix, + 'PaginationConfig': { + 'PageSize': page_size + } } if hard_delete: paginator = client.get_paginator('list_object_versions') @@ -625,6 +631,22 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera else: return self.list_objects(client, prefix, operation_parameters, recursive, page_size) + def list_paging_items(self, relative_path=None, recursive=False, page_size=S3_DEFAULT_BATCH_SIZE, + next_token=None): + delimiter = S3BucketOperations.S3_PATH_SEPARATOR + client = self._get_client() + operation_parameters = { + 'Bucket': self.bucket.bucket.path, + 'PaginationConfig': { + 'PageSize': page_size + } + } + prefix = S3BucketOperations.get_prefix(delimiter, relative_path) + if relative_path: + operation_parameters['Prefix'] = prefix + + return self.list_paging_objects(client, prefix, operation_parameters, recursive, next_token) + def get_summary_with_depth(self, max_depth, relative_path=None): bucket_name = self.bucket.bucket.path delimiter = S3BucketOperations.S3_PATH_SEPARATOR @@ -757,6 +779,26 @@ def list_objects(self, client, prefix, operation_parameters, recursive, page_siz break return items + def list_paging_objects(self, client, prefix, operation_parameters, recursive, next_token): + if next_token: + operation_parameters['ContinuationToken'] = next_token + + paginator = client.get_paginator('list_objects_v2') + page_iterator = paginator.paginate(**operation_parameters) + items = [] + + for page in page_iterator: + if 'CommonPrefixes' in page: + for folder in page['CommonPrefixes']: + name = S3BucketOperations.get_item_name(folder['Prefix'], prefix=prefix) + items.append(self.get_folder_object(name)) + if 'Contents' in page: + for file in page['Contents']: + name = self.get_file_name(file, prefix, recursive) + item = self.get_file_object(file, name) + items.append(item) + return items, page.get('NextContinuationToken', None) if page else None + def get_file_object(self, file, name, version=False, storage_class=True): item = DataStorageItemModel() item.type = 'File' From 
2255ef46c493f0788017a4a6a0e93df6a74d3e42 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Mon, 11 Apr 2022 17:51:52 +0300 Subject: [PATCH 03/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy - async batch collection for cp/mv operation --- pipe-cli/src/model/data_storage_wrapper.py | 4 +- .../src/utilities/datastorage_operations.py | 37 +++++++++++++++---- pipe-cli/src/utilities/storage/azure.py | 2 +- pipe-cli/src/utilities/storage/common.py | 2 +- pipe-cli/src/utilities/storage/gs.py | 2 +- pipe-cli/src/utilities/storage/s3.py | 11 ++++-- 6 files changed, 41 insertions(+), 17 deletions(-) diff --git a/pipe-cli/src/model/data_storage_wrapper.py b/pipe-cli/src/model/data_storage_wrapper.py index 25e4c10e2d..778f488541 100644 --- a/pipe-cli/src/model/data_storage_wrapper.py +++ b/pipe-cli/src/model/data_storage_wrapper.py @@ -233,8 +233,8 @@ def exists(self): def get_items(self): return self.get_list_manager().get_items(self.path) - def get_paging_items(self, next_token, page_size): - return self.get_list_manager().get_paging_items(self.path, next_token, page_size) + def get_paging_items(self, next_token, page_size, results=None): + return self.get_list_manager().get_paging_items(self.path, next_token, page_size, results=results) def is_empty(self, relative=None): if not self.exists(): diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index fcecd78358..32aef11a66 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -36,7 +36,7 @@ from src.utilities.storage.umount import Umount FOLDER_MARKER = '.DS_Store' -DEFAULT_BATCH_SIZE = 2 +DEFAULT_BATCH_SIZE = 1000 BATCH_SIZE = os.getenv('CP_CLI_STORAGE_LIST', DEFAULT_BATCH_SIZE) @@ -104,13 +104,34 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads) else: items_batch, next_token = source_wrapper.get_paging_items(next_token=None, page_size=BATCH_SIZE) - while True: - cls._transfer(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, include, - exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, - io_threads) - if not next_token: - return - items_batch, next_token = source_wrapper.get_paging_items(next_token=next_token, page_size=BATCH_SIZE) + cls._transfer_batch_items(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, + include, exclude, force, quiet, skip_existing, verify_destination, threads, + clean, tags, io_threads, next_token) + + @classmethod + def _transfer_batch_items(cls, source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, + include, exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, + io_threads, next_token): + while True: + workers = [] + transfer_process = multiprocessing.Process(target=cls._transfer, + args=(source_wrapper, destination_wrapper, items_batch, + manager, permission_to_check, include, + exclude, force, quiet, skip_existing, + verify_destination, threads, clean, tags, io_threads)) + workers.append(transfer_process) + if not next_token: + transfer_process.start() + cls._handle_keyboard_interrupt(workers) + return + listing_results = multiprocessing.Queue() + listing_process = multiprocessing.Process(target=source_wrapper.get_paging_items, + args=(next_token, BATCH_SIZE, listing_results)) + 
workers.append(listing_process) + transfer_process.start() + listing_process.start() + cls._handle_keyboard_interrupt(workers) + items_batch, next_token = listing_results.get() @classmethod def _transfer(cls, source_wrapper, destination_wrapper, items_part, manager, permission_to_check, diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 0f0274a7e1..96dc0401b0 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -88,7 +88,7 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera absolute_items = [self._to_storage_item(blob) for blob in blobs_generator] return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items] - def get_paging_items(self, relative_path, next_token, page_size): + def get_paging_items(self, relative_path, next_token, page_size, results=None): return self.get_items(relative_path), None def get_summary(self, relative_path=None): diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index 0144364404..70b4d49e44 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -330,7 +330,7 @@ def get_items(self, relative_path): yield ('File', item.name, item_relative_path, item.size) @abstractmethod - def get_paging_items(self, relative_path, next_token, page_size): + def get_paging_items(self, relative_path, next_token, page_size, results=None): pass def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR): diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py index ed17eedb82..3a2359137d 100644 --- a/pipe-cli/src/utilities/storage/gs.py +++ b/pipe-cli/src/utilities/storage/gs.py @@ -437,7 +437,7 @@ def get_summary(self, relative_path=None): count += 1 return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size] - def get_paging_items(self, relative_path, next_token, page_size): + def get_paging_items(self, relative_path, next_token, page_size, results=None): return self.get_items(relative_path), None def get_summary_with_depth(self, max_depth, relative_path=None): diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index 9272f97fb0..d6f7a29ff3 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -831,9 +831,9 @@ def get_folder_object(self, name): def get_items(self, relative_path): return S3BucketOperations.get_items(self.bucket, session=self.session) - def get_paging_items(self, relative_path, next_token, page_size): + def get_paging_items(self, relative_path, next_token, page_size, results=None): return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, next_token=next_token, - session=self.session) + session=self.session, results=results) def get_file_tags(self, relative_path): return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager( @@ -979,7 +979,7 @@ def get_items(cls, storage_wrapper, session=None): yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size']) @classmethod - def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None): + def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None, results=None): if session is None: session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp') @@ -1009,7 +1009,10 @@ def get_paging_items(cls, 
storage_wrapper, page_size, next_token=None, session=N if name.endswith(delimiter): continue res.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'])) - return res, page.get('NextContinuationToken', None) if page else None + next_page_token = page.get('NextContinuationToken', None) if page else None + if results is not None: + results.put((res, next_page_token)) + return res, next_page_token @classmethod def path_exists(cls, storage_wrapper, relative_path, session=None): From 54a9ede33d4937a22e06f31acfd1e390667ca98f Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Mon, 11 Apr 2022 19:12:54 +0300 Subject: [PATCH 04/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy - cleanups --- pipe-cli/src/utilities/datastorage_operations.py | 4 ---- pipe-cli/src/utilities/storage/azure.py | 3 ++- pipe-cli/src/utilities/storage/common.py | 4 +++- pipe-cli/src/utilities/storage/gs.py | 3 ++- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 32aef11a66..f488dade73 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -163,10 +163,6 @@ def _transfer(cls, source_wrapper, destination_wrapper, items_part, manager, per cls._flush_transfer_results(source_wrapper, destination_wrapper, transfer_results, clean=clean, flush_size=1) - @classmethod - def _is_cloud_source(cls, source_wrapper_type): - return source_wrapper_type in [WrapperType.S3, WrapperType.AZURE, WrapperType.GS] - @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, verify_destination): diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 96dc0401b0..69b47c46a0 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -149,7 +149,8 @@ def __init__(self, blob_service, bucket): self.delimiter = StorageOperations.PATH_SEPARATOR self.listing_manager = AzureListingManager(self.service, self.bucket) - def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False): + def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False, + page_size=None): if version or hard_delete: raise RuntimeError('Versioning is not supported by AZURE cloud provider') prefix = StorageOperations.get_prefix(relative_path) diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index 70b4d49e44..29cd195732 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -356,7 +356,8 @@ class AbstractDeleteManager: __metaclass__ = ABCMeta @abstractmethod - def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False): + def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False, + page_size=None): """ Deletes all items under the given path. @@ -366,6 +367,7 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v :param include: Include item pattern. :param version: Version to be deleted. :param hard_delete: Specifies if all item versions have to be deleted. + :param page_size: Specifies page size for listing objects. 
""" pass diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py index 3a2359137d..15d8a07cc3 100644 --- a/pipe-cli/src/utilities/storage/gs.py +++ b/pipe-cli/src/utilities/storage/gs.py @@ -520,7 +520,8 @@ def __init__(self, client, bucket): self.bucket = bucket self.delimiter = StorageOperations.PATH_SEPARATOR - def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False): + def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False, + page_size=None): if recursive and version: raise RuntimeError('Recursive folder deletion with specified version is not available ' 'for GCP cloud provider.') From 5a2ed7c5636ed5e2167dfe2369d633c40ad824f0 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Tue, 12 Apr 2022 14:13:50 +0300 Subject: [PATCH 05/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy - refactor cp/mv paging --- pipe-cli/src/model/data_storage_wrapper.py | 4 +- .../src/utilities/datastorage_operations.py | 14 ++--- pipe-cli/src/utilities/storage/azure.py | 2 +- pipe-cli/src/utilities/storage/common.py | 2 +- pipe-cli/src/utilities/storage/gs.py | 2 +- pipe-cli/src/utilities/storage/s3.py | 56 +++++++------------ 6 files changed, 32 insertions(+), 48 deletions(-) diff --git a/pipe-cli/src/model/data_storage_wrapper.py b/pipe-cli/src/model/data_storage_wrapper.py index 778f488541..25e4c10e2d 100644 --- a/pipe-cli/src/model/data_storage_wrapper.py +++ b/pipe-cli/src/model/data_storage_wrapper.py @@ -233,8 +233,8 @@ def exists(self): def get_items(self): return self.get_list_manager().get_items(self.path) - def get_paging_items(self, next_token, page_size, results=None): - return self.get_list_manager().get_paging_items(self.path, next_token, page_size, results=results) + def get_paging_items(self, next_token, page_size): + return self.get_list_manager().get_paging_items(self.path, next_token, page_size) def is_empty(self, relative=None): if not self.exists(): diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 553927b6f5..f0363a1b06 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -115,24 +115,24 @@ def _transfer_batch_items(cls, source_wrapper, destination_wrapper, items_batch, include, exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads, next_token): while True: - workers = [] transfer_process = multiprocessing.Process(target=cls._transfer, args=(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, include, exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads)) - workers.append(transfer_process) if not next_token: transfer_process.start() - cls._handle_keyboard_interrupt(workers) + cls._handle_keyboard_interrupt([transfer_process]) return listing_results = multiprocessing.Queue() - listing_process = multiprocessing.Process(target=source_wrapper.get_paging_items, - args=(next_token, BATCH_SIZE, listing_results)) - workers.append(listing_process) + + def get_paging_items(): + listing_results.put(source_wrapper.get_paging_items(next_token, BATCH_SIZE)) + + listing_process = multiprocessing.Process(target=get_paging_items) transfer_process.start() listing_process.start() - cls._handle_keyboard_interrupt(workers) + cls._handle_keyboard_interrupt([transfer_process, 
listing_process]) items_batch, next_token = listing_results.get() @classmethod diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 69b47c46a0..84e90afdd4 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -88,7 +88,7 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera absolute_items = [self._to_storage_item(blob) for blob in blobs_generator] return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items] - def get_paging_items(self, relative_path, next_token, page_size, results=None): + def get_paging_items(self, relative_path, next_token, page_size): return self.get_items(relative_path), None def get_summary(self, relative_path=None): diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index 29cd195732..555d075e83 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -330,7 +330,7 @@ def get_items(self, relative_path): yield ('File', item.name, item_relative_path, item.size) @abstractmethod - def get_paging_items(self, relative_path, next_token, page_size, results=None): + def get_paging_items(self, relative_path, next_token, page_size): pass def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR): diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py index 15d8a07cc3..72766ad6ae 100644 --- a/pipe-cli/src/utilities/storage/gs.py +++ b/pipe-cli/src/utilities/storage/gs.py @@ -437,7 +437,7 @@ def get_summary(self, relative_path=None): count += 1 return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size] - def get_paging_items(self, relative_path, next_token, page_size, results=None): + def get_paging_items(self, relative_path, next_token, page_size): return self.get_items(relative_path), None def get_summary_with_depth(self, max_depth, relative_path=None): diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index d6f7a29ff3..59227bed31 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -831,9 +831,9 @@ def get_folder_object(self, name): def get_items(self, relative_path): return S3BucketOperations.get_items(self.bucket, session=self.session) - def get_paging_items(self, relative_path, next_token, page_size, results=None): - return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, next_token=next_token, - session=self.session, results=results) + def get_paging_items(self, relative_path, start_token, page_size): + return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, session=self.session, + start_token=start_token) def get_file_tags(self, relative_path): return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager( @@ -957,6 +957,11 @@ def get_item_name(cls, param, prefix=None): @classmethod def get_items(cls, storage_wrapper, session=None): + results, _ = cls.get_paging_items(storage_wrapper, session=session) + return results + + @classmethod + def get_paging_items(cls, storage_wrapper, page_size=None, session=None, start_token=None): if session is None: session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp') @@ -969,50 +974,29 @@ def get_items(cls, storage_wrapper, session=None): prefix = cls.get_prefix(delimiter, storage_wrapper.path) operation_parameters['Prefix'] = prefix - 
page_iterator = paginator.paginate(**operation_parameters) - for page in page_iterator: - if 'Contents' in page: - for file in page['Contents']: - name = cls.get_item_name(file['Key'], prefix=prefix) - if name.endswith(delimiter): - continue - yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size']) - @classmethod - def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None, results=None): - if session is None: - session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp') - - delimiter = S3BucketOperations.S3_PATH_SEPARATOR - client = cls._get_client(session, storage_wrapper.bucket.region) - paginator = client.get_paginator('list_objects_v2') - prefix = cls.get_prefix(delimiter, storage_wrapper.path) - operation_parameters = { - 'Bucket': storage_wrapper.bucket.path, - 'Prefix': prefix, - 'PaginationConfig': - { - 'PageSize': page_size, - 'MaxKeys': page_size - } - } + if page_size: + operation_parameters['PaginationConfig'] = { + 'PageSize': page_size, + 'MaxKeys': page_size + } - if next_token: - operation_parameters['ContinuationToken'] = next_token + if start_token: + operation_parameters['ContinuationToken'] = start_token page_iterator = paginator.paginate(**operation_parameters) - res = [] + results = [] for page in page_iterator: if 'Contents' in page: for file in page['Contents']: name = cls.get_item_name(file['Key'], prefix=prefix) if name.endswith(delimiter): continue - res.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'])) + results.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'])) next_page_token = page.get('NextContinuationToken', None) if page else None - if results is not None: - results.put((res, next_page_token)) - return res, next_page_token + if page_size: + return results, next_page_token + return results, None @classmethod def path_exists(cls, storage_wrapper, relative_path, session=None): From c6562e090d4c8473430913ef3ae05499b587f180 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Tue, 12 Apr 2022 17:37:19 +0300 Subject: [PATCH 06/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy - refactor cp/mv paging --- pipe-cli/src/model/data_storage_wrapper.py | 4 ++-- pipe-cli/src/utilities/datastorage_operations.py | 6 +++--- pipe-cli/src/utilities/storage/azure.py | 6 +++++- pipe-cli/src/utilities/storage/common.py | 15 ++++++++++++++- pipe-cli/src/utilities/storage/gs.py | 4 ++++ pipe-cli/src/utilities/storage/s3.py | 16 +++++++--------- 6 files changed, 35 insertions(+), 16 deletions(-) diff --git a/pipe-cli/src/model/data_storage_wrapper.py b/pipe-cli/src/model/data_storage_wrapper.py index 25e4c10e2d..a3bbee62a5 100644 --- a/pipe-cli/src/model/data_storage_wrapper.py +++ b/pipe-cli/src/model/data_storage_wrapper.py @@ -233,8 +233,8 @@ def exists(self): def get_items(self): return self.get_list_manager().get_items(self.path) - def get_paging_items(self, next_token, page_size): - return self.get_list_manager().get_paging_items(self.path, next_token, page_size) + def get_paging_items(self, start_token, page_size): + return self.get_list_manager().get_paging_items(self.path, start_token=start_token, page_size=page_size) def is_empty(self, relative=None): if not self.exists(): diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index f0363a1b06..75721d4fcf 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ 
b/pipe-cli/src/utilities/datastorage_operations.py @@ -105,7 +105,7 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags cls._transfer(source_wrapper, destination_wrapper, items, manager, permission_to_check, include, exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads) else: - items_batch, next_token = source_wrapper.get_paging_items(next_token=None, page_size=BATCH_SIZE) + items_batch, next_token = source_wrapper.get_paging_items(start_token=None, page_size=BATCH_SIZE) cls._transfer_batch_items(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, include, exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads, next_token) @@ -472,7 +472,7 @@ def __print_data_storage_contents(cls, bucket_model, relative_path, show_details manager = wrapper.get_list_manager(show_versions=show_versions) if paging_allowed: items, next_page_token = manager.list_paging_items(relative_path=relative_path, recursive=recursive, - page_size=BATCH_SIZE, next_token=None) + page_size=BATCH_SIZE, start_token=None) else: items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all) else: @@ -500,7 +500,7 @@ def __print_paging_storage_contents(cls, manager, bucket_model, items_table, rel items_table.header = False while True: items, next_page_token = manager.list_paging_items(relative_path=relative_path, recursive=recursive, - page_size=BATCH_SIZE, next_token=next_page_token) + page_size=BATCH_SIZE, start_token=next_page_token) cls.__print_items(bucket_model, items, show_details, items_table, False, show_versions) if not next_page_token: click.echo() diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 84e90afdd4..5b17933967 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -88,7 +88,11 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera absolute_items = [self._to_storage_item(blob) for blob in blobs_generator] return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items] - def get_paging_items(self, relative_path, next_token, page_size): + def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, + start_token=None): + return self.list_items(relative_path, recursive, page_size, show_all=False), None + + def get_paging_items(self, relative_path, start_token, page_size): return self.get_items(relative_path), None def get_summary(self, relative_path=None): diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index 555d075e83..ddafef0aa9 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -290,6 +290,19 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera """ pass + @abstractmethod + def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, + start_token=None): + """ + Lists files and folders on the specified page by a relative path in the current storage. + + :param relative_path: Storage relative path to be listed. + :param recursive: Specifies if the listing has to be recursive. + :param page_size: Max number of items on the page. + :param start_token: Paging continuation token. 
+ """ + pass + @abstractmethod def get_summary_with_depth(self, max_depth, relative_path=None): """ @@ -330,7 +343,7 @@ def get_items(self, relative_path): yield ('File', item.name, item_relative_path, item.size) @abstractmethod - def get_paging_items(self, relative_path, next_token, page_size): + def get_paging_items(self, relative_path, start_token, page_size): pass def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR): diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py index 72766ad6ae..3ceae4e07e 100644 --- a/pipe-cli/src/utilities/storage/gs.py +++ b/pipe-cli/src/utilities/storage/gs.py @@ -437,6 +437,10 @@ def get_summary(self, relative_path=None): count += 1 return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size] + def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, + start_token=None): + return self.list_items(relative_path, recursive, page_size, show_all=False), None + def get_paging_items(self, relative_path, next_token, page_size): return self.get_items(relative_path), None diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index 59227bed31..80dc7062c6 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -44,8 +44,6 @@ AbstractRestoreManager, AbstractTransferManager, TransferResult, UploadResult from src.config import Config -S3_DEFAULT_BATCH_SIZE = 1000 - class UploadedObjectsContainer: @@ -520,7 +518,7 @@ def __init__(self, bucket, session, region_name=None): self.bucket = bucket def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False, - page_size=S3_DEFAULT_BATCH_SIZE): + page_size=StorageOperations.DEFAULT_PAGE_SIZE): client = self._get_client() delimiter = S3BucketOperations.S3_PATH_SEPARATOR bucket = self.bucket.bucket.path @@ -631,8 +629,8 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera else: return self.list_objects(client, prefix, operation_parameters, recursive, page_size) - def list_paging_items(self, relative_path=None, recursive=False, page_size=S3_DEFAULT_BATCH_SIZE, - next_token=None): + def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, + start_token=None): delimiter = S3BucketOperations.S3_PATH_SEPARATOR client = self._get_client() operation_parameters = { @@ -645,7 +643,7 @@ def list_paging_items(self, relative_path=None, recursive=False, page_size=S3_DE if relative_path: operation_parameters['Prefix'] = prefix - return self.list_paging_objects(client, prefix, operation_parameters, recursive, next_token) + return self.list_paging_objects(client, prefix, operation_parameters, recursive, start_token) def get_summary_with_depth(self, max_depth, relative_path=None): bucket_name = self.bucket.bucket.path @@ -779,9 +777,9 @@ def list_objects(self, client, prefix, operation_parameters, recursive, page_siz break return items - def list_paging_objects(self, client, prefix, operation_parameters, recursive, next_token): - if next_token: - operation_parameters['ContinuationToken'] = next_token + def list_paging_objects(self, client, prefix, operation_parameters, recursive, start_token): + if start_token: + operation_parameters['ContinuationToken'] = start_token paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) From 
8cc035a673e72e98819775002e79b8efcc8ca533 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Wed, 3 Jul 2024 13:41:12 +0400 Subject: [PATCH 07/19] test --- e2e/cli/buckets/cp/test_cp_with_folders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/cli/buckets/cp/test_cp_with_folders.py b/e2e/cli/buckets/cp/test_cp_with_folders.py index a59db40257..fe19e0ab1a 100644 --- a/e2e/cli/buckets/cp/test_cp_with_folders.py +++ b/e2e/cli/buckets/cp/test_cp_with_folders.py @@ -80,7 +80,7 @@ def setup_class(cls): @classmethod def teardown_class(cls): - delete_buckets(cls.bucket_name, cls.other_bucket_name) + # delete_buckets(cls.bucket_name, cls.other_bucket_name) clean_test_data(os.path.abspath(cls.test_file_1)) clean_test_data(os.path.abspath(cls.test_file_2)) clean_test_data(os.path.abspath(cls.test_folder)) From 5f64ee722866c1c5c24dc3c38e365696514ea6c8 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Thu, 4 Jul 2024 12:27:11 +0400 Subject: [PATCH 08/19] test --- e2e/cli/buckets/cp/test_cp_with_folders.py | 2 +- .../src/utilities/datastorage_operations.py | 30 +++++-------------- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/e2e/cli/buckets/cp/test_cp_with_folders.py b/e2e/cli/buckets/cp/test_cp_with_folders.py index fe19e0ab1a..a59db40257 100644 --- a/e2e/cli/buckets/cp/test_cp_with_folders.py +++ b/e2e/cli/buckets/cp/test_cp_with_folders.py @@ -80,7 +80,7 @@ def setup_class(cls): @classmethod def teardown_class(cls): - # delete_buckets(cls.bucket_name, cls.other_bucket_name) + delete_buckets(cls.bucket_name, cls.other_bucket_name) clean_test_data(os.path.abspath(cls.test_file_1)) clean_test_data(os.path.abspath(cls.test_file_2)) clean_test_data(os.path.abspath(cls.test_folder)) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index f8fe7e26d9..4782be73bb 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -209,30 +209,16 @@ def _transfer_batch_items(cls, items, threads, manager, source_wrapper, destinat permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files): while True: - transfer_process = multiprocessing.Process(target=cls._transfer, - args=(items, threads, manager, source_wrapper, - destination_wrapper, audit_ctx, clean, quiet, tags, - io_threads, on_failures, checksum_algorithm, - checksum_skip)) + cls._transfer(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, + io_threads, on_failures, checksum_algorithm, checksum_skip) if not next_token: - transfer_process.start() - cls._handle_keyboard_interrupt([transfer_process]) return - listing_results = multiprocessing.Queue() - - def get_paging_items(): - items_batch, new_next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - listing_results.put((items, new_next_token)) - - listing_process = multiprocessing.Process(target=get_paging_items) - transfer_process.start() - listing_process.start() - cls._handle_keyboard_interrupt([transfer_process, listing_process]) - items_batch, next_token = listing_results.get() + items_batch, new_next_token = 
source_wrapper.get_paging_items(next_token, BATCH_SIZE) + items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) + next_token = new_next_token @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, From 51da258558662125ff105fd5ec581f7f14e0c58c Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Thu, 4 Jul 2024 12:57:31 +0400 Subject: [PATCH 09/19] test --- .../src/utilities/datastorage_operations.py | 54 +++++++++++++++---- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 4782be73bb..836b9353d6 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -208,17 +208,49 @@ def _transfer_batch_items(cls, items, threads, manager, source_wrapper, destinat quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, next_token, permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files): - while True: - cls._transfer(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, - io_threads, on_failures, checksum_algorithm, checksum_skip) - if not next_token: - return - items_batch, new_next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - next_token = new_next_token + if threads: + while True: + cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, destination_wrapper, + audit_ctx, clean, quiet, tags, io_threads, on_failures, + checksum_algorithm, + checksum_skip) + if not next_token: + return + items_batch, new_next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) + items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) + next_token = new_next_token + else: + with audit_ctx: + while True: + transfer_results = [] + fail_after_exception = None + for item in items: + transfer_results, fail_after_exception = cls._transfer_item(item, manager, + source_wrapper, + destination_wrapper, + transfer_results, + clean, quiet, tags, io_threads, + on_failures, None, + checksum_algorithm, + checksum_skip) + if not destination_wrapper.is_local(): + cls._flush_transfer_results(source_wrapper, destination_wrapper, + transfer_results, clean=clean, flush_size=1) + if fail_after_exception: + raise fail_after_exception + + if not next_token: + return + items_batch, new_next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) + items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, + on_empty_files) + next_token = new_next_token @classmethod def _filter_items(cls, 
items, manager, source_wrapper, destination_wrapper, permission_to_check, From 5aa1ac5d8a0742f178244ecea617a5bc7a7d1b48 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Thu, 4 Jul 2024 14:11:12 +0400 Subject: [PATCH 10/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - cleanups --- .../src/utilities/datastorage_operations.py | 99 ++++++++----------- 1 file changed, 40 insertions(+), 59 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 836b9353d6..e38d55be09 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -170,11 +170,7 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags if not verify_destination and not file_list and source_wrapper.get_type() == WrapperType.S3: items_batch, next_token = source_wrapper.get_paging_items(start_token=None, page_size=BATCH_SIZE) if next_token: - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - cls._transfer_batch_items(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, + cls._transfer_batch_items(items_batch, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, next_token, permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, @@ -188,69 +184,47 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files) - cls._transfer(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, - io_threads, on_failures, checksum_algorithm, checksum_skip) - - @classmethod - def _transfer(cls, items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, - io_threads, on_failures, checksum_algorithm, checksum_skip): if threads: cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip) else: - cls._transfer_items(items, manager, source_wrapper, destination_wrapper, - audit_ctx, clean, quiet, tags, io_threads, on_failures, - checksum_algorithm=checksum_algorithm, checksum_skip=checksum_skip) + cls._transfer_items_with_audit_ctx(items, manager, source_wrapper, destination_wrapper, + audit_ctx, clean, quiet, tags, io_threads, on_failures, + checksum_algorithm=checksum_algorithm, checksum_skip=checksum_skip) @classmethod - def _transfer_batch_items(cls, items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, + def _transfer_batch_items(cls, items_batch, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, next_token, permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files): if threads: while True: + 
items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip) if not next_token: return - items_batch, new_next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - next_token = new_next_token + items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) + else: with audit_ctx: while True: - transfer_results = [] - fail_after_exception = None - for item in items: - transfer_results, fail_after_exception = cls._transfer_item(item, manager, - source_wrapper, - destination_wrapper, - transfer_results, - clean, quiet, tags, io_threads, - on_failures, None, - checksum_algorithm, - checksum_skip) - if not destination_wrapper.is_local(): - cls._flush_transfer_results(source_wrapper, destination_wrapper, - transfer_results, clean=clean, flush_size=1) - if fail_after_exception: - raise fail_after_exception - - if not next_token: - return - items_batch, new_next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files) - next_token = new_next_token + cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, + io_threads, on_failures, None, checksum_algorithm, checksum_skip) + if not next_token: + return + items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, @@ -755,7 +729,7 @@ def _multiprocess_transfer_items(cls, sorted_items, threads, manager, source_wra workers = [] for i in range(threads): - process = multiprocessing.Process(target=cls._transfer_items, + process = multiprocessing.Process(target=cls._transfer_items_with_audit_ctx, args=(splitted_items[i], manager, source_wrapper, @@ -774,23 +748,30 @@ def _multiprocess_transfer_items(cls, sorted_items, threads, manager, source_wra cls._handle_keyboard_interrupt(workers) @classmethod - def _transfer_items(cls, items, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, - io_threads, on_failures, lock=None, checksum_algorithm='md5', checksum_skip=False): + def _transfer_items_with_audit_ctx(cls, items, manager, source_wrapper, destination_wrapper, audit_ctx, clean, + quiet, tags, io_threads, on_failures, lock=None, checksum_algorithm='md5', + checksum_skip=False): with audit_ctx: - transfer_results = [] - fail_after_exception = None - for item in items: - transfer_results, fail_after_exception = cls._transfer_item(item, manager, - source_wrapper, destination_wrapper, - transfer_results, - clean, quiet, tags, io_threads, - on_failures, lock, checksum_algorithm, - checksum_skip) - if not destination_wrapper.is_local(): - 
cls._flush_transfer_results(source_wrapper, destination_wrapper, - transfer_results, clean=clean, flush_size=1) - if fail_after_exception: - raise fail_after_exception + cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, + io_threads, on_failures, lock, checksum_algorithm, checksum_skip) + + @classmethod + def _transfer_items(cls, items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, + io_threads, on_failures, lock=None, checksum_algorithm='md5', checksum_skip=False): + transfer_results = [] + fail_after_exception = None + for item in items: + transfer_results, fail_after_exception = cls._transfer_item(item, manager, + source_wrapper, destination_wrapper, + transfer_results, + clean, quiet, tags, io_threads, + on_failures, lock, checksum_algorithm, + checksum_skip) + if not destination_wrapper.is_local(): + cls._flush_transfer_results(source_wrapper, destination_wrapper, + transfer_results, clean=clean, flush_size=1) + if fail_after_exception: + raise fail_after_exception @classmethod def _transfer_item(cls, item, manager, source_wrapper, destination_wrapper, transfer_results, From 967e252623e55d51779039a06dc7ef025a318922 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Thu, 4 Jul 2024 17:06:13 +0400 Subject: [PATCH 11/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - async batch --- .../src/utilities/datastorage_operations.py | 134 ++++++++++++++---- 1 file changed, 109 insertions(+), 25 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index e38d55be09..5f585e1370 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -170,7 +170,11 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags if not verify_destination and not file_list and source_wrapper.get_type() == WrapperType.S3: items_batch, next_token = source_wrapper.get_paging_items(start_token=None, page_size=BATCH_SIZE) if next_token: - cls._transfer_batch_items(items_batch, threads, manager, source_wrapper, destination_wrapper, audit_ctx, + items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) + cls._transfer_batch_items(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, next_token, permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, @@ -194,37 +198,108 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags checksum_algorithm=checksum_algorithm, checksum_skip=checksum_skip) @classmethod - def _transfer_batch_items(cls, items_batch, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, + def _transfer_batch_items(cls, items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, next_token, permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files): if threads: + cls._multiprocess_transfer_batch(audit_ctx, checksum_algorithm, checksum_skip, clean, + 
destination_wrapper, exclude, force, include, io_threads, items, + manager, next_token, on_empty_files, on_failures, on_unsafe_chars, + on_unsafe_chars_replacement, permission_to_check, quiet, + skip_existing, source_wrapper, sync_newer, tags, threads, + verify_destination) + else: + cls._transfer_batch(audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, + exclude, force, include, io_threads, items, manager, next_token, on_empty_files, + on_failures, on_unsafe_chars, on_unsafe_chars_replacement, permission_to_check, + quiet, skip_existing, source_wrapper, sync_newer, tags, verify_destination) + + @classmethod + def _transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, exclude, force, + include, io_threads, items, manager, next_token, on_empty_files, on_failures, on_unsafe_chars, + on_unsafe_chars_replacement, permission_to_check, quiet, skip_existing, source_wrapper, + sync_newer, tags, verify_destination): + with audit_ctx: while True: - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, destination_wrapper, - audit_ctx, clean, quiet, tags, io_threads, on_failures, - checksum_algorithm, - checksum_skip) if not next_token: - return - items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) - - else: - with audit_ctx: - while True: - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, - on_empty_files) cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, io_threads, on_failures, None, checksum_algorithm, checksum_skip) - if not next_token: - return - items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) + return + transfer_worker = multiprocessing.Process(target=cls._transfer_items, + args=(items, manager, source_wrapper, destination_wrapper, + clean, quiet, tags, io_threads, on_failures, None, + checksum_algorithm, checksum_skip)) + transfer_worker.start() + listing_worker, listing_results = cls._start_fetch_items(manager, source_wrapper, + destination_wrapper, permission_to_check, + include, exclude, force, quiet, + skip_existing, sync_newer, + verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, + on_empty_files, next_token) + workers = [transfer_worker, listing_worker] + cls._handle_keyboard_interrupt(workers) + items, next_token = listing_results.get() + + @classmethod + def _multiprocess_transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, + exclude, force, include, io_threads, items, manager, next_token, on_empty_files, + on_failures, on_unsafe_chars, on_unsafe_chars_replacement, permission_to_check, + quiet, skip_existing, source_wrapper, sync_newer, tags, threads, + verify_destination): + while True: + if not next_token: + cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, + destination_wrapper, audit_ctx, clean, quiet, tags, + io_threads, on_failures, checksum_algorithm, + checksum_skip) + return + transfer_workers = cls._start_multiprocess_transfer(items, threads, manager, source_wrapper, + 
destination_wrapper, audit_ctx, clean, quiet, tags, + io_threads, on_failures, checksum_algorithm, + checksum_skip) + listing_worker, listing_results = cls._start_fetch_items(manager, source_wrapper, + destination_wrapper, permission_to_check, + include, exclude, force, quiet, + skip_existing, sync_newer, + verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, + on_empty_files, next_token) + workers = [] + workers.extend(transfer_workers) + workers.append(listing_worker) + cls._handle_keyboard_interrupt(workers) + items, next_token = listing_results.get() + + @classmethod + def _start_fetch_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, + force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files, next_token): + listing_results = multiprocessing.Queue() + + def get_paging_items(): + listing_results.put(cls._fetch_paging_items(manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, + quiet, skip_existing, sync_newer, + verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, + on_empty_files, next_token)) + + listing_worker = multiprocessing.Process(target=get_paging_items) + listing_worker.start() + return listing_worker, listing_results + + @classmethod + def _fetch_paging_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, + force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files, next_token): + items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) + items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) + return items, next_token @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, @@ -719,7 +794,7 @@ def _split_items_by_process(cls, sorted_items, threads): return splitted_items @classmethod - def _multiprocess_transfer_items(cls, sorted_items, threads, manager, source_wrapper, destination_wrapper, + def _start_multiprocess_transfer(cls, sorted_items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip): size_index = 3 @@ -745,6 +820,15 @@ def _multiprocess_transfer_items(cls, sorted_items, threads, manager, source_wra checksum_skip)) process.start() workers.append(process) + return workers + + @classmethod + def _multiprocess_transfer_items(cls, sorted_items, threads, manager, source_wrapper, destination_wrapper, + audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, + checksum_skip): + workers = cls._start_multiprocess_transfer(sorted_items, threads, manager, source_wrapper, destination_wrapper, + audit_ctx, clean, quiet, tags, io_threads, on_failures, + checksum_algorithm, checksum_skip) cls._handle_keyboard_interrupt(workers) @classmethod From 633380afa8616c19ddfc131b6eaee12876e2ea20 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Thu, 4 Jul 2024 19:23:22 +0400 Subject: [PATCH 12/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - disable async batch by default --- .../src/utilities/datastorage_operations.py | 80 ++++++++++--------- 1 
file changed, 43 insertions(+), 37 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 5f585e1370..c1290631e6 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -42,7 +42,8 @@ FOLDER_MARKER = '.DS_Store' STORAGE_DETAILS_SEPARATOR = ', ' DEFAULT_BATCH_SIZE = 1000 -BATCH_SIZE = os.getenv('CP_CLI_STORAGE_LIST', DEFAULT_BATCH_SIZE) +BATCH_SIZE = os.getenv('CP_CLI_STORAGE_BATCH_SIZE', DEFAULT_BATCH_SIZE) +ASYNC_BATCH_ENABLE = str(os.getenv('CP_CLI_STORAGE_ASYNC_BATCH_ENABLE', 'false')).lower() == 'true' ARCHIVED_PERMISSION_ERROR_MASSAGE = 'Error: Failed to apply --show-archived option: Permission denied.' @@ -226,21 +227,29 @@ def _transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, de cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, io_threads, on_failures, None, checksum_algorithm, checksum_skip) return - transfer_worker = multiprocessing.Process(target=cls._transfer_items, - args=(items, manager, source_wrapper, destination_wrapper, - clean, quiet, tags, io_threads, on_failures, None, - checksum_algorithm, checksum_skip)) - transfer_worker.start() - listing_worker, listing_results = cls._start_fetch_items(manager, source_wrapper, - destination_wrapper, permission_to_check, - include, exclude, force, quiet, - skip_existing, sync_newer, - verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, - on_empty_files, next_token) - workers = [transfer_worker, listing_worker] - cls._handle_keyboard_interrupt(workers) - items, next_token = listing_results.get() + if ASYNC_BATCH_ENABLE: + transfer_worker = multiprocessing.Process(target=cls._transfer_items, + args=(items, manager, source_wrapper, destination_wrapper, + clean, quiet, tags, io_threads, on_failures, None, + checksum_algorithm, checksum_skip)) + transfer_worker.start() + listing_worker, items, next_token = cls._start_fetch_items(manager, source_wrapper, + destination_wrapper, permission_to_check, + include, exclude, force, quiet, + skip_existing, sync_newer, + verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, + on_empty_files, next_token) + workers = [transfer_worker, listing_worker] + cls._handle_keyboard_interrupt(workers) + else: + cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, + io_threads, on_failures, None, checksum_algorithm, checksum_skip) + items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) + items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files) @classmethod def _multiprocess_transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, @@ -259,47 +268,44 @@ def _multiprocess_transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_sk destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip) - listing_worker, listing_results = cls._start_fetch_items(manager, source_wrapper, - destination_wrapper, permission_to_check, - include, exclude, force, quiet, - skip_existing, sync_newer, - verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, - on_empty_files, next_token) + listing_worker, items, next_token = 
cls._start_fetch_items(manager, source_wrapper, + destination_wrapper, permission_to_check, + include, exclude, force, quiet, + skip_existing, sync_newer, + verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, + on_empty_files, next_token) workers = [] workers.extend(transfer_workers) workers.append(listing_worker) cls._handle_keyboard_interrupt(workers) - items, next_token = listing_results.get() @classmethod def _start_fetch_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files, next_token): listing_results = multiprocessing.Queue() - - def get_paging_items(): - listing_results.put(cls._fetch_paging_items(manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, - quiet, skip_existing, sync_newer, - verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, - on_empty_files, next_token)) - - listing_worker = multiprocessing.Process(target=get_paging_items) + listing_worker = multiprocessing.Process(target=cls._fetch_paging_items, + args=(manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, + quiet, skip_existing, sync_newer, + verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, + on_empty_files, next_token, listing_results)) listing_worker.start() - return listing_worker, listing_results + items, token = listing_results.get() + return listing_worker, items, token @classmethod def _fetch_paging_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files, next_token): + on_unsafe_chars_replacement, on_empty_files, next_token, listing_results): items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files) - return items, next_token + listing_results.put((items, next_token)) @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, From f1418421929bb6d84793ceaa590ec862baac00e5 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Thu, 4 Jul 2024 20:03:47 +0400 Subject: [PATCH 13/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - cleanup --- pipe-cli/src/utilities/datastorage_operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index c1290631e6..88d1eb8356 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -42,7 +42,7 @@ FOLDER_MARKER = '.DS_Store' STORAGE_DETAILS_SEPARATOR = ', ' DEFAULT_BATCH_SIZE = 1000 -BATCH_SIZE = os.getenv('CP_CLI_STORAGE_BATCH_SIZE', DEFAULT_BATCH_SIZE) +BATCH_SIZE = int(os.getenv('CP_CLI_STORAGE_BATCH_SIZE', DEFAULT_BATCH_SIZE)) ASYNC_BATCH_ENABLE = str(os.getenv('CP_CLI_STORAGE_ASYNC_BATCH_ENABLE', 'false')).lower() == 'true' ARCHIVED_PERMISSION_ERROR_MASSAGE = 'Error: Failed to apply --show-archived option: Permission denied.' 
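
The two toggles that patch 13 settles on, CP_CLI_STORAGE_BATCH_SIZE (now cast to int) and CP_CLI_STORAGE_ASYNC_BATCH_ENABLE, drive the batch loop built up in patches 10-12. Below is a minimal, self-contained sketch of that loop, not the CLI's actual code: PAGES, fetch_page and transfer_batch are hypothetical stand-ins for the real S3 paging listing, _fetch_paging_items and _transfer_items, and the sketch only illustrates how listing of the next page can overlap the transfer of the current one through a background multiprocessing worker and a queue.

    # Minimal sketch (hypothetical helpers, not the pipe CLI implementation).
    import multiprocessing
    import os

    DEFAULT_BATCH_SIZE = 1000
    # Mirrors patch 13: page size as int, async batch disabled unless explicitly enabled.
    BATCH_SIZE = int(os.getenv('CP_CLI_STORAGE_BATCH_SIZE', DEFAULT_BATCH_SIZE))
    ASYNC_BATCH_ENABLE = str(os.getenv('CP_CLI_STORAGE_ASYNC_BATCH_ENABLE', 'false')).lower() == 'true'

    # Fake paginated listing: token -> (items, next_token); None marks the last page.
    # In the real code BATCH_SIZE would be passed as page_size to the listing call.
    PAGES = {
        None: (['a.txt', 'b.txt'], 'token-1'),
        'token-1': (['c.txt', 'd.txt'], 'token-2'),
        'token-2': (['e.txt'], None),
    }

    def fetch_page(token, results):
        # Stand-in for _fetch_paging_items: list one page and hand the batch plus
        # the continuation token back to the parent process through the queue.
        results.put(PAGES[token])

    def transfer_batch(items):
        # Stand-in for _transfer_items: copy every item of the current batch.
        for item in items:
            print('transferring', item)

    def copy_in_batches():
        items, token = PAGES[None]          # the first page is fetched up front
        results = multiprocessing.Queue()
        while True:
            if token is None or not ASYNC_BATCH_ENABLE:
                transfer_batch(items)       # last page, or synchronous mode
                if token is None:
                    return
                items, token = PAGES[token]  # synchronous: list only after transferring
                continue
            # Asynchronous mode: start listing the next page in the background...
            worker = multiprocessing.Process(target=fetch_page, args=(token, results))
            worker.start()
            transfer_batch(items)           # ...while the current batch is copied
            items, token = results.get()
            worker.join()

    if __name__ == '__main__':
        copy_in_batches()

With CP_CLI_STORAGE_ASYNC_BATCH_ENABLE left at its default of false the loop degenerates to a plain list-then-transfer sequence, which matches the "disable async batch by default" intent of patch 12; enabling it overlaps the next listing call with the current transfer, as the patches above do for the S3 provider.
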
From 2fedd3ef713ea7e43516387e1107983689de044b Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Fri, 5 Jul 2024 12:11:23 +0400 Subject: [PATCH 14/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - cleanup --- .../src/utilities/datastorage_operations.py | 52 +++++-------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 88d1eb8356..71d73dbca6 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -23,6 +23,7 @@ from botocore.exceptions import ClientError from future.utils import iteritems from operator import itemgetter +import concurrent.futures from src.api.data_storage import DataStorage from src.api.folder import Folder @@ -233,15 +234,12 @@ def _transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, de clean, quiet, tags, io_threads, on_failures, None, checksum_algorithm, checksum_skip)) transfer_worker.start() - listing_worker, items, next_token = cls._start_fetch_items(manager, source_wrapper, - destination_wrapper, permission_to_check, - include, exclude, force, quiet, - skip_existing, sync_newer, - verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, - on_empty_files, next_token) - workers = [transfer_worker, listing_worker] - cls._handle_keyboard_interrupt(workers) + items, next_token = cls._fetch_paging_items(manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, + skip_existing, sync_newer, verify_destination, + on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files, next_token) + cls._handle_keyboard_interrupt([transfer_worker]) else: cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, io_threads, on_failures, None, checksum_algorithm, checksum_skip) @@ -268,44 +266,22 @@ def _multiprocess_transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_sk destination_wrapper, audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip) - listing_worker, items, next_token = cls._start_fetch_items(manager, source_wrapper, - destination_wrapper, permission_to_check, - include, exclude, force, quiet, - skip_existing, sync_newer, - verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, - on_empty_files, next_token) - workers = [] - workers.extend(transfer_workers) - workers.append(listing_worker) - cls._handle_keyboard_interrupt(workers) - - @classmethod - def _start_fetch_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, - force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files, next_token): - listing_results = multiprocessing.Queue() - listing_worker = multiprocessing.Process(target=cls._fetch_paging_items, - args=(manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, - quiet, skip_existing, sync_newer, - verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, - on_empty_files, next_token, listing_results)) - listing_worker.start() - items, token = listing_results.get() - return listing_worker, items, token + items, next_token = cls._fetch_paging_items(manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, + skip_existing, sync_newer, 
verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files, next_token) + cls._handle_keyboard_interrupt(transfer_workers) @classmethod def _fetch_paging_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files, next_token, listing_results): + on_unsafe_chars_replacement, on_empty_files, next_token): items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files) - listing_results.put((items, next_token)) + return items, next_token @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, From bf08e1bd364bb1dc17e6e5d1d25034da743f7b2c Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Fri, 5 Jul 2024 12:19:20 +0400 Subject: [PATCH 15/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - cleanup --- .../src/utilities/datastorage_operations.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index 71d73dbca6..c04d47cea6 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -229,17 +229,15 @@ def _transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, de io_threads, on_failures, None, checksum_algorithm, checksum_skip) return if ASYNC_BATCH_ENABLE: - transfer_worker = multiprocessing.Process(target=cls._transfer_items, - args=(items, manager, source_wrapper, destination_wrapper, - clean, quiet, tags, io_threads, on_failures, None, - checksum_algorithm, checksum_skip)) - transfer_worker.start() - items, next_token = cls._fetch_paging_items(manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, - skip_existing, sync_newer, verify_destination, - on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files, next_token) - cls._handle_keyboard_interrupt([transfer_worker]) + with concurrent.futures.ThreadPoolExecutor(1) as executor: + future = executor.submit(cls._fetch_paging_items, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files, next_token) + cls._transfer_items(items, manager, source_wrapper, destination_wrapper, + clean, quiet, tags, io_threads, on_failures, None, + checksum_algorithm, checksum_skip) + items, next_token = future.result() else: cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, io_threads, on_failures, None, checksum_algorithm, checksum_skip) From 8300564d6de10d958280cd9535fdf28d0140369a Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Fri, 5 Jul 2024 12:35:28 +0400 Subject: [PATCH 16/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - cleanup --- pipe-cli/pipe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipe-cli/pipe.py b/pipe-cli/pipe.py index c3bd6c155a..9fd5cdf560 
100644 --- a/pipe-cli/pipe.py +++ b/pipe-cli/pipe.py @@ -292,6 +292,8 @@ def cli(): CP_LOGGING_LEVEL Explicit logging level: CRITICAL, ERROR, WARNING, INFO or DEBUG. Defaults to ERROR. CP_LOGGING_FORMAT Explicit logging format. Default is `%(asctime)s:%(levelname)s: %(message)s` CP_TRACE=[True|False] Enables verbose errors. + CP_CLI_STORAGE_BATCH_SIZE The number of objects per request for pipe storage operations (Default: 1000) + CP_CLI_STORAGE_ASYNC_BATCH_ENABLE Enables asynchronous batch transfer """ pass From bb6abd476cd2964d04d65c9fcb8ec2348f4bd25b Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Mon, 8 Jul 2024 17:14:14 +0300 Subject: [PATCH 17/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - support local paths --- pipe-cli/src/model/data_storage_wrapper.py | 28 ++-- .../src/utilities/datastorage_operations.py | 148 +++++++++--------- pipe-cli/src/utilities/storage/azure.py | 3 - pipe-cli/src/utilities/storage/common.py | 4 - pipe-cli/src/utilities/storage/gs.py | 3 - pipe-cli/src/utilities/storage/s3.py | 27 +--- 6 files changed, 87 insertions(+), 126 deletions(-) diff --git a/pipe-cli/src/model/data_storage_wrapper.py b/pipe-cli/src/model/data_storage_wrapper.py index 64ce5d1707..7ae88ad87c 100644 --- a/pipe-cli/src/model/data_storage_wrapper.py +++ b/pipe-cli/src/model/data_storage_wrapper.py @@ -303,9 +303,6 @@ def exists(self): def get_items(self, quiet=False): return self.get_list_manager().get_items(self.path) - def get_paging_items(self, start_token, page_size): - return self.get_list_manager().get_paging_items(self.path, start_token=start_token, page_size=page_size) - def is_empty(self, relative=None): if not self.exists(): return True @@ -525,17 +522,18 @@ def get_items(self, quiet=False): if os.path.isfile(self.path): if os.path.islink(self.path) and self.symlinks == AllowedSymlinkValues.SKIP: - return [] - return [(FILE, self.path, self._leaf_path(self.path), os.path.getsize(self.path), None)] - - return self._list_items(self.path, self._leaf_path(self.path), result=[], visited_symlinks=set(), - root=True, quiet=quiet) + return + yield (FILE, self.path, self._leaf_path(self.path), os.path.getsize(self.path), None) + else: + for item in self._list_items(self.path, self._leaf_path(self.path), visited_symlinks=set(), root=True, + quiet=quiet): + yield item def _leaf_path(self, source_path): head, tail = os.path.split(source_path) return tail or os.path.basename(head) - def _list_items(self, path, parent, result, visited_symlinks, root, quiet): + def _list_items(self, path, parent, visited_symlinks, root, quiet): logging.debug(u'Listing directory {}...'.format(path)) path = to_unicode(path) parent = to_unicode(parent) @@ -545,7 +543,8 @@ def _list_items(self, path, parent, result, visited_symlinks, root, quiet): while attempts > 0: attempts -= 1 try: - self._collect_item(path, parent, item, result, visited_symlinks, root, quiet) + for result_item in self._collect_item(path, parent, item, visited_symlinks, root, quiet): + yield result_item break except DefectiveFileSystemError: if attempts > 0: @@ -555,8 +554,6 @@ def _list_items(self, path, parent, result, visited_symlinks, root, quiet): else: raise - return result - def _os_listdir(self, path): try: return os.listdir(to_string(path)) @@ -567,7 +564,7 @@ def _os_listdir(self, path): raise DefectiveFileSystemError(err_msg) raise - def _collect_item(self, path, parent, item, result, visited_symlinks, root, quiet): + def _collect_item(self, path, parent, 
item, visited_symlinks, root, quiet): safe_item = to_unicode(item, replacing=True) safe_absolute_path = os.path.join(path, safe_item) @@ -605,9 +602,10 @@ def _collect_item(self, path, parent, item, result, visited_symlinks, root, quie if os.path.isfile(to_string(absolute_path)): logging.debug(u'Collected file {}.'.format(safe_absolute_path)) - result.append((FILE, absolute_path, relative_path, os.path.getsize(to_string(absolute_path)), None)) + yield tuple([FILE, absolute_path, relative_path, os.path.getsize(to_string(absolute_path)), None]) elif os.path.isdir(to_string(absolute_path)): - self._list_items(absolute_path, relative_path, result, visited_symlinks, root=False, quiet=quiet) + for folder_item in self._list_items(absolute_path, relative_path, visited_symlinks, root=False, quiet=quiet): + yield folder_item if symlink_target and os.path.islink(to_string(path)) and symlink_target in visited_symlinks: visited_symlinks.remove(symlink_target) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index c04d47cea6..b7ae3ab566 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import click import datetime @@ -169,23 +170,21 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper, events=audit_ctx.container, command=command) - if not verify_destination and not file_list and source_wrapper.get_type() == WrapperType.S3: - items_batch, next_token = source_wrapper.get_paging_items(start_token=None, page_size=BATCH_SIZE) - if next_token: - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - cls._transfer_batch_items(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, - clean, quiet, tags, io_threads, on_failures, checksum_algorithm, - checksum_skip, next_token, permission_to_check, include, exclude, force, - skip_existing, sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files) - sys.exit(0) - else: - items = items_batch - else: - items = files_to_copy if file_list else source_wrapper.get_items(quiet=quiet) + batch_allowed = not verify_destination and not file_list and (source_wrapper.get_type() == WrapperType.S3 or + destination_wrapper.get_type() == WrapperType.S3) + if batch_allowed: + items_iterator = iter(source_wrapper.get_items(quiet=quiet)) + items = cls._fetch_batch_items(items_iterator, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) + cls._transfer_batch_items(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, + quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, + items_iterator, permission_to_check, include, exclude, force, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) + sys.exit(0) + items = 
files_to_copy if file_list else source_wrapper.get_items(quiet=quiet) if source_type not in [WrapperType.STREAM]: items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, @@ -201,85 +200,84 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags @classmethod def _transfer_batch_items(cls, items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, - quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, next_token, + quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, items_iterator, permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files): if threads: cls._multiprocess_transfer_batch(audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, exclude, force, include, io_threads, items, - manager, next_token, on_empty_files, on_failures, on_unsafe_chars, + manager, items_iterator, on_empty_files, on_failures, on_unsafe_chars, on_unsafe_chars_replacement, permission_to_check, quiet, skip_existing, source_wrapper, sync_newer, tags, threads, verify_destination) else: cls._transfer_batch(audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, - exclude, force, include, io_threads, items, manager, next_token, on_empty_files, + exclude, force, include, io_threads, items, manager, items_iterator, on_empty_files, on_failures, on_unsafe_chars, on_unsafe_chars_replacement, permission_to_check, quiet, skip_existing, source_wrapper, sync_newer, tags, verify_destination) @classmethod def _transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, exclude, force, - include, io_threads, items, manager, next_token, on_empty_files, on_failures, on_unsafe_chars, - on_unsafe_chars_replacement, permission_to_check, quiet, skip_existing, source_wrapper, - sync_newer, tags, verify_destination): + include, io_threads, items, manager, items_iterator, on_empty_files, on_failures, + on_unsafe_chars, on_unsafe_chars_replacement, permission_to_check, quiet, skip_existing, + source_wrapper, sync_newer, tags, verify_destination): with audit_ctx: while True: - if not next_token: - cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, - io_threads, on_failures, None, checksum_algorithm, checksum_skip) - return - if ASYNC_BATCH_ENABLE: - with concurrent.futures.ThreadPoolExecutor(1) as executor: - future = executor.submit(cls._fetch_paging_items, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files, next_token) - cls._transfer_items(items, manager, source_wrapper, destination_wrapper, - clean, quiet, tags, io_threads, on_failures, None, - checksum_algorithm, checksum_skip) - items, next_token = future.result() - else: - cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags, - io_threads, on_failures, None, checksum_algorithm, checksum_skip) - items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, 
on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files) + try: + if ASYNC_BATCH_ENABLE: + with concurrent.futures.ThreadPoolExecutor(1) as executor: + future = executor.submit(cls._fetch_batch_items, items_iterator, manager, + source_wrapper, destination_wrapper, permission_to_check, + include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files) + cls._transfer_items(items, manager, source_wrapper, destination_wrapper, + clean, quiet, tags, io_threads, on_failures, None, + checksum_algorithm, checksum_skip) + items = future.result() + else: + cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, + tags, io_threads, on_failures, None, checksum_algorithm, + checksum_skip) + items = cls._fetch_batch_items(items_iterator, manager, source_wrapper, + destination_wrapper, + permission_to_check, include, exclude, force, quiet, + skip_existing, + sync_newer, verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files) + except StopIteration: + break @classmethod def _multiprocess_transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, - exclude, force, include, io_threads, items, manager, next_token, on_empty_files, - on_failures, on_unsafe_chars, on_unsafe_chars_replacement, permission_to_check, - quiet, skip_existing, source_wrapper, sync_newer, tags, threads, - verify_destination): + exclude, force, include, io_threads, items, manager, items_iterator, + on_empty_files, on_failures, on_unsafe_chars, on_unsafe_chars_replacement, + permission_to_check, quiet, skip_existing, source_wrapper, sync_newer, tags, + threads, verify_destination): while True: - if not next_token: - cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, - destination_wrapper, audit_ctx, clean, quiet, tags, - io_threads, on_failures, checksum_algorithm, - checksum_skip) - return - transfer_workers = cls._start_multiprocess_transfer(items, threads, manager, source_wrapper, - destination_wrapper, audit_ctx, clean, quiet, tags, - io_threads, on_failures, checksum_algorithm, - checksum_skip) - items, next_token = cls._fetch_paging_items(manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, - skip_existing, sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files, next_token) - cls._handle_keyboard_interrupt(transfer_workers) - - @classmethod - def _fetch_paging_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude, - force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars, - on_unsafe_chars_replacement, on_empty_files, next_token): - items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE) - items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - return items, next_token + try: + transfer_workers = cls._start_multiprocess_transfer(items, threads, manager, source_wrapper, + destination_wrapper, audit_ctx, clean, quiet, tags, + io_threads, on_failures, checksum_algorithm, + checksum_skip) + items = cls._fetch_batch_items(items_iterator, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, 
verify_destination, on_unsafe_chars, + on_unsafe_chars_replacement, on_empty_files) + cls._handle_keyboard_interrupt(transfer_workers) + except StopIteration: + break + + @classmethod + def _fetch_batch_items(cls, items_iterator, manager, source_wrapper, destination_wrapper, permission_to_check, + include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, + on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files): + batch_items_iterator = itertools.islice(items_iterator, BATCH_SIZE) + items_batch = itertools.chain([next(batch_items_iterator)], batch_items_iterator) + items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper, permission_to_check, + include, exclude, force, quiet, skip_existing, sync_newer, verify_destination, + on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files) + return items @classmethod def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check, diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py index 91d8ab2e41..89b95b28af 100644 --- a/pipe-cli/src/utilities/storage/azure.py +++ b/pipe-cli/src/utilities/storage/azure.py @@ -96,9 +96,6 @@ def list_paging_items(self, relative_path=None, recursive=False, page_size=Stora start_token=None, show_archive=False): return self.list_items(relative_path, recursive, page_size, show_all=False, show_archive=show_archive), None - def get_paging_items(self, relative_path, start_token, page_size): - return self.get_items(relative_path), None - def get_summary(self, relative_path=None): prefix = StorageOperations.get_prefix(relative_path) blobs_generator = self.service.list_blobs(self.bucket.path, diff --git a/pipe-cli/src/utilities/storage/common.py b/pipe-cli/src/utilities/storage/common.py index 75c86b2765..5b7649d04d 100644 --- a/pipe-cli/src/utilities/storage/common.py +++ b/pipe-cli/src/utilities/storage/common.py @@ -397,10 +397,6 @@ def get_items(self, relative_path): yield ('File', item.name, item_relative_path, item.size, StorageOperations.get_item_modification_datetime_utc(item)) - @abstractmethod - def get_paging_items(self, relative_path, start_token, page_size): - pass - def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR): prefix = StorageOperations.get_prefix(relative_path).rstrip(delimiter) + delimiter for item in self.list_items(prefix, show_all=True): diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py index 5c51cbbd5e..5e7e6ec167 100644 --- a/pipe-cli/src/utilities/storage/gs.py +++ b/pipe-cli/src/utilities/storage/gs.py @@ -447,9 +447,6 @@ def list_paging_items(self, relative_path=None, recursive=False, page_size=Stora start_token=None, show_archive=False): return self.list_items(relative_path, recursive, page_size, show_all=False, show_archive=show_archive), None - def get_paging_items(self, relative_path, next_token, page_size): - return self.get_items(relative_path), None - def get_summary_with_depth(self, max_depth, relative_path=None): prefix = StorageOperations.get_prefix(relative_path) bucket = self.client.bucket(self.bucket.path) diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py index 8b30a03511..26f1ef1f06 100644 --- a/pipe-cli/src/utilities/storage/s3.py +++ b/pipe-cli/src/utilities/storage/s3.py @@ -971,10 +971,6 @@ def get_folder_object(self, name): def get_items(self, relative_path): return S3BucketOperations.get_items(self.bucket, session=self.session) - def 
get_paging_items(self, relative_path, start_token, page_size): - return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, session=self.session, - start_token=start_token) - def get_file_tags(self, relative_path): return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager( self.session, self.bucket, self.region_name, endpoint=self.endpoint), relative_path) @@ -1099,11 +1095,6 @@ def get_item_name(cls, param, prefix=None): @classmethod def get_items(cls, storage_wrapper, session=None): - results, _ = cls.get_paging_items(storage_wrapper, session=session) - return results - - @classmethod - def get_paging_items(cls, storage_wrapper, page_size=None, session=None, start_token=None): if session is None: session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp') @@ -1117,30 +1108,14 @@ def get_paging_items(cls, storage_wrapper, page_size=None, session=None, start_t prefix = cls.get_prefix(delimiter, storage_wrapper.path) operation_parameters['Prefix'] = prefix - - if page_size: - operation_parameters['PaginationConfig'] = { - 'PageSize': page_size, - 'MaxItems': page_size - } - - if start_token: - operation_parameters['ContinuationToken'] = start_token - page_iterator = paginator.paginate(**operation_parameters) - results = [] for page in page_iterator: if 'Contents' in page: for file in page['Contents']: name = cls.get_item_name(file['Key'], prefix=prefix) if name.endswith(delimiter): continue - results.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'], - file['LastModified'])) - next_page_token = page.get('NextContinuationToken', None) if page else None - if page_size: - return results, next_page_token - return results, None + yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'], file['LastModified']) @classmethod def path_exists(cls, storage_wrapper, relative_path, session=None): From a62bebaa55623a7bd8ad7a33662e0b6004a05db5 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Tue, 9 Jul 2024 13:31:12 +0300 Subject: [PATCH 18/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - cleanup --- pipe-cli/src/utilities/datastorage_operations.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index b7ae3ab566..f393abe509 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -170,8 +170,8 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper, events=audit_ctx.container, command=command) - batch_allowed = not verify_destination and not file_list and (source_wrapper.get_type() == WrapperType.S3 or - destination_wrapper.get_type() == WrapperType.S3) + batch_allowed = not verify_destination and not file_list and (source_wrapper.get_type() == WrapperType.LOCAL + or source_wrapper.get_type() == WrapperType.S3) if batch_allowed: items_iterator = iter(source_wrapper.get_items(quiet=quiet)) items = cls._fetch_batch_items(items_iterator, manager, source_wrapper, destination_wrapper, From 392162a128ad986238eef9487246ee850a141d04 Mon Sep 17 00:00:00 2001 From: Ekaterina_Kazachkova Date: Tue, 9 Jul 2024 14:14:50 +0300 Subject: [PATCH 19/19] Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - cleanup --- 
.../src/utilities/datastorage_operations.py | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py index f393abe509..c93614abe2 100644 --- a/pipe-cli/src/utilities/datastorage_operations.py +++ b/pipe-cli/src/utilities/datastorage_operations.py @@ -173,16 +173,10 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags batch_allowed = not verify_destination and not file_list and (source_wrapper.get_type() == WrapperType.LOCAL or source_wrapper.get_type() == WrapperType.S3) if batch_allowed: - items_iterator = iter(source_wrapper.get_items(quiet=quiet)) - items = cls._fetch_batch_items(items_iterator, manager, source_wrapper, destination_wrapper, - permission_to_check, include, exclude, force, quiet, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) - cls._transfer_batch_items(items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, - quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, - items_iterator, permission_to_check, include, exclude, force, skip_existing, - sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, - on_empty_files) + cls._transfer_batch_items(threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet, + tags, io_threads, on_failures, checksum_algorithm, checksum_skip, + permission_to_check, include, exclude, force, skip_existing, sync_newer, + verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files) sys.exit(0) items = files_to_copy if file_list else source_wrapper.get_items(quiet=quiet) if source_type not in [WrapperType.STREAM]: @@ -199,10 +193,15 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags checksum_algorithm=checksum_algorithm, checksum_skip=checksum_skip) @classmethod - def _transfer_batch_items(cls, items, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, - quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, items_iterator, + def _transfer_batch_items(cls, threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, + quiet, tags, io_threads, on_failures, checksum_algorithm, checksum_skip, permission_to_check, include, exclude, force, skip_existing, sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files): + items_iterator = iter(source_wrapper.get_items(quiet=quiet)) + items = cls._fetch_batch_items(items_iterator, manager, source_wrapper, destination_wrapper, + permission_to_check, include, exclude, force, quiet, skip_existing, + sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, + on_empty_files) if threads: cls._multiprocess_transfer_batch(audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper, exclude, force, include, io_threads, items,