Skip to content

Commit

Permalink
Merge pull request #23164 from vladimir-didenko/nonblocking-resolver
Browse files Browse the repository at this point in the history
Switch resolver to threaded variant
  • Loading branch information
thatch45 committed May 19, 2015
2 parents e97a9c4 + fce3da3 commit cffa82f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ PyYAML
MarkupSafe
requests >= 1.0.0
tornado >= 4.0
# Required by Tornado to handle threads stuff.
futures >= 2.0
37 changes: 30 additions & 7 deletions salt/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,29 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout


# TODO: better doc strings
class AsyncReqChannel(object):
class AsyncChannel(object):
'''
Parent class for Async communication channels
'''
# Resolver used by Tornado TCPClient
# This static field is shared between
# AsyncReqChannel and AsyncPubChannel
_resolver = None

@classmethod
def _init_resolver(cls, num_threads=10):
from tornado.netutil import ThreadedResolver
cls._resolver = ThreadedResolver()
cls._resolver.initialize(num_threads=num_threads)


# TODO: better doc strings
class AsyncReqChannel(AsyncChannel):
'''
Factory class to create a Async communication channels to the ReqServer
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'

Expand All @@ -60,8 +77,11 @@ def factory(opts, **kwargs):
import salt.transport.raet
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
return salt.transport.tcp.AsyncTCPReqChannel(opts, resolver=cls._resolver, **kwargs)
elif ttype == 'local':
import salt.transport.local
return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
Expand All @@ -83,12 +103,12 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout
raise NotImplementedError()


class AsyncPubChannel(object):
class AsyncPubChannel(AsyncChannel):
'''
Factory class to create subscription channels to the master's Publisher
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'

Expand All @@ -106,6 +126,9 @@ def factory(opts, **kwargs):
import salt.transport.raet
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
elif ttype == 'local': # TODO:
Expand Down
12 changes: 5 additions & 7 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,13 @@ def __singleton_init__(self, opts, **kwargs):
if self.crypt != 'clear':
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)

resolver = kwargs.get('resolver')

parse = urlparse.urlparse(self.opts['master_uri'])
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))

self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop)
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)

def __del__(self):
self.message_client.destroy()
Expand Down Expand Up @@ -425,17 +427,13 @@ class SaltMessageClient(object):
'''
Low-level message sending client
'''
def __init__(self, host, port, io_loop=None):
def __init__(self, host, port, io_loop=None, resolver=None):
self.host = host
self.port = port

self.io_loop = io_loop or tornado.ioloop.IOLoop.current()

# Configure the resolver to use a non-blocking one
# Not Threaded since we need to work on python2
tornado.netutil.Resolver.configure('tornado.netutil.ExecutorResolver')

self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop)
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop, resolver=resolver)

self._mid = 1
self._max_messages = sys.maxint - 1 # number of IDs before we wrap
Expand Down

0 comments on commit cffa82f

Please sign in to comment.