Skip to content

Commit

Permalink
Update share functions to be more effective
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterDing committed Apr 10, 2024
1 parent ab31b26 commit a5a9032
Showing 1 changed file with 79 additions and 92 deletions.
171 changes: 79 additions & 92 deletions alipcs_py/commands/share.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from typing import List, Dict, Set, Union
from pathlib import PurePosixPath
from collections import deque
import re

from alipcs_py.alipcs import AliPCSApi, PcsFile
Expand Down Expand Up @@ -65,79 +63,47 @@ def _extract_file_id(share_url: str) -> str:
return m.group(1) if m else ""


def save_shared_by_url(api: AliPCSApi, remotedir: str, share_url: str, password: str = ""):
def extract_shared_info_from_url(share_url: str):
share_url = _redirect(share_url)
share_id = _extract_share_id(share_url)
file_id = _extract_file_id(share_url)
file_ids = [file_id] if file_id else []

assert share_id

save_shared_by_file_ids(api, remotedir, share_id, file_ids, password=password)
return share_id, file_id


def save_shared_by_file_ids(
api: AliPCSApi,
remotedir: str,
share_id: str,
file_ids: List[str],
password: str = "",
def save_shared_files_to_remotedir(
api: AliPCSApi, shared_pcs_files: List[PcsFile], share_id: str, remote_pcs_file: PcsFile
):
"""Save shared files to the remote directory `remotedir`. Ignore existed files."""

assert share_id

api.get_share_token(share_id, share_password=password)

file_ids = file_ids or ["root"]

shared_pcs_files = deque()
for file_id in file_ids:
pf = api.get_file(file_id=file_id, share_id=share_id)
if pf is not None:
if pf.is_root():
# No need to save root directory, save its sub files/directories
shared_pcs_files.extend(api.list_iter(file_id, share_id=share_id, recursive=False, include_dir=True))
else:
shared_pcs_files.append(pf)

# Record the remote directory of each shared_file
shared_file_id_to_remotedir: Dict[str, str] = {}
wanted_pcs_files = []
remain_pcs_files = []
for sp in shared_pcs_files:
shared_file_id_to_remotedir[sp.file_id] = remotedir

# Map the remote directory to its pcs_file
remotedir_to_its_pcs_file: Dict[str, PcsFile] = {}

while shared_pcs_files:
shared_file = shared_pcs_files.popleft()
remote_dir = shared_file_id_to_remotedir[shared_file.file_id]

# Make sure remote directory exists
if remote_dir not in remotedir_to_its_pcs_file:
remotedir_to_its_pcs_file[remote_dir] = api.makedir_path(remote_dir)[0]
dest_pcs_file = remotedir_to_its_pcs_file[remote_dir]

if not shared_file.is_root() and not remotepath_exists(api, shared_file.name, dest_pcs_file.file_id):
api.transfer_shared_files(
[shared_file.file_id],
dest_pcs_file.file_id,
share_id,
auto_rename=False,
)
print(f"save: `{shared_file.path}` to `{remote_dir}`")
if not remotepath_exists(api, sp.name, remote_pcs_file.file_id):
wanted_pcs_files.append(sp)
else:
remain_pcs_files.append(sp)

if wanted_pcs_files:
api.transfer_shared_files(
[sp.file_id for sp in wanted_pcs_files],
remote_pcs_file.file_id,
share_id,
auto_rename=False,
)
for sp in wanted_pcs_files:
print(f"save: `{sp.path}` to `{remote_pcs_file.path}`")

for sp in remain_pcs_files:
if sp.is_file:
print(f"[yellow]WARNING[/]: `{sp.path}` has be in `{remote_pcs_file.path}`")
else:
# Ignore existed file
if shared_file.is_file:
print(f"[yellow]WARNING[/]: `{shared_file.path}` has be in `{remote_dir}`")
continue
else: # shared_file.is_dir
sub_files = list(api.list_iter(shared_file.file_id, share_id=share_id))
remote_dir_pcs_file = api.get_file(remotepath=f"{remote_pcs_file.path}/{sp.name}")
if remote_dir_pcs_file is None:
remote_dir_pcs_file = api.makedir(remote_pcs_file.file_id, sp.name)

remote_dir = (PurePosixPath(remote_dir) / shared_file.name).as_posix()
for sp in sub_files:
shared_file_id_to_remotedir[sp.file_id] = remote_dir
shared_pcs_files.extendleft(sub_files[::-1])
sub_files = list(api.list_iter(sp.file_id, share_id=share_id, recursive=False, include_dir=True))
save_shared_files_to_remotedir(api, sub_files, share_id, remote_dir_pcs_file)


def save_shared(
Expand All @@ -154,10 +120,42 @@ def save_shared(

assert int(bool(share_id)) ^ int(bool(share_url)), "`share_id` and `share_url` only can be given one"

# 1. Get shared info
if share_url:
save_shared_by_url(api, remotedir, share_url, password=password)
share_id, file_id = extract_shared_info_from_url(share_url)
if file_id:
file_ids.append(file_id)
file_ids = list(set(file_ids))

# Default save the sub files in root directory
if not file_ids:
file_ids = ["root"]
assert share_id

# 2. Get shared link token
api.get_share_token(share_id, share_password=password)

# 3. Make sure the remote directory exists
remote_pcs_file = api.get_file(remotepath=remotedir)
if remote_pcs_file is not None:
if not remote_pcs_file.is_dir:
print(f"[yellow]WARNING[/]: `{remotedir}` is not a directory")
return
else:
save_shared_by_file_ids(api, remotedir, share_id, file_ids, password=password)
remote_pcs_file = api.makedir_path(remotedir)[0]

# 4. Get shared files and save them to the remote directory
shared_pcs_files = []
for file_id in file_ids:
shared_pcs_file = api.get_file(file_id=file_id, share_id=share_id)
if shared_pcs_file is not None:
if shared_pcs_file.is_root():
# No need to save root directory, save its sub files/directories
shared_pcs_files.extend(api.list_iter(file_id, share_id=share_id, recursive=False, include_dir=True))
else:
shared_pcs_files.append(shared_pcs_file)

save_shared_files_to_remotedir(api, shared_pcs_files, share_id, remote_pcs_file)


def list_shared_files(
Expand Down Expand Up @@ -187,19 +185,16 @@ def list_shared_files(

assert int(bool(share_id)) ^ int(bool(share_url)), "`share_id` and `share_url` only can be given one"

share_url = _redirect(share_url)

if share_url:
share_id = _extract_share_id(share_url)
if not file_ids:
file_id = _extract_file_id(share_url)
file_ids = [file_id] if file_id else []
share_id, file_id = extract_shared_info_from_url(share_url)
if file_id:
file_ids.append(file_id)
file_ids = list(set(file_ids))
assert share_id

if not remotepaths and not file_ids:
return

assert share_id

api.get_share_token(share_id, share_password=password)

list_files(
Expand Down Expand Up @@ -258,19 +253,16 @@ def download_shared(

assert int(bool(share_id)) ^ int(bool(share_url)), "`share_id` and `share_url` only can be given one"

share_url = _redirect(share_url)

if share_url:
share_id = _extract_share_id(share_url)
if not file_ids:
file_id = _extract_file_id(share_url)
file_ids = [file_id] if file_id else []
share_id, file_id = extract_shared_info_from_url(share_url)
if file_id:
file_ids.append(file_id)
file_ids = list(set(file_ids))
assert share_id

if not remotepaths and not file_ids:
return

assert share_id

api.get_share_token(share_id, share_password=password)

download(
Expand Down Expand Up @@ -314,13 +306,12 @@ def play_shared(

assert int(bool(share_id)) ^ int(bool(share_url)), "`share_id` and `share_url` only can be given one"

share_url = _redirect(share_url)

if share_url:
share_id = _extract_share_id(share_url)
if not file_ids:
file_id = _extract_file_id(share_url)
file_ids = [file_id] if file_id else []
share_id, file_id = extract_shared_info_from_url(share_url)
if file_id:
file_ids.append(file_id)
file_ids = list(set(file_ids))
assert share_id

if not remotepaths and not file_ids:
return
Expand Down Expand Up @@ -352,11 +343,7 @@ def get_share_token(api: AliPCSApi, share_id: str, share_url: str = "", password

assert int(bool(share_id)) ^ int(bool(share_url)), "`share_id` and `share_url` only can be given one"

share_url = _redirect(share_url)

if share_url:
share_id = _extract_share_id(share_url)

share_id, _ = extract_shared_info_from_url(share_url)
assert share_id

return api.get_share_token(share_id, share_password=password)

0 comments on commit a5a9032

Please sign in to comment.