Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch resolver to threaded variant #23164

Merged
merged 2 commits into from
May 19, 2015
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Switch resolver to threaded variant since ExecutorResolver by default…
… uses blocking implementation
  • Loading branch information
Vladimir Didenko committed Apr 29, 2015
commit ed8e068bf76574e20f49109bc378f2cb891d71bc
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -4,3 +4,5 @@ PyYAML
MarkupSafe
requests >= 1.0.0
tornado >= 4.0
# Required by Tornado to handle threads stuff.
futures >= 2.0
36 changes: 29 additions & 7 deletions salt/transport/client.py
Original file line number Diff line number Diff line change
@@ -37,12 +37,28 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout


# TODO: better doc strings
class AsyncReqChannel(object):
class AsyncChannel(object):
'''
Parent class for Async communication channels
'''
# Resolver used by Tornado TCPClient
# This static field is shared between
# AsyncReqChannel and AsyncPubChannel
_resolver = None

@classmethod
def _init_resolver(cls, num_threads=10):
from tornado.netutil import ThreadedResolver
cls._resolver = ThreadedResolver()
cls._resolver.initialize(num_threads=num_threads)

# TODO: better doc strings
class AsyncReqChannel(AsyncChannel):
'''
Factory class to create a Async communication channels to the ReqServer
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'

@@ -60,8 +76,11 @@ def factory(opts, **kwargs):
import salt.transport.raet
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
return salt.transport.tcp.AsyncTCPReqChannel(opts, resolver=cls._resolver, **kwargs)
elif ttype == 'local':
import salt.transport.local
return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
@@ -83,12 +102,12 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout
raise NotImplementedError()


class AsyncPubChannel(object):
class AsyncPubChannel(AsyncChannel):
'''
Factory class to create subscription channels to the master's Publisher
'''
@staticmethod
def factory(opts, **kwargs):
@classmethod
def factory(cls, opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'

@@ -106,6 +125,9 @@ def factory(opts, **kwargs):
import salt.transport.raet
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver:
# TODO: add opt to specify number of resolver threads
AsyncChannel._init_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
elif ttype == 'local': # TODO:
12 changes: 5 additions & 7 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
@@ -119,11 +119,13 @@ def __init__(self, opts, **kwargs):
if self.crypt != 'clear':
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)

resolver = kwargs.get('resolver')

parse = urlparse.urlparse(self.opts['master_uri'])
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))

self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop)
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)

def __del__(self):
self.message_client.destroy()
@@ -379,17 +381,13 @@ class SaltMessageClient(object):
'''
Low-level message sending client
'''
def __init__(self, host, port, io_loop=None):
def __init__(self, host, port, io_loop=None, resolver=None):
self.host = host
self.port = port

self.io_loop = io_loop or tornado.ioloop.IOLoop.current()

# Configure the resolver to use a non-blocking one
# Not Threaded since we need to work on python2
tornado.netutil.Resolver.configure('tornado.netutil.ExecutorResolver')

self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop)
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop, resolver=resolver)

self._mid = 1
self._max_messages = sys.maxint - 1 # number of IDs before we wrap