Commit 2255ef4

Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy - async batch collection for cp/mv operation
ekazachkova committed Apr 11, 2022
1 parent e8aad3c commit 2255ef4
Showing 6 changed files with 41 additions and 17 deletions.
pipe-cli/src/model/data_storage_wrapper.py (2 additions & 2 deletions)
@@ -233,8 +233,8 @@ def exists(self):
     def get_items(self):
         return self.get_list_manager().get_items(self.path)

-    def get_paging_items(self, next_token, page_size):
-        return self.get_list_manager().get_paging_items(self.path, next_token, page_size)
+    def get_paging_items(self, next_token, page_size, results=None):
+        return self.get_list_manager().get_paging_items(self.path, next_token, page_size, results=results)

     def is_empty(self, relative=None):
         if not self.exists():
pipe-cli/src/utilities/datastorage_operations.py (29 additions & 8 deletions)
@@ -36,7 +36,7 @@
 from src.utilities.storage.umount import Umount

 FOLDER_MARKER = '.DS_Store'
-DEFAULT_BATCH_SIZE = 2
+DEFAULT_BATCH_SIZE = 1000
 BATCH_SIZE = os.getenv('CP_CLI_STORAGE_LIST', DEFAULT_BATCH_SIZE)


@@ -104,13 +104,34 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags
                            force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads)
         else:
             items_batch, next_token = source_wrapper.get_paging_items(next_token=None, page_size=BATCH_SIZE)
-            while True:
-                cls._transfer(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, include,
-                              exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags,
-                              io_threads)
-                if not next_token:
-                    return
-                items_batch, next_token = source_wrapper.get_paging_items(next_token=next_token, page_size=BATCH_SIZE)
+            cls._transfer_batch_items(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check,
+                                      include, exclude, force, quiet, skip_existing, verify_destination, threads,
+                                      clean, tags, io_threads, next_token)
+
+    @classmethod
+    def _transfer_batch_items(cls, source_wrapper, destination_wrapper, items_batch, manager, permission_to_check,
+                              include, exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags,
+                              io_threads, next_token):
+        while True:
+            workers = []
+            transfer_process = multiprocessing.Process(target=cls._transfer,
+                                                       args=(source_wrapper, destination_wrapper, items_batch,
+                                                             manager, permission_to_check, include,
+                                                             exclude, force, quiet, skip_existing,
+                                                             verify_destination, threads, clean, tags, io_threads))
+            workers.append(transfer_process)
+            if not next_token:
+                transfer_process.start()
+                cls._handle_keyboard_interrupt(workers)
+                return
+            listing_results = multiprocessing.Queue()
+            listing_process = multiprocessing.Process(target=source_wrapper.get_paging_items,
+                                                      args=(next_token, BATCH_SIZE, listing_results))
+            workers.append(listing_process)
+            transfer_process.start()
+            listing_process.start()
+            cls._handle_keyboard_interrupt(workers)
+            items_batch, next_token = listing_results.get()

     @classmethod
     def _transfer(cls, source_wrapper, destination_wrapper, items_part, manager, permission_to_check,
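The new _transfer_batch_items loop overlaps transfer and listing: each iteration starts one process that uploads the current batch and, while a continuation token remains, a second process that lists the next page and hands it back through a multiprocessing.Queue. A minimal standalone sketch of the same pipeline, with fake_transfer and fake_list as hypothetical stand-ins for _transfer and get_paging_items:

import multiprocessing


def fake_transfer(items):
    # Hypothetical stand-in for _transfer: consumes one batch of items.
    for item in items:
        print('transferring', item)


def fake_list(next_token, page_size, results):
    # Hypothetical stand-in for get_paging_items: builds one page plus a
    # continuation token and publishes them on the queue, because a
    # multiprocessing target cannot return a value to the parent.
    items = ['item-%s-%s' % (next_token, i) for i in range(page_size)]
    token = next_token + 1 if next_token < 2 else None
    results.put((items, token))


def transfer_batches(items_batch, next_token, page_size=3):
    while True:
        transfer = multiprocessing.Process(target=fake_transfer, args=(items_batch,))
        if next_token is None:
            # Last page: upload it and stop, nothing left to list.
            transfer.start()
            transfer.join()
            return
        results = multiprocessing.Queue()
        listing = multiprocessing.Process(target=fake_list,
                                          args=(next_token, page_size, results))
        transfer.start()   # upload the current batch...
        listing.start()    # ...while the next page is being listed
        items_batch, next_token = results.get()
        transfer.join()
        listing.join()


if __name__ == '__main__':
    transfer_batches(['seed-item'], next_token=0)

Because the first page is fetched synchronously before the loop, uploads start as soon as one batch of BATCH_SIZE objects is known, instead of only after the whole source hierarchy has been walked.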
pipe-cli/src/utilities/storage/azure.py (1 addition & 1 deletion)
@@ -88,7 +88,7 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
         absolute_items = [self._to_storage_item(blob) for blob in blobs_generator]
         return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items]

-    def get_paging_items(self, relative_path, next_token, page_size):
+    def get_paging_items(self, relative_path, next_token, page_size, results=None):
         return self.get_items(relative_path), None

     def get_summary(self, relative_path=None):
pipe-cli/src/utilities/storage/common.py (1 addition & 1 deletion)
@@ -330,7 +330,7 @@ def get_items(self, relative_path):
             yield ('File', item.name, item_relative_path, item.size)

     @abstractmethod
-    def get_paging_items(self, relative_path, next_token, page_size):
+    def get_paging_items(self, relative_path, next_token, page_size, results=None):
         pass

     def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR):
pipe-cli/src/utilities/storage/gs.py (1 addition & 1 deletion)
@@ -437,7 +437,7 @@ def get_summary(self, relative_path=None):
             count += 1
         return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size]

-    def get_paging_items(self, relative_path, next_token, page_size):
+    def get_paging_items(self, relative_path, next_token, page_size, results=None):
         return self.get_items(relative_path), None

     def get_summary_with_depth(self, max_depth, relative_path=None):
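For the Azure and Google Cloud managers, get_paging_items is only a shim: it returns the full get_items listing with a None continuation token, so the batching loop degenerates to a single batch on those backends and only the S3 manager below pages natively. A small sketch of the loop shape under that assumption, with iterate_batches as a hypothetical helper:

def iterate_batches(get_paging_items, batch_size):
    # Yields listing pages until the backend stops returning a token.
    # A shim that always returns (all_items, None) yields exactly once.
    items, token = get_paging_items(None, batch_size)
    while True:
        yield items
        if not token:
            return
        items, token = get_paging_items(token, batch_size)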
pipe-cli/src/utilities/storage/s3.py (7 additions & 4 deletions)
@@ -831,9 +831,9 @@ def get_folder_object(self, name):
     def get_items(self, relative_path):
         return S3BucketOperations.get_items(self.bucket, session=self.session)

-    def get_paging_items(self, relative_path, next_token, page_size):
+    def get_paging_items(self, relative_path, next_token, page_size, results=None):
         return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, next_token=next_token,
-                                                   session=self.session)
+                                                   session=self.session, results=results)

     def get_file_tags(self, relative_path):
         return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager(
@@ -979,7 +979,7 @@ def get_items(cls, storage_wrapper, session=None):
                yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'])

     @classmethod
-    def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None):
+    def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None, results=None):
         if session is None:
             session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp')

@@ -1009,7 +1009,10 @@ def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None, results=None):
                 if name.endswith(delimiter):
                     continue
                 res.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size']))
-        return res, page.get('NextContinuationToken', None) if page else None
+        next_page_token = page.get('NextContinuationToken', None) if page else None
+        if results is not None:
+            results.put((res, next_page_token))
+        return res, next_page_token

     @classmethod
     def path_exists(cls, storage_wrapper, relative_path, session=None):
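The optional results argument exists because a multiprocessing.Process target cannot return a value to its parent; the S3 implementation therefore both returns the (page, token) pair for same-process callers and puts it on the queue when one is supplied. A minimal sketch of the call pattern, with list_page as a hypothetical stand-in for a manager's get_paging_items:

import multiprocessing


def list_page(relative_path, next_token, page_size, results=None):
    # Hypothetical stand-in: returns one page of (type, key, prefix, size)
    # tuples plus a continuation token, and also publishes them on the
    # queue so the function can run as a multiprocessing target.
    page = [('File', 'key-%d' % i, relative_path, 0) for i in range(page_size)]
    next_page_token = None  # a real listing would propagate the backend token
    if results is not None:
        results.put((page, next_page_token))
    return page, next_page_token


if __name__ == '__main__':
    results = multiprocessing.Queue()
    worker = multiprocessing.Process(target=list_page,
                                     args=('data/', None, 10, results))
    worker.start()
    items, token = results.get()  # blocks until the worker publishes the page
    worker.join()
    print(len(items), token)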
