Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) - disable async batch by default
ekazachkova committed Jul 4, 2024
1 parent 967e252 commit 633380a
Showing 1 changed file with 43 additions and 37 deletions.
pipe-cli/src/utilities/datastorage_operations.py
@@ -42,7 +42,8 @@
 FOLDER_MARKER = '.DS_Store'
 STORAGE_DETAILS_SEPARATOR = ', '
 DEFAULT_BATCH_SIZE = 1000
-BATCH_SIZE = os.getenv('CP_CLI_STORAGE_LIST', DEFAULT_BATCH_SIZE)
+BATCH_SIZE = os.getenv('CP_CLI_STORAGE_BATCH_SIZE', DEFAULT_BATCH_SIZE)
+ASYNC_BATCH_ENABLE = str(os.getenv('CP_CLI_STORAGE_ASYNC_BATCH_ENABLE', 'false')).lower() == 'true'
 ARCHIVED_PERMISSION_ERROR_MASSAGE = 'Error: Failed to apply --show-archived option: Permission denied.'


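Both new constants are evaluated once at import time, so the environment variables must be set before the CLI process starts. A minimal sketch of how the toggle parses, with illustrative values only (note that `os.getenv` returns a string when the variable is set, while the fallback `DEFAULT_BATCH_SIZE` is an int):

```python
import os

# Illustrative only: set before the CLI starts, since the constants
# above are read once at module import.
os.environ['CP_CLI_STORAGE_ASYNC_BATCH_ENABLE'] = 'TRUE'  # re-enables the async pipeline
os.environ['CP_CLI_STORAGE_BATCH_SIZE'] = '500'           # smaller listing pages

# The toggle matches only the literal string 'true', case-insensitively:
# 'TRUE' and 'True' enable it, but '1' or 'yes' leave it disabled.
enabled = str(os.getenv('CP_CLI_STORAGE_ASYNC_BATCH_ENABLE', 'false')).lower() == 'true'
assert enabled

# When CP_CLI_STORAGE_BATCH_SIZE is set, os.getenv yields the str '500';
# only the unset fallback is the int 1000.
batch_size = os.getenv('CP_CLI_STORAGE_BATCH_SIZE', 1000)
assert batch_size == '500'
```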
@@ -226,21 +227,29 @@ def _transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, de
                 cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags,
                                     io_threads, on_failures, None, checksum_algorithm, checksum_skip)
                 return
-            transfer_worker = multiprocessing.Process(target=cls._transfer_items,
-                                                      args=(items, manager, source_wrapper, destination_wrapper,
-                                                            clean, quiet, tags, io_threads, on_failures, None,
-                                                            checksum_algorithm, checksum_skip))
-            transfer_worker.start()
-            listing_worker, listing_results = cls._start_fetch_items(manager, source_wrapper,
-                                                                     destination_wrapper, permission_to_check,
-                                                                     include, exclude, force, quiet,
-                                                                     skip_existing, sync_newer,
-                                                                     verify_destination, on_unsafe_chars,
-                                                                     on_unsafe_chars_replacement,
-                                                                     on_empty_files, next_token)
-            workers = [transfer_worker, listing_worker]
-            cls._handle_keyboard_interrupt(workers)
-            items, next_token = listing_results.get()
+            if ASYNC_BATCH_ENABLE:
+                transfer_worker = multiprocessing.Process(target=cls._transfer_items,
+                                                          args=(items, manager, source_wrapper, destination_wrapper,
+                                                                clean, quiet, tags, io_threads, on_failures, None,
+                                                                checksum_algorithm, checksum_skip))
+                transfer_worker.start()
+                listing_worker, items, next_token = cls._start_fetch_items(manager, source_wrapper,
+                                                                           destination_wrapper, permission_to_check,
+                                                                           include, exclude, force, quiet,
+                                                                           skip_existing, sync_newer,
+                                                                           verify_destination, on_unsafe_chars,
+                                                                           on_unsafe_chars_replacement,
+                                                                           on_empty_files, next_token)
+                workers = [transfer_worker, listing_worker]
+                cls._handle_keyboard_interrupt(workers)
+            else:
+                cls._transfer_items(items, manager, source_wrapper, destination_wrapper, clean, quiet, tags,
+                                    io_threads, on_failures, None, checksum_algorithm, checksum_skip)
+                items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE)
+                items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper,
+                                          permission_to_check, include, exclude, force, quiet, skip_existing,
+                                          sync_newer, verify_destination, on_unsafe_chars,
+                                          on_unsafe_chars_replacement, on_empty_files)

     @classmethod
     def _multiprocess_transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_skip, clean, destination_wrapper,
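A self-contained sketch of the control flow this hunk introduces; all names are illustrative stand-ins, not the pipe-cli API, and `source` is assumed to be picklable. With the flag on, batch N uploads in a worker process while batch N+1 is listed; with the flag off (the new default), upload and listing strictly alternate in the parent process:

```python
import multiprocessing

def transfer(items):
    for item in items:
        pass  # upload a single item here

def list_page(source, token, size, results):
    # child process: fetch one page and publish it through the queue
    results.put(source.get_paging_items(token, size))

def copy_all(source, batch_size, async_enabled):
    items, token = source.get_paging_items(None, batch_size)
    while True:
        if not token:
            transfer(items)  # last page: nothing left to list in parallel
            return
        if async_enabled:
            # overlap: upload batch N while batch N+1 is being listed
            uploader = multiprocessing.Process(target=transfer, args=(items,))
            uploader.start()
            results = multiprocessing.Queue()
            lister = multiprocessing.Process(target=list_page,
                                             args=(source, token, batch_size, results))
            lister.start()
            items, token = results.get()
            uploader.join()
            lister.join()
        else:
            # the default since this commit: strictly alternate upload and listing
            transfer(items)
            items, token = source.get_paging_items(token, batch_size)
```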
@@ -259,47 +268,44 @@ def _multiprocess_transfer_batch(cls, audit_ctx, checksum_algorithm, checksum_sk
                                                              destination_wrapper, audit_ctx, clean, quiet, tags,
                                                              io_threads, on_failures, checksum_algorithm,
                                                              checksum_skip)
-            listing_worker, listing_results = cls._start_fetch_items(manager, source_wrapper,
-                                                                     destination_wrapper, permission_to_check,
-                                                                     include, exclude, force, quiet,
-                                                                     skip_existing, sync_newer,
-                                                                     verify_destination, on_unsafe_chars,
-                                                                     on_unsafe_chars_replacement,
-                                                                     on_empty_files, next_token)
+            listing_worker, items, next_token = cls._start_fetch_items(manager, source_wrapper,
+                                                                       destination_wrapper, permission_to_check,
+                                                                       include, exclude, force, quiet,
+                                                                       skip_existing, sync_newer,
+                                                                       verify_destination, on_unsafe_chars,
+                                                                       on_unsafe_chars_replacement,
+                                                                       on_empty_files, next_token)
             workers = []
             workers.extend(transfer_workers)
             workers.append(listing_worker)
             cls._handle_keyboard_interrupt(workers)
-            items, next_token = listing_results.get()

     @classmethod
     def _start_fetch_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude,
                            force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars,
                            on_unsafe_chars_replacement, on_empty_files, next_token):
         listing_results = multiprocessing.Queue()
-
-        def get_paging_items():
-            listing_results.put(cls._fetch_paging_items(manager, source_wrapper, destination_wrapper,
-                                                        permission_to_check, include, exclude, force,
-                                                        quiet, skip_existing, sync_newer,
-                                                        verify_destination, on_unsafe_chars,
-                                                        on_unsafe_chars_replacement,
-                                                        on_empty_files, next_token))
-
-        listing_worker = multiprocessing.Process(target=get_paging_items)
+        listing_worker = multiprocessing.Process(target=cls._fetch_paging_items,
+                                                 args=(manager, source_wrapper, destination_wrapper,
+                                                       permission_to_check, include, exclude, force,
+                                                       quiet, skip_existing, sync_newer,
+                                                       verify_destination, on_unsafe_chars,
+                                                       on_unsafe_chars_replacement,
+                                                       on_empty_files, next_token, listing_results))
         listing_worker.start()
-        return listing_worker, listing_results
+        items, token = listing_results.get()
+        return listing_worker, items, token

     @classmethod
     def _fetch_paging_items(cls, manager, source_wrapper, destination_wrapper, permission_to_check, include, exclude,
                             force, quiet, skip_existing, sync_newer, verify_destination, on_unsafe_chars,
-                            on_unsafe_chars_replacement, on_empty_files, next_token):
+                            on_unsafe_chars_replacement, on_empty_files, next_token, listing_results):
         items_batch, next_token = source_wrapper.get_paging_items(next_token, BATCH_SIZE)
         items = cls._filter_items(items_batch, manager, source_wrapper, destination_wrapper,
                                   permission_to_check, include, exclude, force, quiet, skip_existing,
                                   sync_newer, verify_destination, on_unsafe_chars, on_unsafe_chars_replacement,
                                   on_empty_files)
-        return items, next_token
+        listing_results.put((items, next_token))

     @classmethod
     def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check,

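The last hunk swaps the nested get_paging_items closure for a classmethod target plus an explicit result queue, and _start_fetch_items now blocks on the first result before returning a real (items, token) pair. One plausible reason to avoid the closure: under multiprocessing's spawn start method the process target must be picklable, and nested functions are not. A minimal sketch of that handoff pattern under those assumptions (illustrative names, not the pipe-cli API):

```python
import multiprocessing

class PagedLister:
    @classmethod
    def _fetch_page(cls, source, token, size, results):
        # runs in the child process: compute one page, publish it via the queue
        results.put(source.get_paging_items(token, size))

    @classmethod
    def start_fetch(cls, source, token, size):
        results = multiprocessing.Queue()
        # a classmethod of a module-level class (unlike a closure) can be
        # pickled by reference, so this works under the 'spawn' start method
        worker = multiprocessing.Process(target=cls._fetch_page,
                                         args=(source, token, size, results))
        worker.start()
        items, next_token = results.get()  # block until the page arrives
        return worker, items, next_token
```

Returning the worker alongside the data keeps it available for joining or termination, which is what the combined workers list passed to _handle_keyboard_interrupt relies on.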