-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstartup.py
160 lines (131 loc) · 6.3 KB
/
startup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import logging
import os
import platform
import subprocess
import time
import requests
import urllib3
import yaml
logger = logging.getLogger(__name__)
urllib3.disable_warnings()
def init_logging(log_level=logging.INFO):
    """Set up logging for this script via ``logging.basicConfig``.

    Each record is rendered as
    ``[LEVEL] <timestamp> <file> line <n>: <message>``.

    Args:
        log_level: Logging threshold handed to ``logging.basicConfig``;
            defaults to ``logging.INFO``.
    """
    logging.basicConfig(
        level=log_level,
        datefmt='%a, %d %b %Y %H:%M:%S',
        format='[%(levelname)s] %(asctime)s %(filename)s line %(lineno)d: %(message)s',
    )
def download_file(url, timeout=None):
    """Stream the resource at *url* into a file in the current directory.

    The local file name is the last segment of the URL *path*; the query
    string and fragment are ignored so that ``.../a.zip?token=1`` is saved
    as ``a.zip`` rather than ``a.zip?token=1``.

    Args:
        url: URL to download.
        timeout: Optional timeout in seconds forwarded to ``requests.get``.
            ``None`` (the default) means no timeout.

    Returns:
        The name of the file that was written.

    Raises:
        ValueError: If the URL path has no final segment to use as a file
            name (for example ``http://host/``). Raised before any network
            traffic happens.
        requests.HTTPError: If the server answers with an error status
            (from ``raise_for_status``).
    """
    from urllib.parse import urlsplit

    local_filename = urlsplit(url).path.split('/')[-1]
    if not local_filename:
        # Fail early: otherwise the request is made first and only
        # open('') reports the problem.
        raise ValueError('cannot derive a file name from url={}'.format(url))

    # stream=True keeps the body out of memory; it is written chunk by chunk.
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return local_filename
def get_latest_provider_config(provider_url, timeout=30):
    """Download the provider (subscription) document and parse it as YAML.

    Args:
        provider_url: URL of the provider document. Surrounding spaces and
            double quotes are stripped before use.
        timeout: Timeout in seconds forwarded to ``requests.get`` so that an
            unresponsive provider cannot block the caller indefinitely.

    Returns:
        The parsed YAML mapping, or ``None`` when the URL is not an
        http(s) URL, the request fails, the status code is above 299, or
        the body does not parse to a mapping.
    """
    provider_url = provider_url.strip(' ').strip('"')
    if not provider_url.lower().startswith(('http://', 'https://')):
        logger.error('provider url invalid, url={}'.format(provider_url))
        return None

    try:
        # NOTE(review): verify=False disables TLS certificate verification,
        # so the provider's identity is not checked. Kept as-is because the
        # deployment may rely on it -- confirm whether it is still required.
        res = requests.get(provider_url, verify=False, timeout=timeout)
        logger.info('provider request status code={}'.format(res.status_code))
        if res.status_code > 299:
            logger.error("status code={}; reason={}".format(res.status_code, res.reason))
            return None
        config_yaml = yaml.safe_load(res.content)
    except Exception:
        # logger.exception appends the traceback itself; passing the
        # exception as a stray format argument makes logging drop the record.
        logger.exception("get provider config failed")
        return None

    if not isinstance(config_yaml, dict):
        # e.g. an HTML/text error page parses to a plain string.
        logger.error('provider config is not a yaml mapping, type={}'.format(type(config_yaml).__name__))
        return None
    return config_yaml
def _parse_proxy_authentications(raw_value):
    """Return the well-formed ``user:password`` entries found in *raw_value*."""
    entries = []
    # split() without an argument collapses runs of whitespace, so a doubled
    # space does not yield an empty entry that is then reported as invalid.
    for candidate in raw_value.strip(' ').strip('"').split():
        if len(candidate.split(':')) == 2:
            entries.append(candidate)
        else:
            logger.warning('proxy authentication invalid, input str={}'.format(candidate))
    return entries


def update_clash_config(provider_config_tmp=None):
    """Rebuild ``./.config/clash/config.yaml`` from the provider data and env.

    When *provider_config_tmp* carries a ``proxies`` list and the template
    ``./.config/clash/config_template.yaml`` exists, the template is loaded,
    its ``proxies`` are replaced by the provider's, and every proxy group
    named ``🔰国外流量`` is set to the provider proxy names followed by
    ``🚀直接连接``. Otherwise the existing local config file is loaded.

    The environment variables ``proxy_authentications`` (space separated
    ``user:password`` pairs, e.g. ``"abc:abc efg:efg"``) and ``api_secret``
    override the ``authentication`` and ``secret`` keys when set. The
    previous config file is copied to ``config.yaml.bak`` before the new one
    is written.

    Args:
        provider_config_tmp: Parsed provider document (a mapping), or
            ``None`` when no provider data is available.
    """
    import shutil

    api_secret = os.getenv('api_secret', None)
    proxy_authentication_env = os.environ.get("proxy_authentications", None)
    local_config_path = './.config/clash/config.yaml'
    local_temp_config_path = './.config/clash/config_template.yaml'

    # Only a mapping with a list under 'proxies' is usable; anything else
    # falls back to the local config instead of raising KeyError/TypeError.
    provider_proxies = None
    if isinstance(provider_config_tmp, dict):
        provider_proxies = provider_config_tmp.get('proxies')
    if provider_config_tmp is not None and not isinstance(provider_proxies, list):
        logger.warning('provider config has no proxies list, ignore it...')
        provider_proxies = None

    local_config = None
    if provider_proxies is not None and os.path.exists(local_temp_config_path):
        logger.info('load local clash config template...')
        with open(local_temp_config_path, 'r', encoding='utf-8') as f:
            local_config = yaml.safe_load(f)
        local_config['proxies'] = provider_proxies
        proxy_names = [proxy_detail['name'] for proxy_detail in provider_proxies]
        for proxy_group_detail in local_config['proxy-groups']:
            if proxy_group_detail['name'] == '🔰国外流量':
                # Fresh list per group: provider proxies first, then the
                # direct-connection entry.
                proxy_group_detail['proxies'] = proxy_names + ['🚀直接连接']

    if local_config is None:
        logger.info('no valid provider config, load local clash config...')
        with open(local_config_path, 'r', encoding='utf-8') as f:
            local_config = yaml.safe_load(f)

    if proxy_authentication_env is not None:
        proxy_authentication_list = _parse_proxy_authentications(proxy_authentication_env)
        if len(proxy_authentication_list) > 0:
            logger.info('update proxy authentication...')
            local_config['authentication'] = proxy_authentication_list

    if api_secret is not None:
        logger.info('update api secret...')
        local_config['secret'] = api_secret

    # Serialise before touching the file so a dump failure cannot leave a
    # truncated config behind.
    serialized_config = yaml.safe_dump(local_config, indent=4, allow_unicode=True, sort_keys=False)

    # Best-effort backup without spawning a shell; a missing or unreadable
    # previous config must not prevent writing the new one.
    if os.path.exists(local_config_path):
        try:
            shutil.copy(local_config_path, local_config_path + '.bak')
        except OSError:
            logger.warning('backup clash config failed, file={}'.format(local_config_path))

    with open(local_config_path, 'w', encoding='utf-8') as f:
        logger.info('dump clash config to local config file={}'.format(local_config_path))
        f.write(serialized_config)
# Script entry point: write the clash config, start /clash, then loop forever,
# periodically refreshing the provider config and restarting clash if it exits.
if __name__ == '__main__':
    init_logging()
    provider_subscribe_url = os.getenv('provider_url', '')
    # 'check_interval' is given in minutes; convert to seconds for sleep().
    check_interval_time = float(os.getenv('check_interval', 1)) * 60
    provider_refresh_seconds = 36000  # re-fetch the provider config every 10 hours
    clash_command = '/clash'

    os_type = platform.system()
    cpu_type = platform.machine()
    logger.info("os type={}; cpu type={}".format(os_type, cpu_type))

    provider_check_time = time.time()
    logger.info('start to get latest provider config...')
    provider_config = get_latest_provider_config(provider_subscribe_url)
    logger.info('start update clash config file ...')
    update_clash_config(provider_config)
    logger.info('start up clash app...')
    clash_app = subprocess.Popen(clash_command)
    time.sleep(check_interval_time)

    while True:
        try:
            if time.time() - provider_check_time > provider_refresh_seconds:
                logger.info('start to get latest provider config...')
                provider_check_time = time.time()
                latest_provider_config = get_latest_provider_config(provider_subscribe_url)
                if latest_provider_config is None:
                    # A failed fetch is not a config change: keep the running
                    # config instead of rewriting it and restarting clash.
                    logger.warning('get latest provider config failed, keep current clash config...')
                elif latest_provider_config != provider_config:
                    logger.info('provider config has updated...')
                    logger.info('start update clash config file ...')
                    update_clash_config(latest_provider_config)
                    # Remember the new config only after it was written, so a
                    # failed update is retried on the next refresh.
                    provider_config = latest_provider_config
                    logger.info('kill current clash app...')
                    clash_app.kill()
                    time.sleep(3)
                    logger.info('start up clash app...')
                    clash_app = subprocess.Popen(clash_command)
                else:
                    logger.info('provider config not change...')
        except Exception:
            # logger.exception records the traceback itself; the exception
            # must not be passed as an extra format argument.
            logger.exception("check provider config failed")

        time.sleep(check_interval_time)

        # poll() returns None while the process is running and its exit
        # status once it has terminated.
        clash_status = clash_app.poll()
        if clash_status is not None:
            logger.info('clash app status={}; restart clash app...'.format(clash_status))
            clash_app = subprocess.Popen(clash_command)
        else:
            logger.info("clash app run normal; status={}".format(clash_status))
        logger.info('......clash service check finish......')