Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy (S3 provider) #2597

Merged
merged 22 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f319967
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 5, 2022
e8aad3c
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 6, 2022
2255ef4
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 11, 2022
54a9ede
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 11, 2022
de1448b
Merge remote-tracking branch 'origin/develop' into issue_2574-batch-p…
ekazachkova Apr 11, 2022
5a2ed7c
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 12, 2022
c6562e0
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Apr 12, 2022
6e2f3c2
Merge remote-tracking branch 'refs/remotes/origin/develop' into issue…
ekazachkova Jul 2, 2024
284cb13
Merge remote-tracking branch 'refs/remotes/origin/develop' into issue…
ekazachkova Jul 2, 2024
8cc035a
test
ekazachkova Jul 3, 2024
5f64ee7
test
ekazachkova Jul 4, 2024
51da258
test
ekazachkova Jul 4, 2024
5aa1ac5
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
967e252
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
633380a
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
f141842
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 4, 2024
2fedd3e
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 5, 2024
bf08e1b
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 5, 2024
8300564
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 5, 2024
bb6abd4
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 8, 2024
a62beba
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 9, 2024
392162a
Issue #2574: pipe storage cp shall start data upload before traversin…
ekazachkova Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Issue #2574: pipe storage cp shall start data upload before traversing full source hierarchy - refactor cp/mv paging
  • Loading branch information
ekazachkova committed Apr 12, 2022
commit c6562e090d4c8473430913ef3ae05499b587f180
4 changes: 2 additions & 2 deletions pipe-cli/src/model/data_storage_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ def exists(self):
def get_items(self):
return self.get_list_manager().get_items(self.path)

def get_paging_items(self, next_token, page_size):
return self.get_list_manager().get_paging_items(self.path, next_token, page_size)
def get_paging_items(self, start_token, page_size):
    """
    Fetch a single page of storage items for this wrapper's path.

    :param start_token: Paging continuation token; None starts from the beginning.
    :param page_size: Max number of items on the page.
    :return: Whatever the underlying list manager's get_paging_items returns
             (a page of items plus a continuation token).
    """
    manager = self.get_list_manager()
    return manager.get_paging_items(self.path, start_token=start_token, page_size=page_size)

def is_empty(self, relative=None):
if not self.exists():
Expand Down
6 changes: 3 additions & 3 deletions pipe-cli/src/utilities/datastorage_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags
cls._transfer(source_wrapper, destination_wrapper, items, manager, permission_to_check, include, exclude,
force, quiet, skip_existing, verify_destination, threads, clean, tags, io_threads)
else:
items_batch, next_token = source_wrapper.get_paging_items(next_token=None, page_size=BATCH_SIZE)
items_batch, next_token = source_wrapper.get_paging_items(start_token=None, page_size=BATCH_SIZE)
cls._transfer_batch_items(source_wrapper, destination_wrapper, items_batch, manager, permission_to_check,
include, exclude, force, quiet, skip_existing, verify_destination, threads,
clean, tags, io_threads, next_token)
Expand Down Expand Up @@ -472,7 +472,7 @@ def __print_data_storage_contents(cls, bucket_model, relative_path, show_details
manager = wrapper.get_list_manager(show_versions=show_versions)
if paging_allowed:
items, next_page_token = manager.list_paging_items(relative_path=relative_path, recursive=recursive,
page_size=BATCH_SIZE, next_token=None)
page_size=BATCH_SIZE, start_token=None)
else:
items = manager.list_items(relative_path, recursive=recursive, page_size=page_size, show_all=show_all)
else:
Expand Down Expand Up @@ -500,7 +500,7 @@ def __print_paging_storage_contents(cls, manager, bucket_model, items_table, rel
items_table.header = False
while True:
items, next_page_token = manager.list_paging_items(relative_path=relative_path, recursive=recursive,
page_size=BATCH_SIZE, next_token=next_page_token)
page_size=BATCH_SIZE, start_token=next_page_token)
cls.__print_items(bucket_model, items, show_details, items_table, False, show_versions)
if not next_page_token:
click.echo()
Expand Down
6 changes: 5 additions & 1 deletion pipe-cli/src/utilities/storage/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
absolute_items = [self._to_storage_item(blob) for blob in blobs_generator]
return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items]

def get_paging_items(self, relative_path, next_token, page_size):
def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
                      start_token=None):
    """
    Paged listing is not supported by this provider: the whole listing is
    returned in one page and the continuation token is always None.

    :param relative_path: Storage relative path to be listed.
    :param recursive: Specifies if the listing has to be recursive.
    :param page_size: Max number of items on the page.
    :param start_token: Paging continuation token; ignored here.
    :return: Tuple of (all items, None).
    """
    listing = self.list_items(relative_path, recursive, page_size, show_all=False)
    return listing, None

def get_paging_items(self, relative_path, start_token, page_size):
    """
    Paging is not supported by this provider: every item is returned at once
    and the next-page token is always None.

    :param relative_path: Storage relative path to be listed.
    :param start_token: Paging continuation token; ignored here.
    :param page_size: Max number of items on the page; ignored here.
    :return: Tuple of (all items, None).
    """
    items = self.get_items(relative_path)
    next_page_token = None
    return items, next_page_token

def get_summary(self, relative_path=None):
Expand Down
15 changes: 14 additions & 1 deletion pipe-cli/src/utilities/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,19 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
"""
pass

@abstractmethod
def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
                      start_token=None):
    """
    Lists files and folders on the specified page by a relative path in the current storage.

    :param relative_path: Storage relative path to be listed.
    :param recursive: Specifies if the listing has to be recursive.
    :param page_size: Max number of items on the page.
    :param start_token: Paging continuation token; None starts from the beginning.
    :return: Tuple of (items on the page, next page token or None when there are no more pages) —
             callers unpack the result as ``items, next_page_token``.
    """
    pass

@abstractmethod
def get_summary_with_depth(self, max_depth, relative_path=None):
"""
Expand Down Expand Up @@ -330,7 +343,7 @@ def get_items(self, relative_path):
yield ('File', item.name, item_relative_path, item.size)

@abstractmethod
def get_paging_items(self, relative_path, next_token, page_size):
def get_paging_items(self, relative_path, start_token, page_size):
    """
    Returns a single page of transfer items starting from the given continuation token.

    :param relative_path: Storage relative path to be listed.
    :param start_token: Paging continuation token; None starts from the beginning.
    :param page_size: Max number of items on the page.
    :return: Tuple of (page of items, next page token or None when there are no more pages).
    """
    pass

def folder_exists(self, relative_path, delimiter=StorageOperations.PATH_SEPARATOR):
Expand Down
4 changes: 4 additions & 0 deletions pipe-cli/src/utilities/storage/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ def get_summary(self, relative_path=None):
count += 1
return [StorageOperations.PATH_SEPARATOR.join([self.bucket.path, relative_path]), count, size]

def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
                      start_token=None):
    """
    This provider does not page its listing: the complete listing is returned
    as a single page with no continuation token.

    :param relative_path: Storage relative path to be listed.
    :param recursive: Specifies if the listing has to be recursive.
    :param page_size: Max number of items on the page.
    :param start_token: Paging continuation token; ignored here.
    :return: Tuple of (all items, None).
    """
    all_items = self.list_items(relative_path, recursive, page_size, show_all=False)
    return all_items, None

def get_paging_items(self, relative_path, start_token, page_size):
    """
    Returns all items in a single page with no continuation token
    (paging is not supported by this provider).

    The second parameter is named ``start_token`` (previously ``next_token``)
    to match the abstract interface in common.py and the other providers;
    callers invoke this method with the keyword ``start_token=...``, which
    would raise TypeError against the old parameter name.

    :param relative_path: Storage relative path to be listed.
    :param start_token: Paging continuation token; ignored here.
    :param page_size: Max number of items on the page; ignored here.
    :return: Tuple of (all items, None).
    """
    return self.get_items(relative_path), None

Expand Down
16 changes: 7 additions & 9 deletions pipe-cli/src/utilities/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
AbstractRestoreManager, AbstractTransferManager, TransferResult, UploadResult
from src.config import Config

S3_DEFAULT_BATCH_SIZE = 1000


class UploadedObjectsContainer:

Expand Down Expand Up @@ -520,7 +518,7 @@ def __init__(self, bucket, session, region_name=None):
self.bucket = bucket

def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False,
page_size=S3_DEFAULT_BATCH_SIZE):
page_size=StorageOperations.DEFAULT_PAGE_SIZE):
client = self._get_client()
delimiter = S3BucketOperations.S3_PATH_SEPARATOR
bucket = self.bucket.bucket.path
Expand Down Expand Up @@ -631,8 +629,8 @@ def list_items(self, relative_path=None, recursive=False, page_size=StorageOpera
else:
return self.list_objects(client, prefix, operation_parameters, recursive, page_size)

def list_paging_items(self, relative_path=None, recursive=False, page_size=S3_DEFAULT_BATCH_SIZE,
next_token=None):
def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
start_token=None):
delimiter = S3BucketOperations.S3_PATH_SEPARATOR
client = self._get_client()
operation_parameters = {
Expand All @@ -645,7 +643,7 @@ def list_paging_items(self, relative_path=None, recursive=False, page_size=S3_DE
if relative_path:
operation_parameters['Prefix'] = prefix

return self.list_paging_objects(client, prefix, operation_parameters, recursive, next_token)
return self.list_paging_objects(client, prefix, operation_parameters, recursive, start_token)

def get_summary_with_depth(self, max_depth, relative_path=None):
bucket_name = self.bucket.bucket.path
Expand Down Expand Up @@ -779,9 +777,9 @@ def list_objects(self, client, prefix, operation_parameters, recursive, page_siz
break
return items

def list_paging_objects(self, client, prefix, operation_parameters, recursive, next_token):
if next_token:
operation_parameters['ContinuationToken'] = next_token
def list_paging_objects(self, client, prefix, operation_parameters, recursive, start_token):
if start_token:
operation_parameters['ContinuationToken'] = start_token

paginator = client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(**operation_parameters)
Expand Down