diff --git a/DB/redisClient.py b/DB/redisClient.py index 26c0effb0..abe1ecce9 100644 --- a/DB/redisClient.py +++ b/DB/redisClient.py @@ -59,7 +59,7 @@ def put(self, proxy_obj): :param proxy_obj: Proxy obj :return: """ - data = self.__conn.hset(self.name, proxy_obj.proxy, proxy_obj.info_json) + data = self.__conn.hset(self.name, proxy_obj.proxy, proxy_obj.to_json) return data def pop(self): @@ -97,7 +97,7 @@ def update(self, proxy_obj): :param proxy_obj: :return: """ - return self.__conn.hset(self.name, proxy_obj.proxy, proxy_obj.info_json) + return self.__conn.hset(self.name, proxy_obj.proxy, proxy_obj.to_json) def getAll(self): """ diff --git a/Test/testDbClient.py b/Test/testDbClient.py index 9b8f5a8a4..ec27b7d79 100644 --- a/Test/testDbClient.py +++ b/Test/testDbClient.py @@ -12,7 +12,7 @@ """ __author__ = 'JHao' -from db.DbClient import DbClient +from db.dbClient import DbClient if __name__ == '__main__': # ############### ssdb ############### diff --git a/handler/proxyHandler.py b/handler/proxyHandler.py index 6afe59102..2eb293b9e 100644 --- a/handler/proxyHandler.py +++ b/handler/proxyHandler.py @@ -32,7 +32,7 @@ def get(self): self.db.changeTable(self.conf.useProxy) proxy = self.db.get() if proxy: - return Proxy.newProxyFromJson(proxy) + return Proxy.createFromJson(proxy) return None def pop(self): @@ -43,9 +43,17 @@ def pop(self): self.db.changeTable(self.conf.useProxy) proxy = self.db.pop() if proxy: - return Proxy.newProxyFromJson(proxy) + return Proxy.createFromJson(proxy) return None + def put(self, proxy_obj): + """ + put proxy into use proxy + :return: + """ + self.db.changeTable(self.conf.useProxy) + self.db.put(proxy_obj) + def delete(self, proxy_str): """ delete useful proxy @@ -62,7 +70,7 @@ def getAll(self): """ self.db.changeTable(self.conf.useProxy) proxies_dict = self.db.getAll() - return [Proxy.newProxyFromJson(value) for _, value in proxies_dict.items()] + return [Proxy.createFromJson(value) for _, value in proxies_dict.items()] def exists(self, proxy_str): """ diff --git a/helper/ProxyUtil.py b/helper/ProxyUtil.py deleted file mode 100644 index 373d6d306..000000000 --- a/helper/ProxyUtil.py +++ /dev/null @@ -1,40 +0,0 @@ -# -*- coding: utf-8 -*- -""" -------------------------------------------------- - File Name: ProxyHelper - Description : - Author : JHao - date: 2019/8/8 -------------------------------------------------- - Change Activity: - 2019/8/8: -------------------------------------------------- -""" -__author__ = 'JHao' - -from util import validUsefulProxy - -from datetime import datetime - - -def checkProxyUseful(proxy_obj): - """ - 检测代理是否可用 - :param proxy_obj: Proxy object - :return: Proxy object, status - """ - - if validUsefulProxy(proxy_obj.proxy): - # 检测通过 更新proxy属性 - proxy_obj.check_count += 1 - proxy_obj.last_status = 1 - proxy_obj.last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if proxy_obj.fail_count > 0: - proxy_obj.fail_count -= 1 - return proxy_obj, True - else: - proxy_obj.check_count += 1 - proxy_obj.last_status = 0 - proxy_obj.last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - proxy_obj.fail_count += 1 - return proxy_obj, False diff --git a/schedule/check.py b/helper/check.py similarity index 73% rename from schedule/check.py rename to helper/check.py index 0bb82a636..28c1f4f83 100644 --- a/schedule/check.py +++ b/helper/check.py @@ -7,14 +7,14 @@ date: 2019/8/6 ------------------------------------------------- Change Activity: - 2019/8/6: + 2019/08/06: ------------------------------------------------- """ __author__ = 'JHao' +from util.six import Empty from threading import Thread from datetime import datetime -from util.six import Queue, Empty from helper.proxy import Proxy from util.validators import validators @@ -51,7 +51,7 @@ def __proxyCheck(proxy): return proxy_obj -class ProxyCheck(Thread): +class Checker(Thread): def __init__(self, check_type, queue, thread_name): Thread.__init__(self, name=thread_name) @@ -69,17 +69,17 @@ def run(self): self.log.info("ProxyCheck - {} : exit".format(self.name)) break - proxy = Proxy.newProxyFromJson(proxy_json) + proxy = Proxy.createFromJson(proxy_json) proxy = proxyCheck(proxy) if self.type == "raw": if proxy.last_status: - if self.proxy_handler.exists(proxy_obj.proxy): - self.log.info('RawProxyCheck - {} : {} validation exists'.format(self.name, - proxy_obj.proxy.ljust(20))) + if self.proxy_handler.exists(proxy.proxy): + self.log.info('ProxyCheck - {} : {} exists'.format(self.name, proxy.proxy.ljust(23))) + else: + self.log.info('ProxyCheck - {} : {} success'.format(self.name, proxy.proxy.ljust(23))) + self.proxy_handler.put(proxy) else: - self.db.put(proxy_obj) - self.log.info( - 'RawProxyCheck - {} : {} validation pass'.format(self.name, proxy_obj.proxy.ljust(20))) + self.log.info('ProxyCheck - {} : {} fail'.format(self.name, proxy.proxy.ljust(23))) else: - self.log.info('RawProxyCheck - {} : {} validation fail'.format(self.name, proxy_obj.proxy.ljust(20))) + pass self.queue.task_done() diff --git a/schedule/fetch.py b/helper/fetch.py similarity index 54% rename from schedule/fetch.py rename to helper/fetch.py index d1ff08809..53cbe530f 100644 --- a/schedule/fetch.py +++ b/helper/fetch.py @@ -32,38 +32,27 @@ def fetch(self): :return: """ proxy_set = set() - self.log.info("ProxyFetcher : start") + self.log.info("ProxyFetch : start") for fetch_name in self.conf.fetchers: - self.log.info("ProxyFetcher - {func}: start".format(func=fetch_name)) + self.log.info("ProxyFetch - {func}: start".format(func=fetch_name)) fetcher = getattr(ProxyFetcher, fetch_name, None) if not fetcher: - self.log.error("ProxyFetcher - {func}: class method not exists!") + self.log.error("ProxyFetch - {func}: class method not exists!") continue if not callable(fetcher): - self.log.error("ProxyFetcher - {func}: must be class method") + self.log.error("ProxyFetch - {func}: must be class method") continue try: for proxy in fetcher(): - proxy = proxy.strip() - if not proxy or not verifyProxyFormat(proxy): - self.log.error('ProxyFetch - {func}: ' - '{proxy} illegal'.format(func=proxyGetter, proxy=proxy.ljust(20))) - continue - elif proxy in proxy_set: - self.log.info('ProxyFetch - {func}: ' - '{proxy} exist'.format(func=proxyGetter, proxy=proxy.ljust(20))) + if proxy in proxy_set: + self.log.info('ProxyFetch - %s: %s exist' % (fetch_name, proxy.ljust(23))) continue else: - self.log.info('ProxyFetch - {func}: ' - '{proxy} success'.format(func=proxyGetter, proxy=proxy.ljust(20))) - self.db.put(Proxy(proxy, source=proxyGetter)) + self.log.info('ProxyFetch - %s: %s success' % (fetch_name, proxy.ljust(23))) + if proxy.strip(): proxy_set.add(proxy) except Exception as e: - self.log.error("ProxyFetch - {func}: error".format(func=proxyGetter)) + self.log.error("ProxyFetch - {func}: error".format(func=fetch_name)) self.log.error(str(e)) - - -if __name__ == '__main__': - a = callable(getattr(ProxyFetcher, 'freeProxy01')) - pass + return proxy_set diff --git a/helper/proxy.py b/helper/proxy.py index fd4e05f2e..97074d99c 100644 --- a/helper/proxy.py +++ b/helper/proxy.py @@ -29,7 +29,7 @@ def __init__(self, proxy, fail_count=0, region="", proxy_type="", self._last_time = last_time @classmethod - def newProxyFromJson(cls, proxy_json): + def createFromJson(cls, proxy_json): """ 根据proxy属性json创建Proxy实例 :param proxy_json: @@ -87,7 +87,7 @@ def last_time(self): return self._last_time @property - def info_dict(self): + def to_dict(self): """ 属性字典 """ return {"proxy": self._proxy, "fail_count": self._fail_count, @@ -99,9 +99,9 @@ def info_dict(self): "last_time": self.last_time} @property - def info_json(self): + def to_json(self): """ 属性json格式 """ - return json.dumps(self.info_dict, ensure_ascii=False) + return json.dumps(self.to_dict, ensure_ascii=False) # --- proxy method --- @fail_count.setter diff --git a/helper/proxyHelper.py b/helper/proxyHelper.py deleted file mode 100644 index d9d61f965..000000000 --- a/helper/proxyHelper.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: utf-8 -*- -""" -------------------------------------------------- - File Name: proxyHelper - Description : - Author : JHao - date: 2020/6/24 -------------------------------------------------- - Change Activity: - 2020/6/24: -------------------------------------------------- -""" -__author__ = 'JHao' - -from util.validators import validators - -from datetime import datetime - - -def proxyCheck(proxy_obj): - """ - 检测代理是否可用 - :param proxy_obj: Proxy object - :return: Proxy object, status - """ - - def __proxyCheck(proxy): - for func in validators: - if not func(proxy): - return False - return True - - if __proxyCheck(proxy_obj.proxy): - # 检测通过 更新proxy属性 - proxy_obj.check_count += 1 - proxy_obj.last_status = 1 - proxy_obj.last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if proxy_obj.fail_count > 0: - proxy_obj.fail_count -= 1 - return proxy_obj - else: - proxy_obj.check_count += 1 - proxy_obj.last_status = 0 - proxy_obj.last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - proxy_obj.fail_count += 1 - return proxy_obj diff --git a/helper/scheduler.py b/helper/scheduler.py new file mode 100644 index 000000000..a49d1059d --- /dev/null +++ b/helper/scheduler.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +""" +------------------------------------------------- + File Name: proxyScheduler + Description : + Author : JHao + date: 2019/8/5 +------------------------------------------------- + Change Activity: + 2019/8/5: proxyScheduler +------------------------------------------------- +""" +__author__ = 'JHao' + +from apscheduler.schedulers.blocking import BlockingScheduler + +from util.six import Queue +from helper.fetch import Fetcher +from helper.check import Checker +from helper.proxy import Proxy +from handler.logHandler import LogHandler +from handler.proxyHandler import ProxyHandler + + +def doProxyFetch(): + proxy_queue = Queue() + + fetcher = Fetcher() + for proxy in fetcher.fetch(): + proxy_queue.put(Proxy(proxy).to_json) + + thread_list = list() + for index in range(20): + thread_list.append(Checker("raw", proxy_queue, "thread_%s" % str(index).zfill(2))) + + for thread in thread_list: + thread.start() + + for thread in thread_list: + thread.join() + + +def doProxyCheck(): + proxy_queue = Queue() + + proxy_handler = ProxyHandler() + for proxy in proxy_handler.getAll(): + proxy_queue.put(proxy.to_json) + + +# class DoFetchProxy(ProxyManager): +# """ fetch proxy""" +# +# def __init__(self): +# ProxyManager.__init__(self) +# self.log = LogHandler('fetch_proxy') +# +# def main(self): +# self.log.info("start fetch proxy") +# self.fetch() +# self.log.info("finish fetch proxy") +# +# +# def rawProxyScheduler(): +# DoFetchProxy().main() +# doRawProxyCheck() +# +# +# def usefulProxyScheduler(): +# doUsefulProxyCheck() + + +def runScheduler(): + doProxyFetch() + + scheduler_log = LogHandler("scheduler") + scheduler = BlockingScheduler(logger=scheduler_log) + + scheduler.add_job(doProxyFetch, 'interval', minutes=5, id="proxy_fetch", name="proxy采集") + # scheduler.add_job(usefulProxyScheduler, 'interval', minutes=1, id="useful_proxy_check", name="useful_proxy定时检查") + + scheduler.start() + + +if __name__ == '__main__': + runScheduler() diff --git a/proxyPool.py b/proxyPool.py index 4c56e98b2..0330acdd0 100644 --- a/proxyPool.py +++ b/proxyPool.py @@ -14,10 +14,9 @@ import click -from util import six from config.setting import BANNER -# from Schedule.ProxyScheduler import runScheduler +from helper.proxyScheduler import runScheduler from api.proxyApi import runFlask CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @@ -33,7 +32,7 @@ def cli(): def schedule(): """ 启动调度程序 """ click.echo(BANNER) - # runScheduler() + runScheduler() @cli.command(name="server") diff --git a/schedule/proxyScheduler.py b/schedule/proxyScheduler.py deleted file mode 100644 index cbd0df036..000000000 --- a/schedule/proxyScheduler.py +++ /dev/null @@ -1,61 +0,0 @@ -# -*- coding: utf-8 -*- -""" -------------------------------------------------- - File Name: proxyScheduler - Description : - Author : JHao - date: 2019/8/5 -------------------------------------------------- - Change Activity: - 2019/8/5: proxyScheduler -------------------------------------------------- -""" -__author__ = 'JHao' - -import sys -from apscheduler.schedulers.blocking import BlockingScheduler - -sys.path.append('../') - -from schedule import doRawProxyCheck, doUsefulProxyCheck -from handler import ProxyManager -from util import LogHandler - - -class DoFetchProxy(ProxyManager): - """ fetch proxy""" - - def __init__(self): - ProxyManager.__init__(self) - self.log = LogHandler('fetch_proxy') - - def main(self): - self.log.info("start fetch proxy") - self.fetch() - self.log.info("finish fetch proxy") - - -def rawProxyScheduler(): - DoFetchProxy().main() - doRawProxyCheck() - - -def usefulProxyScheduler(): - doUsefulProxyCheck() - - -def runScheduler(): - rawProxyScheduler() - usefulProxyScheduler() - - scheduler_log = LogHandler("scheduler_log") - scheduler = BlockingScheduler(logger=scheduler_log) - - scheduler.add_job(rawProxyScheduler, 'interval', minutes=5, id="raw_proxy_check", name="raw_proxy定时采集") - scheduler.add_job(usefulProxyScheduler, 'interval', minutes=1, id="useful_proxy_check", name="useful_proxy定时检查") - - scheduler.start() - - -if __name__ == '__main__': - runScheduler()