Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (WIP)
ekazachkova committed Apr 5, 2022
1 parent 8a06908 commit c4cdf8d
Showing 6 changed files with 72 additions and 3 deletions.
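At a high level, the change replaces the up-front full listing of the source hierarchy with a loop that lists one page of items at a time and transfers it before requesting the next page. Below is a minimal, self-contained sketch of that pattern; fake_paged_listing and transfer_batch are hypothetical stand-ins for the get_paging_items and _transfer methods added in this commit, and the item tuples only mimic the ('File', key, relative_name, size) shape used by the listing managers.

# Toy illustration of the batched copy loop introduced in this commit.
BATCH_SIZE = 2

ITEMS = [('File', 'data/%d.txt' % i, '%d.txt' % i, 100) for i in range(5)]

def fake_paged_listing(next_token, page_size):
    # Return one page of items plus a token for the next page (None when exhausted).
    start = next_token or 0
    page = ITEMS[start:start + page_size]
    new_token = start + page_size if start + page_size < len(ITEMS) else None
    return page, new_token

def transfer_batch(batch):
    # Stand-in for DataStorageOperations._transfer: here we just print the batch.
    print('transferring %s' % [name for _, name, _, _ in batch])

items_batch, next_token = fake_paged_listing(next_token=None, page_size=BATCH_SIZE)
while True:
    transfer_batch(items_batch)  # upload starts after the first page, not after a full traversal
    if not next_token:
        break
    items_batch, next_token = fake_paged_listing(next_token=next_token, page_size=BATCH_SIZE)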
3 changes: 3 additions & 0 deletions pipe-cli/src/model/data_storage_wrapper.py
@@ -233,6 +233,9 @@ def exists(self):
def get_items(self):
return self.get_list_manager().get_items(self.path)

def get_paging_items(self, next_token, page_size):
return self.get_list_manager().get_paging_items(self.path, next_token, page_size)

def is_empty(self, relative=None):
if not self.exists():
return True
25 changes: 22 additions & 3 deletions pipe-cli/src/utilities/datastorage_operations.py
@@ -36,6 +36,7 @@
from src.utilities.storage.umount import Umount

FOLDER_MARKER = '.DS_Store'
BATCH_SIZE = 2


class DataStorageOperations(object):
@@ -95,8 +96,26 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags
command = 'mv' if clean else 'cp'
permission_to_check = os.R_OK if command == 'cp' else os.W_OK
manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper, command)
items = files_to_copy if file_list else source_wrapper.get_items()
items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check,

if verify_destination or file_list:
items = files_to_copy if file_list else source_wrapper.get_items()
cls._transfer(source_wrapper, destination_wrapper, items, manager, permission_to_check, include, exclude,
force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads)
else:
items_batch, next_token = source_wrapper.get_paging_items(next_token=None, page_size=BATCH_SIZE)
while True:
cls._transfer(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, include,
exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags,
io_threads)
if not next_token:
return
items_batch, next_token = source_wrapper.get_paging_items(next_token=next_token, page_size=BATCH_SIZE)

@classmethod
def _transfer(cls, source_wrapper, destination_wrapper, items_part, manager, permission_to_check,
include, exclude, force, quiet, skip_existing, verify_destination, threads,
clean, tags, io_threads):
items = cls._filter_items(items_part, manager, source_wrapper, destination_wrapper, permission_to_check,
include, exclude, force, quiet, skip_existing, verify_destination)
sorted_items = list()
transfer_results = []
@@ -140,7 +159,7 @@ def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, perm
continue
if source_wrapper.is_file() and not source_wrapper.path == full_path:
continue
if not include and not exclude:
if not include and not exclude and not skip_existing and not verify_destination:
if not source_wrapper.is_file():
possible_folder_name = source_wrapper.path_with_trailing_separator()
# if operation from source root
3 changes: 3 additions & 0 deletions pipe-cli/src/utilities/storage/azure.py
@@ -88,6 +88,9 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
absolute_items = [self._to_storage_item(blob) for blob in blobs_generator]
return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items]

def get_paging_items(self, relative_path, next_token, page_size):
return self.get_items(relative_path), None

def get_summary(self, relative_path=None):
prefix = StorageOperations.get_prefix(relative_path)
blobs_generator = self.service.list_blobs(self.bucket.path,
4 changes: 4 additions & 0 deletions pipe-cli/src/utilities/storage/common.py
@@ -329,6 +329,10 @@ def get_items(self, relative_path):
item_relative_path = StorageOperations.get_item_name(item.name, prefix + StorageOperations.PATH_SEPARATOR)
yield ('File', item.name, item_relative_path, item.size)

@abstractmethod
def get_paging_items(self, relative_path, next_token, page_size):
pass

def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR):
prefix = StorageOperations.get_prefix(relative_path).rstrip(delimiter) + delimiter
for item in self.list_items(prefix, show_all=True):
3 changes: 3 additions & 0 deletions pipe-cli/src/utilities/storage/gs.py
@@ -437,6 +437,9 @@ def get_summary(self, relative_path=None):
count += 1
return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size]

def get_paging_items(self, relative_path, next_token, page_size):
return self.get_items(relative_path), None

def get_summary_with_depth(self, max_depth, relative_path=None):
prefix = StorageOperations.get_prefix(relative_path)
bucket = self.client.bucket(self.bucket.path)
37 changes: 37 additions & 0 deletions pipe-cli/src/utilities/storage/s3.py
@@ -789,6 +789,10 @@ def get_folder_object(self, name):
def get_items(self, relative_path):
return S3BucketOperations.get_items(self.bucket, session=self.session)

def get_paging_items(self, relative_path, next_token, page_size):
return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, next_token=next_token,
session=self.session)

def get_file_tags(self, relative_path):
return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager(
self.session, self.bucket, self.region_name), relative_path)
@@ -932,6 +936,39 @@ def get_items(cls, storage_wrapper, session=None):
continue
yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'])

@classmethod
def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None):
if session is None:
session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp')

delimiter = S3BucketOperations.S3_PATH_SEPARATOR
client = cls._get_client(session, storage_wrapper.bucket.region)
paginator = client.get_paginator('list_objects_v2')
prefix = cls.get_prefix(delimiter, storage_wrapper.path)
operation_parameters = {
'Bucket': storage_wrapper.bucket.path,
'Prefix': prefix,
'PaginationConfig':
{
'PageSize': page_size,
'MaxKeys': page_size
}
}

if next_token:
operation_parameters['ContinuationToken'] = next_token

page_iterator = paginator.paginate(**operation_parameters)
res = []
for page in page_iterator:
if 'Contents' in page:
for file in page['Contents']:
name = cls.get_item_name(file['Key'], prefix=prefix)
if name.endswith(delimiter):
continue
res.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size']))
return res, page.get('NextContinuationToken', None) if page else None

@classmethod
def path_exists(cls, storage_wrapper, relative_path, session=None):
delimiter = S3BucketOperations.S3_PATH_SEPARATOR
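The S3 implementation above builds on the standard list_objects_v2 continuation-token mechanism. For reference, here is a minimal standalone sketch of that mechanism outside the pipe-cli classes; list_one_page, 'my-bucket' and 'data/' are placeholders, and running it requires boto3 and valid AWS credentials.

# Standalone illustration of S3 paged listing via list_objects_v2 continuation tokens.
import boto3

s3_client = boto3.client('s3')

def list_one_page(bucket, prefix, page_size, next_token=None):
    kwargs = {'Bucket': bucket, 'Prefix': prefix, 'MaxKeys': page_size}
    if next_token:
        kwargs['ContinuationToken'] = next_token
    response = s3_client.list_objects_v2(**kwargs)
    files = [('File', obj['Key'], obj['Size']) for obj in response.get('Contents', [])]
    # NextContinuationToken is only present while more keys remain.
    return files, response.get('NextContinuationToken')

token = None
while True:
    page, token = list_one_page('my-bucket', 'data/', page_size=2, next_token=token)
    print(page)
    if not token:
        break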
