# forked from Rongronggg9/RSS-to-Telegram-Bot
# compat.py — 255 lines (208 loc), 9.28 KB
# RSS to Telegram Bot
# Copyright (C) 2022-2024 Rongrong <i@rong.moe>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import sys
# Cache the interpreter version once; it gates the backports/workarounds below.
_version_info = sys.version_info
if _version_info < (3, 9):
    raise RuntimeError("This bot requires Python 3.9 or later")
from typing import Callable
from typing_extensions import Final
import copy
import functools
import itertools
import listparser.opml
import listparser.common
from aiohttp import ClientResponse
from bs4 import BeautifulSoup
from cachetools.keys import hashkey
from minify_html import minify as minify_html
try:
from minify_html_onepass import minify as minify_html_onepass
except ImportError:
minify_html_onepass = None
import ssl
from contextlib import AbstractContextManager, AbstractAsyncContextManager, suppress
# all supported architectures are 64-bit, so the below constants will be a native int (efficient)
INT64_T_MAX: Final = 2 ** 63 - 1  # largest signed 64-bit integer value
# backport `contextlib.nullcontext` for Python 3.9
# NOTE: compare the full version tuple rather than `_version_info[1] >= 10`,
# which looks only at the minor version and would misfire under a major bump.
if sys.version_info >= (3, 10):
    # noinspection PyUnresolvedReferences
    from contextlib import nullcontext
else:
    # noinspection SpellCheckingInspection
    class nullcontext(AbstractContextManager, AbstractAsyncContextManager):
        """Backported `contextlib.nullcontext` from Python 3.10.

        Usable as both a sync and an async context manager: entering yields
        ``enter_result`` (default ``None``); exiting suppresses nothing.
        """

        def __init__(self, enter_result=None):
            self.enter_result = enter_result

        def __enter__(self):
            return self.enter_result

        def __exit__(self, *excinfo):
            pass

        async def __aenter__(self):
            return self.enter_result

        async def __aexit__(self, exc_type, exc_value, traceback):
            pass
class AiohttpUvloopTransportHotfix(AbstractAsyncContextManager):
    """Async context manager that aborts the response's transport on exit.

    Captures the underlying transport of ``response`` (if a connection is
    present) at construction time and calls ``transport.abort()`` when the
    ``async with`` block is left.  NOTE(review): the name suggests this works
    around a uvloop-specific connection-release issue — confirm upstream.
    """

    def __init__(self, response: ClientResponse):
        connection = response.connection
        # Falls back to the falsy connection value (normally None) when there
        # is no live connection.
        self.transport = connection and connection.transport

    async def __aexit__(self, exc_type, exc_value, traceback):
        transport = self.transport
        if transport:
            transport.abort()
def ssl_create_default_context():
    """
    Create a default SSL context with Python 3.9's legacy ciphers re-enabled.

    Python 3.10+ disabled some legacy ciphers, while some websites still use them.
    The function will merge the default cipher list with the one from Python 3.9.
    Some distributions (e.g., Debian) set `PY_SSL_DEFAULT_CIPHERS=2` for Python 3.11+,
    effectively re-enabling all these legacy ciphers.
    So we can assume that re-enabling them is not a major security issue.

    :return: an `ssl.SSLContext` suitable for client connections.
    """
    context = ssl.create_default_context()
    # Full-tuple comparison (not `_version_info[1] >= 10`, which checks only the
    # minor version).  Python 3.10+ also disabled TLS 1.1; here we only care
    # about ciphers.
    if sys.version_info >= (3, 10):
        py39_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # The Python 3.9 default cipher string:
        # https://github.com/python/cpython/blob/50c21ad35372983680b44130be560d856c5f27ca/Modules/_ssl.c#L163
        py39_ctx.set_ciphers('DEFAULT:!aNULL:!eNULL:!MD5:!3DES:!DES:!RC4:!IDEA:!SEED:!aDSS:!SRP:!PSK')
        # Deduplicated union of the 3.9 and current default cipher suites.
        merged = {cipher['name'] for cipher in itertools.chain(py39_ctx.get_ciphers(), context.get_ciphers())}
        context.set_ciphers(':'.join(merged))
    return context
def _parsing_utils_html_validator_minify_preprocess(html: str, drop_sr_only: bool) -> str:
    """
    Round-trip ``html`` through BeautifulSoup (lxml) to repair malformed markup.

    minify-html is not robust against malformed input — it may hit a
    RecursionError or yield unexpected output when the minified HTML is parsed
    afterwards — so the markup is re-serialized via a real parser first:
    https://github.com/wilsonzlin/minify-html/issues/86

    :param html: raw HTML to normalize.
    :param drop_sr_only: when true, also remove every element whose class list
        contains ``sr-only``.
    :return: the re-serialized (and optionally stripped) HTML.
    """
    soup = BeautifulSoup(html, 'lxml')
    if drop_sr_only:
        for sr_only_elem in soup.find_all(attrs={'class': 'sr-only'}):
            # decompose() may fail on already-detached nodes; best-effort only.
            with suppress(ValueError, AttributeError):
                sr_only_elem.decompose()
    fixed_html = str(soup)
    soup.decompose()  # release the parse tree eagerly
    return fixed_html
def parsing_utils_html_validator_minify(html: str) -> str:
    """
    Minify ``html``, preferring minify-html-onepass when available.

    The input is preprocessed (see
    `_parsing_utils_html_validator_minify_preprocess`) when either the faster
    one-pass minifier is unavailable (needed to work around an upstream
    robustness issue) or the markup contains ``sr-only`` elements, which must
    be cleared first so minify-html can strip the spaces around them.
    """
    has_sr_only = 'sr-only' in html
    needs_preprocess = minify_html_onepass is None or has_sr_only
    if needs_preprocess:
        html = _parsing_utils_html_validator_minify_preprocess(html, has_sr_only)
    if minify_html_onepass is None:
        return minify_html(html)
    try:
        # workaround for https://github.com/wilsonzlin/minify-html/issues/86#issuecomment-1237677552
        # minify-html-onepass does not allow invalid closing tags
        return minify_html_onepass(html)
    except SyntaxError:
        # Invalid closing tags: repair the markup (unless already done) and
        # fall back to the tolerant minifier.
        if not needs_preprocess:
            html = _parsing_utils_html_validator_minify_preprocess(html, has_sr_only)
        return minify_html(html)
def cached_async(cache, key=hashkey):
    """
    Decorator caching the results of an async function in ``cache``.

    Backported from cachetools:
    https://github.com/tkem/cachetools/commit/3f073633ed4f36f05b57838a3e5655e14d3e3524

    :param cache: a mutable mapping, or ``None`` to disable caching entirely.
    :param key: callable building the cache key from the call arguments.
    """
    def decorator(func):
        if cache is None:
            # No cache configured: plain pass-through wrapper.
            async def wrapper(*args, **kwargs):
                return await func(*args, **kwargs)
        else:
            async def wrapper(*args, **kwargs):
                cache_key = key(*args, **kwargs)
                try:
                    return cache[cache_key]
                except KeyError:
                    pass  # not cached yet
                result = await func(*args, **kwargs)
                try:
                    cache[cache_key] = result
                except ValueError:
                    pass  # value too large for the cache
                return result
        return functools.update_wrapper(wrapper, func)
    return decorator
def bozo_exception_removal_wrapper(func: Callable, *args, **kwargs):
    """
    Call ``func(*args, **kwargs)`` and strip ``bozo_exception`` from the result.

    bozo_exception is un-pickle-able, preventing ret from returning from a
    ProcessPoolExecutor, so remove it.  Note the key is deleted only when its
    value is truthy, preserving an explicitly-falsy entry.
    """
    result = func(*args, **kwargs)
    if result.get('bozo_exception'):
        del result['bozo_exception']
    return result
class OpmlMixin(listparser.opml.OpmlMixin):
    """
    Monkey-patching `listparser.opml.OpmlMixin` to support `text` and `title_orig`
    https://github.com/kurtmckee/listparser/issues/71
    Originated from listparser v0.20 (MIT License)
    https://github.com/kurtmckee/listparser/blob/v0.20/src/listparser/opml.py#L21-L76
    Copyright 2009-2024 Kurt McKee <contactme@kurtmckee.org>
    Copyright 2023-2024 RSS to Telegram Bot contributors
    Distributed along with RSS to Telegram Bot under AGPLv3 License
    """
    def start_opml_outline(self, attrs: dict[str, str]) -> None:
        # Process one OPML <outline> element.  `self.hierarchy`,
        # `self.found_urls` and `self.harvest` are maintained by the
        # listparser base classes (see the upstream link above); the
        # "DIFF" blocks mark the deviations from upstream v0.20.
        # Find an appropriate title in @text or @title (else empty)
        # ================ DIFF ================
        # if attrs.get("text", "").strip():
        #     title = attrs["text"].strip()
        # else:
        #     title = attrs.get("title", "").strip()
        # Unlike upstream, @text and @title are kept separately so both
        # can be stored on the resulting entry further down.
        text = attrs.get("text", "").strip()
        title_orig = attrs.get("title", "").strip()
        title = text or title_orig
        url = None
        append_to = None
        # Determine whether the outline is a feed or subscription list
        if "xmlurl" in attrs:
            # It's a feed
            url = attrs.get("xmlurl", "").strip()
            append_to = "feeds"
            if attrs.get("type", "").strip().lower() == "source":
                # Actually, it's a subscription list!
                append_to = "lists"
        elif attrs.get("type", "").lower() in ("link", "include"):
            # It's a subscription list
            append_to = "lists"
            url = attrs.get("url", "").strip()
        elif title:
            # Assume that this is a grouping node
            self.hierarchy.append(title)
            return
        # Look for an opportunity URL
        if not url and "htmlurl" in attrs:
            url = attrs["htmlurl"].strip()
            append_to = "opportunities"
        if not url:
            # Maintain the hierarchy
            self.hierarchy.append("")
            return
        if url not in self.found_urls and append_to:
            # This is a brand-new URL
            # ================ DIFF ================
            # obj = common.SuperDict({"url": url, "title": title})
            # Unlike upstream, the raw @text and @title values are stored too.
            obj = listparser.common.SuperDict({"url": url, "title": title, "text": text, "title_orig": title_orig})
            self.found_urls[url] = (append_to, obj)
            self.harvest[append_to].append(obj)
        else:
            # Duplicate URL: merge categories/tags into the existing entry.
            obj = self.found_urls[url][1]
        # Handle categories and tags
        obj.setdefault("categories", [])
        if "category" in attrs.keys():
            for i in attrs["category"].split(","):
                # Each comma-separated category may be a "/"-delimited path.
                tmp = [j.strip() for j in i.split("/") if j.strip()]
                if tmp and tmp not in obj["categories"]:
                    obj["categories"].append(tmp)
        # Copy the current hierarchy into `categories`
        if self.hierarchy and self.hierarchy not in obj["categories"]:
            obj["categories"].append(copy.copy(self.hierarchy))
        # Copy all single-element `categories` into `tags`
        obj["tags"] = [i[0] for i in obj["categories"] if len(i) == 1]
        self.hierarchy.append("")
listparser.opml.OpmlMixin.start_opml_outline = OpmlMixin.start_opml_outline