#!/usr/bin/env python3
# Installing shadowsocks client
# https://github.com/shadowsocks/shadowsocks-libev
# https://www.linuxbabe.com/desktop-linux/how-to-install-and-use-shadowsocks-command-line-client
# sudo pip install shadowsocks
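#
# Example invocation (illustrative; "myProxies.txt" is a placeholder file name, the
# flags are the argparse options defined in main() below):
#   ./shadowChecker -f myProxies.txt -t 5 -T 20 -o sortedShadow.txt
#   cat myProxies.txt | ./shadowChecker --stdin --free -v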
import argparse
import concurrent.futures
import itertools
import json
import logging
import os
import shutil
import sys
import tempfile
import time

import requests

from proxyUtil import *
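# proxyUtil (imported above via wildcard) provides the repo-specific helpers used below:
# PROXIES, CustomFormatter, parse_ss_withPlugin, ssURI2sslocal, is_alive, getIP, isIPv4,
# isIPv6, is_tool, killProcess, parseContent, ScrapURL, ss_scheme, split2Npart, is_port_in_use.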
ch = logging.StreamHandler()
ch.setFormatter(CustomFormatter())
logging.basicConfig(level=logging.ERROR, handlers=[ch])
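# Scratch directory for the per-port ss-local pid files; removed at the end of the run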
tempdir = tempfile.mkdtemp()
def Checker(shadowList, localPort, testDomain, timeOut):
    """Start ss-local on localPort for each ss:// URI and keep the ones that respond."""
    liveProxy = []
    proxy = PROXIES.copy()  # shallow copy is enough; the values are plain format strings
    proxy['http'] = proxy['http'].format(LOCAL_PORT=localPort)
    proxy['https'] = proxy['https'].format(LOCAL_PORT=localPort)
    pidPath = f"{tempdir}/ss.pid.{localPort}"
    for ss_url in shadowList:
        server, server_port, method, password, plugin, plugin_opts, tag = parse_ss_withPlugin(ss_url)
        # Skip entries whose server is neither a valid IP address nor a resolvable hostname
        if not isIPv4(server) and not isIPv6(server) and not getIP(server):
            continue
        cmd = ssURI2sslocal(ss_url, localPort, pidPath)
        os.system(cmd)
        time.sleep(0.2)  # give ss-local a moment to start
        ping = is_alive(testDomain, proxy, timeOut)
        if ping:
            liveProxy.append((ss_url, ping))
            try:
                # Alternatives: http://httpbin.org/ip  https://api.ipify.org
                result = json.loads(requests.get('http://ip-api.com/json/', proxies=proxy, timeout=timeOut).content)
                logging.info(f"[live] ip={result['query']} @ {result['country']} ping={ping}")
            except Exception:
                logging.warning(f"[failed] ip={server} with ping={ping}")
        else:
            logging.debug(f"[dead] ip={server}")
        # Stop this port's ss-local instance before trying the next URI
        os.system(f"if ps -p $(cat {pidPath}) > /dev/null 2>&1; then kill -9 $(cat {pidPath}); fi")
        time.sleep(0.3)  # give the killed ss-local a moment to release the port
    return liveProxy
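# Illustrative standalone call (the normal flow goes through main(); the ss:// URI
# below is a placeholder standing in for a real shadowsocks URI):
#   live = Checker(["ss://..."], 1080, 'https://www.google.com', 3)  # -> [(uri, ping), ...]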
def main():
    parser = argparse.ArgumentParser(description="Simple shadowsocks proxy checker")
    parser.add_argument("-f", "--file", help="file containing ss proxies")
    parser.add_argument("-d", "--domain", help="domain used for the connectivity test", default='https://www.google.com')
    parser.add_argument("-t", "--timeout", help="timeout in seconds, default is 3", default=3, type=int)
    parser.add_argument("-l", "--lport", help="first local port, default is 1080", default=1080, type=int)
    parser.add_argument('-v', "--verbose", help="increase output verbosity", action="store_true", default=False)
    parser.add_argument('-vv', '--debug', help="debug log", action='store_true', default=False)
    parser.add_argument('-T', '--threads', help="number of threads, default is 10", default=10, type=int)
    parser.add_argument('--url', help="get proxies from a URL")
    parser.add_argument('--free', help="get a free proxy list", action='store_true', default=False)
    parser.add_argument('--stdin', help="get proxies from stdin", action='store_true', default=False)
    parser.add_argument('--reuse', help="reuse the last checked proxies", action='store_true', default=False)
    parser.add_argument('-o', '--output', help="output file", default='sortedShadow.txt')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.INFO)
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    if not is_tool('ss-local'):
        logging.error("ss-local not found, please install the shadowsocks client first")
        logging.error("\thttps://github.com/shadowsocks/shadowsocks-libev")
        sys.exit(1)
    killProcess('ss-local')  # kill any ss-local instances left over from a previous run

    # Collect candidate ss:// URIs from every requested source, deduplicated via a set
    lines = set()
    if args.file and os.path.isfile(args.file):
        with open(args.file, 'r', encoding='UTF-8') as file:
            lines.update(parseContent(file.read().strip(), [ss_scheme]))
        logging.info(f"got {len(lines)} proxies from file")
    if args.reuse and os.path.isfile(args.output):
        with open(args.output, 'r', encoding='UTF-8') as f:
            lines.update(parseContent(f.read().strip()))
    if args.url:
        lines.update(ScrapURL(args.url, [ss_scheme]))
    if args.free:
        lines.update(ScrapURL('https://raw.githubusercontent.com/freefq/free/master/v2', [ss_scheme]))
    if args.stdin:
        lines.update(parseContent(sys.stdin.read(), [ss_scheme]))

    lines = list(lines)
    logging.info(f"We have {len(lines)} proxies to check")
    if not lines:
        logging.error("No proxy to check")
        return

    # Reserve one free local port per worker thread, starting at --lport
    N = min(args.threads, len(lines))
    openPort = []
    port = args.lport
    while len(openPort) < N:
        if not is_port_in_use(port):
            openPort.append(port)
        port += 1

    # Each worker checks its own slice of the URIs through its own local port
    with concurrent.futures.ThreadPoolExecutor(max_workers=N) as executor:
        results = executor.map(Checker,
                               split2Npart(lines, N),
                               openPort,
                               itertools.repeat(args.domain, N),
                               itertools.repeat(args.timeout, N))

    # Flatten the per-worker results, sort by latency (fastest first) and write the URIs out
    liveProxy = [*itertools.chain(*results)]
    liveProxy.sort(key=lambda x: x[1])
    with open(args.output, 'w') as f:
        for ss_url in liveProxy:
            f.write(f"{ss_url[0]}\n")


if __name__ == '__main__':
    main()
    shutil.rmtree(tempdir)