Skip to content

Commit

Permalink
Issue #2574: pipe storage cp shall start data upload before traversin…
Browse files Browse the repository at this point in the history
…g full source hierarchy (S3 and Local provider) (#2597)
  • Loading branch information
ekazachkova authored Jul 9, 2024
1 parent 59c40be commit e73c60c
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 61 deletions.
2 changes: 2 additions & 0 deletions pipe-cli/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ def cli():
CP_LOGGING_LEVEL Explicit logging level: CRITICAL, ERROR, WARNING, INFO or DEBUG. Defaults to ERROR.
CP_LOGGING_FORMAT Explicit logging format. Default is `%(asctime)s:%(levelname)s: %(message)s`
CP_TRACE=[True|False] Enables verbose errors.
CP_CLI_STORAGE_BATCH_SIZE The number of objects per request for pipe storage operations (Default: 1000)
CP_CLI_STORAGE_ASYNC_BATCH_ENABLE Enables asynchronous batch transfer
"""
pass

Expand Down
25 changes: 13 additions & 12 deletions pipe-cli/src/model/data_storage_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,17 +522,18 @@ def get_items(self, quiet=False):

if os.path.isfile(self.path):
if os.path.islink(self.path) and self.symlinks == AllowedSymlinkValues.SKIP:
return []
return [(FILE, self.path, self._leaf_path(self.path), os.path.getsize(self.path), None)]

return self._list_items(self.path, self._leaf_path(self.path), result=[], visited_symlinks=set(),
root=True, quiet=quiet)
return
yield (FILE, self.path, self._leaf_path(self.path), os.path.getsize(self.path), None)
else:
for item in self._list_items(self.path, self._leaf_path(self.path), visited_symlinks=set(), root=True,
quiet=quiet):
yield item

def _leaf_path(self, source_path):
head, tail = os.path.split(source_path)
return tail or os.path.basename(head)

def _list_items(self, path, parent, result, visited_symlinks, root, quiet):
def _list_items(self, path, parent, visited_symlinks, root, quiet):
logging.debug(u'Listing directory {}...'.format(path))
path = to_unicode(path)
parent = to_unicode(parent)
Expand All @@ -542,7 +543,8 @@ def _list_items(self, path, parent, result, visited_symlinks, root, quiet):
while attempts > 0:
attempts -= 1
try:
self._collect_item(path, parent, item, result, visited_symlinks, root, quiet)
for result_item in self._collect_item(path, parent, item, visited_symlinks, root, quiet):
yield result_item
break
except DefectiveFileSystemError:
if attempts > 0:
Expand All @@ -552,8 +554,6 @@ def _list_items(self, path, parent, result, visited_symlinks, root, quiet):
else:
raise

return result

def _os_listdir(self, path):
try:
return os.listdir(to_string(path))
Expand All @@ -564,7 +564,7 @@ def _os_listdir(self, path):
raise DefectiveFileSystemError(err_msg)
raise

def _collect_item(self, path, parent, item, result, visited_symlinks, root, quiet):
def _collect_item(self, path, parent, item, visited_symlinks, root, quiet):
safe_item = to_unicode(item, replacing=True)
safe_absolute_path = os.path.join(path, safe_item)

Expand Down Expand Up @@ -602,9 +602,10 @@ def _collect_item(self, path, parent, item, result, visited_symlinks, root, quie

if os.path.isfile(to_string(absolute_path)):
logging.debug(u'Collected file {}.'.format(safe_absolute_path))
result.append((FILE, absolute_path, relative_path, os.path.getsize(to_string(absolute_path)), None))
yield tuple([FILE, absolute_path, relative_path, os.path.getsize(to_string(absolute_path)), None])
elif os.path.isdir(to_string(absolute_path)):
self._list_items(absolute_path, relative_path, result, visited_symlinks, root=False, quiet=quiet)
for folder_item in self._list_items(absolute_path, relative_path, visited_symlinks, root=False, quiet=quiet):
yield folder_item

if symlink_target and os.path.islink(to_string(path)) and symlink_target in visited_symlinks:
visited_symlinks.remove(symlink_target)
Expand Down
Loading

0 comments on commit e73c60c

Please sign in to comment.