Skip to content

Commit

Permalink
Switch resolver to threaded variant since ExecutorResolver by default…
Browse files Browse the repository at this point in the history
… uses blocking implementation
  • Loading branch information
Vladimir Didenko committed Apr 29, 2015
1 parent a8235b3 commit ed8e068
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 14 deletions.
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ PyYAML
MarkupSafe
requests >= 1.0.0
tornado >= 4.0
# Required by Tornado to handle threads stuff.
futures >= 2.0
36 changes: 29 additions & 7 deletions salt/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,28 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout


# TODO: better doc strings
class AsyncReqChannel(object):
class AsyncChannel(object):
'''
Parent class for Async communication channels
'''
# Resolver used by Tornado TCPClient
# This static field is shared between
# AsyncReqChannel and AsyncPubChannel
_resolver = None

@classmethod
def _init_resolver(cls, num_threads=10):
from tornado.netutil import ThreadedResolver
cls._resolver = ThreadedResolver()
cls._resolver.initialize(num_threads=num_threads)

# TODO: better doc strings
class AsyncReqChannel(AsyncChannel):
'''
Factory class to create a Async communication channels to the ReqServer
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'

Expand All @@ -60,8 +76,11 @@ def factory(opts, **kwargs):
import salt.transport.raet
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
return salt.transport.tcp.AsyncTCPReqChannel(opts, resolver=cls._resolver, **kwargs)
elif ttype == 'local':
import salt.transport.local
return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
Expand All @@ -83,12 +102,12 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout
raise NotImplementedError()


class AsyncPubChannel(object):
class AsyncPubChannel(AsyncChannel):
'''
Factory class to create subscription channels to the master's Publisher
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'

Expand All @@ -106,6 +125,9 @@ def factory(opts, **kwargs):
import salt.transport.raet
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
elif ttype == 'local': # TODO:
Expand Down
12 changes: 5 additions & 7 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ def __init__(self, opts, **kwargs):
if self.crypt != 'clear':
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)

resolver = kwargs.get('resolver')

parse = urlparse.urlparse(self.opts['master_uri'])
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))

self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop)
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)

def __del__(self):
self.message_client.destroy()
Expand Down Expand Up @@ -379,17 +381,13 @@ class SaltMessageClient(object):
'''
Low-level message sending client
'''
def __init__(self, host, port, io_loop=None):
def __init__(self, host, port, io_loop=None, resolver=None):
self.host = host
self.port = port

self.io_loop = io_loop or tornado.ioloop.IOLoop.current()

# Configure the resolver to use a non-blocking one
# Not Threaded since we need to work on python2
tornado.netutil.Resolver.configure('tornado.netutil.ExecutorResolver')

self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop)
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop, resolver=resolver)

self._mid = 1
self._max_messages = sys.maxint - 1 # number of IDs before we wrap
Expand Down

0 comments on commit ed8e068

Please sign in to comment.