From ed8e068bf76574e20f49109bc378f2cb891d71bc Mon Sep 17 00:00:00 2001 From: Vladimir Didenko Date: Wed, 29 Apr 2015 11:08:29 +0300 Subject: [PATCH] Switch resolver to threaded variant since ExecutorResolver by default uses blocking implementation --- requirements/base.txt | 2 ++ salt/transport/client.py | 36 +++++++++++++++++++++++++++++------- salt/transport/tcp.py | 12 +++++------- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/requirements/base.txt b/requirements/base.txt index 44f8581db47..b142c05069f 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -4,3 +4,5 @@ PyYAML MarkupSafe requests >= 1.0.0 tornado >= 4.0 +# Required by Tornado to handle threads stuff. +futures >= 2.0 diff --git a/salt/transport/client.py b/salt/transport/client.py index ef99bbbbc6c..9e64ace6e8d 100644 --- a/salt/transport/client.py +++ b/salt/transport/client.py @@ -37,12 +37,28 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout # TODO: better doc strings -class AsyncReqChannel(object): +class AsyncChannel(object): + ''' + Parent class for Async communication channels + ''' + # Resolver used by Tornado TCPClient + # This static field is shared between + # AsyncReqChannel and AsyncPubChannel + _resolver = None + + @classmethod + def _init_resolver(cls, num_threads=10): + from tornado.netutil import ThreadedResolver + cls._resolver = ThreadedResolver() + cls._resolver.initialize(num_threads=num_threads) + +# TODO: better doc strings +class AsyncReqChannel(AsyncChannel): ''' Factory class to create a Async communication channels to the ReqServer ''' - @staticmethod - def factory(opts, **kwargs): + @classmethod + def factory(cls, opts, **kwargs): # Default to ZeroMQ for now ttype = 'zeromq' @@ -60,8 +76,11 @@ def factory(opts, **kwargs): import salt.transport.raet return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs) elif ttype == 'tcp': + if not cls._resolver: + # TODO: add opt to specify number of resolver threads + AsyncChannel._init_resolver() import salt.transport.tcp - return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs) + return salt.transport.tcp.AsyncTCPReqChannel(opts, resolver=cls._resolver, **kwargs) elif ttype == 'local': import salt.transport.local return salt.transport.local.AsyncLocalChannel(opts, **kwargs) @@ -83,12 +102,12 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout raise NotImplementedError() -class AsyncPubChannel(object): +class AsyncPubChannel(AsyncChannel): ''' Factory class to create subscription channels to the master's Publisher ''' - @staticmethod - def factory(opts, **kwargs): + @classmethod + def factory(cls, opts, **kwargs): # Default to ZeroMQ for now ttype = 'zeromq' @@ -106,6 +125,9 @@ def factory(opts, **kwargs): import salt.transport.raet return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs) elif ttype == 'tcp': + if not cls._resolver: + # TODO: add opt to specify number of resolver threads + AsyncChannel._init_resolver() import salt.transport.tcp return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs) elif ttype == 'local': # TODO: diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 14221ea5c53..e79eff8cf3b 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -119,11 +119,13 @@ def __init__(self, opts, **kwargs): if self.crypt != 'clear': self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop) + resolver = kwargs.get('resolver') + parse = urlparse.urlparse(self.opts['master_uri']) host, port = parse.netloc.rsplit(':', 1) self.master_addr = (host, int(port)) - self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop) + self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver) def __del__(self): self.message_client.destroy() @@ -379,17 +381,13 @@ class SaltMessageClient(object): ''' Low-level message sending client ''' - def __init__(self, host, port, io_loop=None): + def __init__(self, host, port, io_loop=None, resolver=None): self.host = host self.port = port self.io_loop = io_loop or tornado.ioloop.IOLoop.current() - # Configure the resolver to use a non-blocking one - # Not Threaded since we need to work on python2 - tornado.netutil.Resolver.configure('tornado.netutil.ExecutorResolver') - - self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop) + self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop, resolver=resolver) self._mid = 1 self._max_messages = sys.maxint - 1 # number of IDs before we wrap