-
Notifications
You must be signed in to change notification settings - Fork 18
/
main.py
177 lines (146 loc) · 5.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import requests
import json
import urllib.parse
from subprocess import check_output
from concurrent.futures import as_completed, ThreadPoolExecutor
import time
import os
import string
import random
TOKEN = ''
def assure_folder_exists(folder, root):
full_path = os.path.join(root, folder)
if os.path.isdir(full_path):
pass
else:
os.mkdir(full_path)
return full_path
def random_filename(length, ext):
return ''.join([random.choice(string.ascii_lowercase) for _ in range(length)]) + '.{}'.format(ext)
# TODO: Replace with a named tuple
class File:
def __init__(self, name, link):
self.name = name
self.link = link
def __repr__(self):
return '<F:{}>'.format(self.name)
class StickerDownloader:
def __init__(self, token, session=None, multithreading=4):
self.THREADS = multithreading
self.token = token
self.cwd = assure_folder_exists('downloads', root=os.getcwd())
if session is None:
self.session = requests.Session()
else:
self.session = session
self.api = 'https://api.telegram.org/bot{}/'.format(self.token)
verify = self._api_request('getMe', {})
if verify['ok']:
pass
else:
print('Invalid token.')
exit()
def _api_request(self, fstring, params):
try:
param_string = '?' + urllib.parse.urlencode(params)
res = self.session.get('{}{}{}'.format(self.api, fstring, param_string))
if res.status_code != 200:
raise Exception
res = json.loads(res.content.decode('utf-8'))
if not res['ok']:
raise Exception(res['description'])
return res
except Exception as e:
print('API method {} failed. Error: "{}"'.format(fstring, e))
return None
def get_file(self, file_id):
info = self._api_request('getFile', {'file_id': file_id})
f = File(name=info['result']['file_path'].split('/')[-1],
link='https://api.telegram.org/file/bot{}/{}'.format(self.token, info['result']['file_path']))
return f
def get_sticker_set(self, name):
"""
Get a list of File objects.
:param name:
:return:
"""
params = {'name': name}
res = self._api_request('getStickerSet', params)
if res is None:
return None
stickers = res['result']['stickers']
files = []
print('Starting to scrape "{}" ..'.format(name))
start = time.time()
with ThreadPoolExecutor(max_workers=self.THREADS) as executor:
futures = [executor.submit(self.get_file, i['file_id']) for i in stickers]
for i in as_completed(futures):
files.append(i.result())
end = time.time()
print('Time taken to scrape {} stickers - {:.3f}s'.format(len(files), end - start))
print()
sticker_set = {
'name': res['result']['name'].lower(),
'title': res['result']['title'],
'files': files
}
return sticker_set
def download_file(self, name, link, path):
file_path = os.path.join(path, name)
with open(file_path, 'wb') as f:
res = self.session.get(link)
f.write(res.content)
return file_path
def download_sticker_set(self, sticker_set):
swd = assure_folder_exists(sticker_set['name'], root=self.cwd)
download_path = assure_folder_exists('webp', root=swd)
downloads = []
print('Starting download of "{}" into {}'.format(sticker_set['name'], download_path))
start = time.time()
with ThreadPoolExecutor(max_workers=self.THREADS) as executor:
futures = [executor.submit(self.download_file, f.name, f.link, download_path) for f in sticker_set['files']]
for i in as_completed(futures):
downloads.append(i.result())
end = time.time()
print('Time taken to download {} stickers - {:.3f}s'.format(len(downloads), end - start))
print()
return downloads
@staticmethod
def convert_file(_input, _output):
command = 'dwebp -quiet "{}" -o "{}"'.format(_input, _output)
check_output(command, shell=True)
return _output
def convert_to_pngs(self, name):
swd = assure_folder_exists(name, root=self.cwd)
webp_folder = assure_folder_exists('webp', root=swd)
png_folder = assure_folder_exists('png', root=swd)
webp_files = [os.path.join(webp_folder, i) for i in os.listdir(webp_folder)]
png_files = []
print('Converting stickers to pngs "{}"..'.format(name))
start = time.time()
with ThreadPoolExecutor(max_workers=self.THREADS) as executor:
futures = [executor.submit(self.convert_file, _input, os.path.join(png_folder, random_filename(6, 'png')))
for _input in webp_files]
for i in as_completed(futures):
png_files.append(i.result())
end = time.time()
print('Time taken to convert {} stickers - {:.3f}s'.format(len(png_files), end - start))
print()
if __name__ == "__main__":
downloader = StickerDownloader(TOKEN)
print('Welcome to Telegram Downloader..')
names = []
while True:
name = input("Enter sticker_set url (leave blank to stop): ").strip()
if name == '':
break
names.append(name.split('/')[-1])
for sset in names:
print('=' * 60)
_ = downloader.get_sticker_set(sset)
if _ is None:
continue
print('-' * 60)
_ = downloader.download_sticker_set(_)
print('-' * 60)
downloader.convert_to_pngs(sset)