diff --git a/ext/amazon.py b/ext/amazon.py index a3c6bf1..7ed3c3d 100644 --- a/ext/amazon.py +++ b/ext/amazon.py @@ -75,6 +75,7 @@ def main_command(session, url, email, password, LOG_LEVEL, quality, vrange): profile = "default" vcodec = "H265" # default bitrate = "CBR" # default + single = False # Force single episode/season instead of getting series ASIN, vrange = vrange vquality = None device_id = None @@ -181,12 +182,12 @@ class Types(Enum): logger.debug(f"Device_token: {device_token}", extra={"service_name": "Amazon"}) #logger.error("Failed to get Title Metadata, Episode Type Data | Reason: Authorization is invalid", extra={"service_name": "Amazon"}) - status, meta_response = amazon_downloader.get_titles(session) - if status == False: - logger.error("Failed to Get Series Json", extra={"service_name": "Amazon"}) - exit(1) - else: - title_name = meta_response["titleName"] + meta_response = amazon_downloader.get_titles(session, title, single, vcodec, bitrate, vquality) + #title_name = meta_response["titleName"] + logger.info("Get Title for Season", extra={"service_name": "Amazon"}) + for title in meta_response: + if title["type"] == "TV": + logger.info(" + {tv_title}_S{season:02}{episode_name}".format(tv_title=title["name"],season=title["season"] or 0, episode=title["episode"] or 0, episode_name=f" - {title["episode_name"]}" if title["episode_name"] else ""), extra={"service_name": "Amazon"}) except Exception as error: import traceback diff --git a/ext/utils/amazon.py b/ext/utils/amazon.py index 70f47bd..e3b117a 100644 --- a/ext/utils/amazon.py +++ b/ext/utils/amazon.py @@ -7,13 +7,11 @@ import requests import logging import jsonpickle -import unicodedata -from enum import Enum from pathlib import Path from langcodes import Language -from unidecode import unidecode from tldextract import tldextract -from urllib.parse import urlencode +from collections import defaultdict +from urllib.parse import urlencode, quote from http.cookiejar import MozillaCookieJar @@ -21,6 +19,14 @@ class Amazon_downloader: def __init__(self, session, pv_status): self.session = session self.service = "Amazon" + self.profile = None + self.domain_region = None + self.client_id = "f22dbddb-ef2c-48c5-8876-bed0d47594fd" # browser client id + self.VIDEO_RANGE_MAP = { + "SDR": "None", + "HDR10": "Hdr10", + "DV": "DolbyVision", + } self.pv = pv_status # if url is primevideo self.region = { "us": { @@ -176,7 +182,9 @@ def get_region(self) -> dict: pv_region = {"na": "atv-ps"}.get(pv_region, f"atv-ps-{pv_region}") region["base_manifest"] = f"{pv_region}.primevideo.com" region["base"] = "www.primevideo.com" - + + self.domain_region = domain_region + return region, None, self.cookies def prepare_endpoint(self, name: str, uri: str, region: dict) -> str: if name in ("browse", "playback", "licence", "xray"): @@ -195,6 +203,7 @@ def prepare_endpoints(self, region: dict) -> dict: return {k: self.prepare_endpoint(k, v, region) for k, v in self.endpoints.items()} def get_device(self, profile, endpoints): self.endpoints = endpoints + self.profile = profile return (self.device or {}).get(profile, {}) def register_device(self, session, profile, logger): @@ -215,7 +224,10 @@ def register_device(self, session, profile, logger): raise logger.error(f" - A device serial is required in the config, perhaps use: {os.urandom(8).hex()}", extra={"service_name": "Amazon"}) return self.device_id, self.device_token - def get_titles(self): + def get_titles(self, session, title_id, single, vcodec, bitrate, vquality): + self.session = session + self.title = title_id + self.single = single res = self.session.get( url=self.endpoints["details"], params={ @@ -229,92 +241,310 @@ def get_titles(self): ) if not res.ok: - raise self.log.exit(f"Unable to get title: {res.text} [{res.status_code}]") + raise print(f"Unable to get title: {res.text} [{res.status_code}]") data = res.json()["widgets"] product_details = data.get("productDetails", {}).get("detail") if not product_details: error = res.json()["degradations"][0] - raise self.log.exit(f"Unable to get title: {error['message']} [{error['code']}]") + raise print(f"Unable to get title: {error['message']} [{error['code']}]") titles = [] if data["pageContext"]["subPageType"] == "Movie": card = data["productDetails"]["detail"] - titles.append(Title( - id_=card["catalogId"], - type_=Title.Types.MOVIE, - name=product_details["title"], - #year=card["releaseYear"], - year=card.get("releaseYear", ""), - # language is obtained afterward - original_lang=None, - source=self.ALIASES[0], - service_data=card - )) - else: - cards = [ - x["detail"] - for x in data["titleContent"][0]["cards"] - if not self.single or - (self.single and self.title in data["self"]["asins"]) or - (self.single and self.title in x["self"]["asins"]) - ] - for card in cards: - episode_number = card.get("episodeNumber", 0) - if episode_number != 0: - titles.append(Title( - id_=card["catalogId"], - type_=Title.Types.TV, - name=product_details["parentTitle"], - season=product_details["seasonNumber"], - episode=episode_number, - episode_name=card["title"], - # language is obtained afterward - original_lang=None, - source=self.ALIASES[0], - service_data=card - )) + temp_json = {} + temp_json["id"] = card["catalogId"] + temp_json["type"] = "Movie" + temp_json["name"] = product_details["title"] + temp_json["year"] = card.get("releaseYear", "") + titles.append(temp_json) + else: + if not data["titleContent"]: + episodes = data["episodeList"]["episodes"] + for episode in episodes: + details = episode["detail"] + #titles.append( + # Title( + # id_=details["catalogId"], + # type_=Title.Types.TV, + # name=product_details["parentTitle"], + # season=data["productDetails"]["detail"]["seasonNumber"], + # episode=episode["self"]["sequenceNumber"], + # episode_name=details["title"], + # # language is obtained afterward + # original_lang=None, + # source=self.ALIASES[0], + # service_data=details, + # ) + #) + temp_json = {} + temp_json["id"] = details["catalogId"] + temp_json["type"] = "TV" + temp_json["name"] = product_details["title"] + temp_json["season"] = data["productDetails"]["detail"]["seasonNumber"] + temp_json["episode"] = episode["self"]["sequenceNumber"] + temp_json["episode_name"] = details["title"] + temp_json["year"] = details["releaseYear"] + titles.append(temp_json) + if len(titles) == 25: + page_count = 1 + pagination_data = data.get('episodeList', {}).get('actions', {}).get('pagination', []) + token = next((quote(item.get('token')) for item in pagination_data if item.get('tokenType') == 'NextPage'), None) + while True: + page_count += 1 + res = self.session.get( + url=self.endpoints["getDetailWidgets"], + params={ + "titleID": self.title, + "isTvodOnRow": "1", + "widgets": f'[{{"widgetType":"EpisodeList","widgetToken":"{token}"}}]' + }, + headers={ + "Accept": "application/json" + } + ).json() + episodeList = res['widgets'].get('episodeList', {}) + for item in episodeList.get('episodes', []): + episode = int(item.get('self', {}).get('sequenceNumber', {})) + #titles.append(Title( + # id_=item["detail"]["catalogId"], + # type_=Title.Types.TV, + # name=product_details["parentTitle"], + # season=product_details["seasonNumber"], + # episode=episode, + # episode_name=item["detail"]["title"], + # # language is obtained afterward + # original_lang=None, + # source=self.ALIASES[0], + # service_data=item + #)) + temp_json = {} + temp_json["id"] = item["detail"]["catalogId"] + temp_json["type"] = "TV" + temp_json["name"] = product_details["parentTitle"] + temp_json["season"] = product_details["seasonNumber"] + temp_json["episode"] = episode + temp_json["episode_name"] = item["detail"]["title"] + temp_json["year"] = item["detail"]["releaseYear"] + titles.append(temp_json) + pagination_data = res['widgets'].get('episodeList', {}).get('actions', {}).get('pagination', []) + token = next((quote(item.get('token')) for item in pagination_data if item.get('tokenType') == 'NextPage'), None) + if not token: + break + else: + cards = [ + x["detail"] + for x in data["titleContent"][0]["cards"] + if not self.single or + (self.single and self.title in data["self"]["asins"]) or (self.single and self.title in data["self"]["compactGTI"]) or + (self.single and self.title in x["self"]["asins"]) or (self.single and self.title == x["detail"]["catalogId"]) + ] + for card in cards: + episode_number = card.get("episodeNumber", 0) + if episode_number != 0: + #titles.append(Title( + # id_=card["catalogId"], + # type_=Title.Types.TV, + # name=product_details["parentTitle"], + # season=product_details["seasonNumber"], + # episode=episode_number, + # episode_name=card["title"], + # # language is obtained afterward + # original_lang=None, + # source=self.ALIASES[0], + # service_data=card + #)) + temp_json = {} + temp_json["id"] = card["catalogId"] + temp_json["type"] = "TV" + temp_json["name"] = product_details["parentTitle"] + temp_json["season"] = product_details["seasonNumber"] + temp_json["episode"] = episode_number + temp_json["episode_name"] = card["title"] + temp_json["year"] = card.get("releaseYear", "") + titles.append(temp_json) + if not self.single: temp_title = self.title temp_single = self.single - + self.single = True - for season in data["seasonSelector"]: - try: - if data["self"]["asins"][0] in season["self"]["asins"]: - continue - self.title = season["self"]["asins"][0] - except: - if data["self"]["asins"][0] in season["titleID"]: + for season in data.get('seasonSelector', []): + season_link = season["seasonLink"] + match = re.search(r'/([a-zA-Z0-9]+)\/ref=', season_link) #extract other season id using re + if match: + extracted_value = match.group(1) + if data["self"]["compactGTI"] == extracted_value: #skip entered asin season data and grab rest id's continue - self.title = season["titleID"] - for title in self.get_titles(): - titles.append(title) - + + self.title = extracted_value + for title in self.get_titles(self.session, self.title, self.single, vcodec, bitrate, vquality): + titles.append(title) + self.title = temp_title self.single = temp_single - + if titles: # TODO: Needs playback permission on first title, title needs to be available original_lang = self.get_original_language(self.get_manifest( - next((x for x in titles if x.type == Title.Types.MOVIE or x.episode > 0), titles[0]), - video_codec=self.vcodec, - bitrate_mode=self.bitrate, - quality=self.vquality, + next((x for x in titles if x["type"] == "Movie" or x["episode"] > 0), titles[0]), + video_codec=vcodec, + bitrate_mode=bitrate, + quality=vquality, ignore_errors=True )) if original_lang: for title in titles: - title.original_lang = Language.get(original_lang) + title["original_lang"] = Language.get(original_lang) else: #self.log.warning(" - Unable to obtain the title's original language, setting 'en' default...") for title in titles: - title.original_lang = Language.get("en") + title["original_lang"] = Language.get("en") + filtered_titles = [] + season_episode_count = defaultdict(int) + for title in titles: + key = (title["season"], title["episode"]) + if season_episode_count[key] < 1: + filtered_titles.append(title) + season_episode_count[key] += 1 + + titles = filtered_titles + + print(titles) + return titles + def get_manifest( + self, title: json, video_codec: str, bitrate_mode: str, quality: str, hdr=None, + ignore_errors: bool = False + ) -> dict: + res = self.session.get( + url=self.endpoints["playback"], + params={ + "asin": title["id"], + "consumptionType": "Streaming", + "desiredResources": ",".join([ + "PlaybackUrls", + "AudioVideoUrls", + "CatalogMetadata", + "ForcedNarratives", + "SubtitlePresets", + "SubtitleUrls", + "TransitionTimecodes", + "TrickplayUrls", + "CuepointPlaylist", + "XRayMetadata", + "PlaybackSettings", + ]), + "deviceID": self.device_id, + "deviceTypeID": self.device[self.profile]["device_type"], + "firmware": 1, + "gascEnabled": str(self.pv).lower(), + "marketplaceID": self.region[self.domain_region]["marketplace_id"], + "resourceUsage": "CacheResources", + "videoMaterialType": "Feature", + "playerType": "html5", + "clientId": self.client_id, + **({ + "operatingSystemName": "Linux" if quality == "SD" else "Windows", + "operatingSystemVersion": "unknown" if quality == "SD" else "10.0", + } if not self.device_token else {}), + "deviceDrmOverride": "CENC", + "deviceStreamingTechnologyOverride": "DASH", + "deviceProtocolOverride": "Https", + "deviceVideoCodecOverride": video_codec, + "deviceBitrateAdaptationsOverride": bitrate_mode.replace("+", ","), + "deviceVideoQualityOverride": quality, + "deviceHdrFormatsOverride": self.VIDEO_RANGE_MAP.get(hdr, "None"), + "supportedDRMKeyScheme": "DUAL_KEY", # ? + "liveManifestType": "live,accumulating", # ? + "titleDecorationScheme": "primary-content", + "subtitleFormat": "TTMLv2", + "languageFeature": "MLFv2", # ? + "uxLocale": "en_US", + "xrayDeviceClass": "normal", + "xrayPlaybackMode": "playback", + "xrayToken": "XRAY_WEB_2020_V1", + "playbackSettingsFormatVersion": "1.0.0", + "playerAttributes": json.dumps({"frameRate": "HFR"}), + # possibly old/unused/does nothing: + "audioTrackId": "all", + }, + headers={ + "Authorization": f"Bearer {self.device_token}" if self.device_token else None, + }, + ) + try: + manifest = res.json() + except json.JSONDecodeError: + if ignore_errors: + return {} + + raise print(" - Amazon didn't return JSON data when obtaining the Playback Manifest.") + + if "error" in manifest: + if ignore_errors: + return {} + raise print(" - Amazon reported an error when obtaining the Playback Manifest.") + + # Commented out as we move the rights exception check elsewhere + # if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]: + # if ignore_errors: + # return {} + # raise print(" - The profile used does not have the rights to this title.") + + # Below checks ignore NoRights errors + + if ( + manifest.get("errorsByResource", {}).get("PlaybackUrls") and + manifest["errorsByResource"]["PlaybackUrls"].get("errorCode") != "PRS.NoRights.NotOwned" + ): + if ignore_errors: + return {} + error = manifest["errorsByResource"]["PlaybackUrls"] + raise print(f" - Amazon had an error with the Playback Urls: {error['message']} [{error['errorCode']}]") + + if ( + manifest.get("errorsByResource", {}).get("AudioVideoUrls") and + manifest["errorsByResource"]["AudioVideoUrls"].get("errorCode") != "PRS.NoRights.NotOwned" + ): + if ignore_errors: + return {} + error = manifest["errorsByResource"]["AudioVideoUrls"] + raise print(f" - Amazon had an error with the A/V Urls: {error['message']} [{error['errorCode']}]") + + return manifest + + def get_original_language(self, manifest): + """Get a title's original language from manifest data.""" + try: + return next( + x["language"].replace("_", "-") + for x in manifest["catalogMetadata"]["playback"]["audioTracks"] + if x["isOriginalLanguage"] + ) + except (KeyError, StopIteration): + pass + + if "defaultAudioTrackId" in manifest.get("playbackUrls", {}): + try: + return manifest["playbackUrls"]["defaultAudioTrackId"].split("_")[0] + except IndexError: + pass + + try: + return sorted( + manifest["audioVideoUrls"]["audioTrackMetadata"], + key=lambda x: x["index"] + )[0]["languageCode"] + except (KeyError, IndexError): + pass + + return None + class DeviceRegistration: def __init__(self, device: dict, endpoints: dict, cache_path: Path, session: requests.Session, log: logging.Logger):