
Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) #2597

Merged: 22 commits, Jul 9, 2024
Changes from 1 commit

Commits (22)
f319967
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 5, 2022
e8aad3c
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 6, 2022
2255ef4
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 11, 2022
54a9ede
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 11, 2022
de1448b
Merge remote-tracking branch 'origin/develop' into issue_2574-batch-p…
ekazachkova Apr 11, 2022
5a2ed7c
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 12, 2022
c6562e0
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 12, 2022
6e2f3c2
Merge remote-tracking branch 'refs/remotes/origin/develop' into issue…
ekazachkova Jul 2, 2024
284cb13
Merge remote-tracking branch 'refs/remotes/origin/develop' into issue…
ekazachkova Jul 2, 2024
8cc035a
test
ekazachkova Jul 3, 2024
5f64ee7
test
ekazachkova Jul 4, 2024
51da258
test
ekazachkova Jul 4, 2024
5aa1ac5
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
967e252
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
633380a
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
f141842
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
2fedd3e
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 5, 2024
bf08e1b
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 5, 2024
8300564
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 5, 2024
bb6abd4
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 8, 2024
a62beba
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 9, 2024
392162a
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 9, 2024
Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (WIP)
ekazachkova committed Apr 5, 2022
commit f319967fd1ab3131d1e71cbd6fe65bb07980f42a
3 changes: 3 additions & 0 deletions pipe-cli/src/model/data_storage_wrapper.py
@@ -233,6 +233,9 @@ def exists(self):
     def get_items(self):
         return self.get_list_manager().get_items(self.path)

+    def get_paging_items(self, next_token, page_size):
+        return self.get_list_manager().get_paging_items(self.path, next_token, page_size)
+
     def is_empty(self, relative=None):
         if not self.exists():
             return True
29 changes: 26 additions & 3 deletions pipe-cli/src/utilities/datastorage_operations.py
@@ -36,6 +36,7 @@
 from src.utilities.storage.umount import Umount

 FOLDER_MARKER = '.DS_Store'
+BATCH_SIZE = 2


 class DataStorageOperations(object):
@@ -95,8 +96,26 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags
         command = 'mv' if clean else 'cp'
         permission_to_check = os.R_OK if command == 'cp' else os.W_OK
         manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper, command)
-        items = files_to_copy if file_list else source_wrapper.get_items()
-        items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check,
+
+        if verify_destination or file_list or not cls._is_cloud_source(source_wrapper.get_type()):
+            items = files_to_copy if file_list else source_wrapper.get_items()
+            cls._transfer(source_wrapper, destination_wrapper, items, manager, permission_to_check, include, exclude,
+                          force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads)
+        else:
+            items_batch, next_token = source_wrapper.get_paging_items(next_token=None, page_size=BATCH_SIZE)
+            while True:
+                cls._transfer(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check, include,
+                              exclude, force, quiet, skip_existing, verify_destination, threads, clean, tags,
+                              io_threads)
+                if not next_token:
+                    return
+                items_batch, next_token = source_wrapper.get_paging_items(next_token=next_token, page_size=BATCH_SIZE)
+
+    @classmethod
+    def _transfer(cls, source_wrapper, destination_wrapper, items_part, manager, permission_to_check,
+                  include, exclude, force, quiet, skip_existing, verify_destination, threads,
+                  clean, tags, io_threads):
+        items = cls._filter_items(items_part, manager, source_wrapper, destination_wrapper, permission_to_check,
                                   include, exclude, force, quiet, skip_existing, verify_destination)
         sorted_items = list()
         transfer_results = []
@@ -122,6 +141,10 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags
             cls._flush_transfer_results(source_wrapper, destination_wrapper,
                                         transfer_results, clean=clean, flush_size=1)

+    @classmethod
+    def _is_cloud_source(cls, source_wrapper_type):
+        return source_wrapper_type in [WrapperType.S3, WrapperType.AZURE, WrapperType.GS]
+
     @classmethod
     def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, permission_to_check,
                       include, exclude, force, quiet, skip_existing, verify_destination):
@@ -140,7 +163,7 @@ def _filter_items(cls, items, manager, source_wrapper, destination_wrapper, perm
                 continue
             if source_wrapper.is_file() and not source_wrapper.path == full_path:
                 continue
-            if not include and not exclude:
+            if not include and not exclude and not skip_existing and not verify_destination:
                 if not source_wrapper.is_file():
                     possible_folder_name = source_wrapper.path_with_trailing_separator()
                     # if operation from source root
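Taken together, the change above replaces the old list-everything-then-filter flow with a fetch-then-transfer loop: each page returned by get_paging_items is handed to _transfer before the next page is requested, so the first uploads start before the full source hierarchy has been traversed. Below is a minimal standalone sketch of that loop, under stated assumptions: only the get_paging_items, BATCH_SIZE, and _transfer names mirror this commit, while FakeSource and process_batch are hypothetical stand-ins used purely for illustration.

BATCH_SIZE = 2  # tiny value used in this WIP commit; a larger page size would presumably be used in practice


def copy_in_batches(source, process_batch):
    # Drain a paged listing, transferring each batch before fetching the next one.
    items, next_token = source.get_paging_items(next_token=None, page_size=BATCH_SIZE)
    while True:
        process_batch(items)               # stands in for DataStorageOperations._transfer(...)
        if not next_token:                 # no continuation token -> listing exhausted
            return
        items, next_token = source.get_paging_items(next_token=next_token,
                                                    page_size=BATCH_SIZE)


class FakeSource(object):
    # Illustrative stand-in for a storage wrapper that exposes a paged listing.

    def __init__(self, items):
        self._items = items

    def get_paging_items(self, next_token, page_size):
        start = int(next_token) if next_token else 0
        page = self._items[start:start + page_size]
        token = str(start + page_size) if start + page_size < len(self._items) else None
        return page, token


if __name__ == '__main__':
    source = FakeSource([('File', 'a.txt', 'a.txt', 1), ('File', 'b.txt', 'b.txt', 2),
                         ('File', 'c.txt', 'c.txt', 3)])
    copy_in_batches(source, lambda batch: print('transferring', [i[1] for i in batch]))

Note that in this commit only the S3 list manager returns real pages; the Azure and GS managers below fall back to a full listing with a None token, so they effectively process everything as a single batch.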
3 changes: 3 additions & 0 deletions pipe-cli/src/utilities/storage/azure.py
@@ -88,6 +88,9 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
         absolute_items = [self._to_storage_item(blob) for blob in blobs_generator]
         return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items]

+    def get_paging_items(self, relative_path, next_token, page_size):
+        return self.get_items(relative_path), None
+
     def get_summary(self, relative_path=None):
         prefix = StorageOperations.get_prefix(relative_path)
         blobs_generator = self.service.list_blobs(self.bucket.path,
4 changes: 4 additions & 0 deletions pipe-cli/src/utilities/storage/common.py
@@ -329,6 +329,10 @@ def get_items(self, relative_path):
             item_relative_path = StorageOperations.get_item_name(item.name, prefix + StorageOperations.PATH_SEPARATOR)
             yield ('File', item.name, item_relative_path, item.size)

+    @abstractmethod
+    def get_paging_items(self, relative_path, next_token, page_size):
+        pass
+
     def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR):
         prefix = StorageOperations.get_prefix(relative_path).rstrip(delimiter) + delimiter
         for item in self.list_items(prefix, show_all=True):
3 changes: 3 additions & 0 deletions pipe-cli/src/utilities/storage/gs.py
@@ -437,6 +437,9 @@ def get_summary(self, relative_path=None):
             count += 1
         return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size]

+    def get_paging_items(self, relative_path, next_token, page_size):
+        return self.get_items(relative_path), None
+
     def get_summary_with_depth(self, max_depth, relative_path=None):
         prefix = StorageOperations.get_prefix(relative_path)
         bucket = self.client.bucket(self.bucket.path)
37 changes: 37 additions & 0 deletions pipe-cli/src/utilities/storage/s3.py
@@ -789,6 +789,10 @@ def get_folder_object(self, name):
     def get_items(self, relative_path):
         return S3BucketOperations.get_items(self.bucket, session=self.session)

+    def get_paging_items(self, relative_path, next_token, page_size):
+        return S3BucketOperations.get_paging_items(self.bucket, page_size=page_size, next_token=next_token,
+                                                   session=self.session)
+
     def get_file_tags(self, relative_path):
         return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager(
             self.session, self.bucket, self.region_name), relative_path)
@@ -932,6 +936,39 @@ def get_items(cls, storage_wrapper, session=None):
                         continue
                     yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'])

+    @classmethod
+    def get_paging_items(cls, storage_wrapper, page_size, next_token=None, session=None):
+        if session is None:
+            session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp')
+
+        delimiter = S3BucketOperations.S3_PATH_SEPARATOR
+        client = cls._get_client(session, storage_wrapper.bucket.region)
+        paginator = client.get_paginator('list_objects_v2')
+        prefix = cls.get_prefix(delimiter, storage_wrapper.path)
+        operation_parameters = {
+            'Bucket': storage_wrapper.bucket.path,
+            'Prefix': prefix,
+            'PaginationConfig':
+                {
+                    'PageSize': page_size,
+                    'MaxKeys': page_size
+                }
+        }
+
+        if next_token:
+            operation_parameters['ContinuationToken'] = next_token
+
+        page_iterator = paginator.paginate(**operation_parameters)
+        res = []
+        for page in page_iterator:
+            if 'Contents' in page:
+                for file in page['Contents']:
+                    name = cls.get_item_name(file['Key'], prefix=prefix)
+                    if name.endswith(delimiter):
+                        continue
+                    res.append(('File', file['Key'], cls.get_prefix(delimiter, name), file['Size']))
+        return res, page.get('NextContinuationToken', None) if page else None
+
     @classmethod
     def path_exists(cls, storage_wrapper, relative_path, session=None):
         delimiter = S3BucketOperations.S3_PATH_SEPARATOR
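For context, the S3 paging that get_paging_items relies on can be reproduced with plain boto3 outside pipe-cli. The sketch below is not the code from this commit (which goes through client.get_paginator('list_objects_v2')); it is a simplified illustration of the same mechanism: a single list_objects_v2 request capped by MaxKeys that returns a NextContinuationToken for resuming the listing. The bucket name and prefix are placeholders, and the ('File', key, relative_path, size) tuple shape mirrors the items produced above.

import boto3


def list_one_page(bucket, prefix, page_size, next_token=None):
    # Fetch a single page of keys plus the token needed to resume the listing later.
    s3 = boto3.client('s3')
    kwargs = {'Bucket': bucket, 'Prefix': prefix, 'MaxKeys': page_size}
    if next_token:
        kwargs['ContinuationToken'] = next_token
    response = s3.list_objects_v2(**kwargs)
    items = [('File', obj['Key'], obj['Key'][len(prefix):], obj['Size'])
             for obj in response.get('Contents', [])]
    # NextContinuationToken is only present while the listing is truncated.
    return items, response.get('NextContinuationToken')


# Hypothetical usage: drain the 'data/' prefix of 'my-bucket' two keys at a time.
token = None
while True:
    batch, token = list_one_page('my-bucket', 'data/', page_size=2, next_token=token)
    print(batch)
    if not token:
        break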