This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

AMQP support #16

Merged
5 commits merged on Nov 3, 2011
Changes from 1 commit
changes from review of #16
nkvoll committed Nov 3, 2011
commit daf6895e03c822e845f96bd2ebd3eea16db9ee9f
106 changes: 72 additions & 34 deletions contrib/amqp/piped/contrib/amqp/providers.py
@@ -39,33 +39,43 @@ class AMQPConnectionProvider(object, service.MultiService):

def __init__(self):
self._connection_by_name = dict()
self._connection_config_by_name = dict()
service.MultiService.__init__(self)

def configure(self, runtime_environment):
self.setServiceParent(runtime_environment.application)
self.runtime_environment = runtime_environment

connections = runtime_environment.get_configuration_value('amqp.connections', dict())

for connection_name, connection_config in connections.items():
# basic consumers are handled by the AMQPConsumerProvider:
connection_config = copy.copy(connection_config)
connection_config.pop('basic_consumers', None)
self._connection_config_by_name[connection_name] = connection_config

logical_name = 'amqp.connection.{0}'.format(connection_name)
runtime_environment.resource_manager.register(logical_name, provider=self)

def _get_or_create_connection(self, connection_name):
if connection_name not in self._connection_by_name:
connection_config = self._connection_config_by_name[connection_name]
connection = AMQPConnection(connection_name, **connection_config)

connection.configure(self.runtime_environment)
connection.setServiceParent(self)
self._connection_by_name[connection_name] = connection

- logical_name = 'amqp.connection.{0}'.format(connection_name)
-
- runtime_environment.resource_manager.register(logical_name, provider=self)
return self._connection_by_name[connection_name]

def add_consumer(self, resource_dependency):
- name = resource_dependency.provider.rsplit('.', 1)[-1]
- connection = self._connection_by_name[name]
+ connection_name = resource_dependency.provider.rsplit('.', 1)[-1]
+ connection = self._get_or_create_connection(connection_name)

connection.on_connected += resource_dependency.on_resource_ready
connection.on_disconnected += resource_dependency.on_resource_lost

# the connection might already be ready:
if connection.ready:
resource_dependency.on_resource_ready(connection.protocol)
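
The hunk above makes AMQPConnectionProvider store connection configurations at configure time and only instantiate an AMQPConnection lazily, the first time add_consumer asks for it. For orientation, here is a hypothetical sketch of what the 'amqp.connections' configuration value read by configure() might look like, written as a plain Python dict. Everything except the keys actually referenced in this file (servers, reconnect_interval, max_idle_time, basic_consumers) is an assumption, including the endpoint string syntax and the shape of the basic_consumers entries.

```python
# Hypothetical sketch only, not part of this diff: a configuration shaped like the
# value configure() reads from 'amqp.connections'. Key names beyond those visible
# in this file are assumptions.
example_amqp_configuration = {
    'connections': {
        'my_connection': {
            # a later hunk normalizes this, so a single string or a list should both work;
            # the Twisted endpoint string syntax used here is an assumption:
            'servers': ['tcp:host=localhost:port=5672'],
            'reconnect_interval': 1,
            'max_idle_time': 10 * 60,
            # popped by AMQPConnectionProvider and handled by the AMQPConsumerProvider:
            'basic_consumers': {
                'my_consumer': {
                    'pipeline': 'my_pipeline',
                    'queue': 'my_queue',
                },
            },
        },
    },
}
```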

@@ -77,7 +87,7 @@ def __init__(self, parameters):
super(AMQProtocol, self).__init__(parameters)
self.on_lost = event.Event()

- self._idle_state = (None, None)
+ self._previous_idle_state = (None, None)
self._named_channels = dict()
self._pending_named_channels = dict()
self.idle_checker = task.LoopingCall(self._check_idle)
@@ -88,15 +98,13 @@ def _adapter_disconnect(self):

def connectionLost(self, reason):
self.on_lost(reason)
# stop the idle checker if it is running
self.idle_checker.stop() if self.idle_checker.running else None
return super(AMQProtocol, self).connectionLost(reason)

def _check_idle(self):
- state = self.bytes_sent, self.bytes_received
+ idle_state = self.bytes_sent, self.bytes_received

- if not state == self._idle_state:
- self._idle_state = state
+ if not idle_state == self._previous_idle_state:
+ self._previous_idle_state = idle_state
return

if self.connection_state not in (connection.CONNECTION_CLOSED, connection.CONNECTION_CLOSING):
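
_check_idle runs from a task.LoopingCall and declares the connection idle when the (bytes_sent, bytes_received) pair has not changed since the previous tick. A minimal, framework-free sketch of that snapshot-compare idea (class and function names here are illustrative, not from the diff):

```python
class IdleWatcher(object):
    """ Illustrative only: detect an idle link by comparing traffic counters
    between two consecutive checks, as AMQProtocol._check_idle does. """

    def __init__(self, on_idle):
        self._on_idle = on_idle
        self._previous_idle_state = (None, None)

    def check(self, bytes_sent, bytes_received):
        idle_state = (bytes_sent, bytes_received)

        if idle_state != self._previous_idle_state:
            # some traffic has moved since the last check, so we are not idle
            self._previous_idle_state = idle_state
            return False

        # nothing sent or received since the previous check: consider the link idle
        self._on_idle()
        return True


# usage sketch: call check() periodically, e.g. from a LoopingCall
watcher = IdleWatcher(on_idle=lambda: None)
assert watcher.check(10, 20) is False   # first sample, counters recorded
assert watcher.check(10, 20) is True    # unchanged since last check -> idle
```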
@@ -112,14 +120,18 @@ def get_named_channel(self, channel_name):
a failure that describes the reason the channel is unavailable.
"""
if channel_name in self._named_channels:
# if we already have this channel cached, return it
defer.returnValue(self._named_channels[channel_name])

if channel_name in self._pending_named_channels:
# if another invocation is currently requesting this channel for us, share the result
d = defer.Deferred()
self._pending_named_channels[channel_name] += d.callback
channel = yield d
defer.returnValue(channel)

# we are the first ones to request this channel, so we create an event that we will
# fire with the channel or a failure.
self._pending_named_channels[channel_name] = pending_event = event.Event()

try:
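
get_named_channel caches open channels and, when a request for the same name is already in flight, parks later callers on the pending event so only one channel is actually opened per name. Below is a minimal Twisted sketch of that sharing pattern using plain Deferreds instead of piped's event.Event; the class and the open_channel callable are hypothetical and only illustrate the idea.

```python
from twisted.internet import defer
from twisted.python import failure


class NamedChannelCache(object):
    """ Illustrative only: at most one in-flight open per name, shared by concurrent callers. """

    def __init__(self, open_channel):
        self._open_channel = open_channel   # callable(name) returning a Deferred
        self._channels = dict()             # name -> opened channel
        self._pending = dict()              # name -> list of waiting Deferreds

    @defer.inlineCallbacks
    def get_named_channel(self, name):
        if name in self._channels:
            # already opened and cached
            defer.returnValue(self._channels[name])

        if name in self._pending:
            # another caller is opening this channel; wait for its result
            d = defer.Deferred()
            self._pending[name].append(d)
            channel = yield d
            defer.returnValue(channel)

        # we are the first caller for this name
        self._pending[name] = waiters = list()
        try:
            channel = yield self._open_channel(name)
            self._channels[name] = channel
            for waiter in waiters:
                waiter.callback(channel)
        except Exception:
            reason = failure.Failure()
            for waiter in waiters:
                waiter.errback(reason)
            raise
        finally:
            del self._pending[name]

        defer.returnValue(channel)
```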
@@ -162,7 +174,7 @@ def __init__(self, name, servers, max_idle_time=None, reconnect_interval=1, para
service.MultiService.__init__(self)

self.name = name
- self.servers = servers
+ self.servers = [servers] if isinstance(servers, basestring) else servers
self._server_index = -1
self.reconnect_interval = reconnect_interval

@@ -243,7 +255,7 @@ def _keep_connecting(self):
def _connect(self, endpoint):
currently = util.create_deferred_state_watcher(self, '_connecting')

- # make sure we're not directly overwriting an existing protocol
+ # make sure we're not overwriting an existing protocol without attempting to disconnect it first
if self.protocol:
if self.protocol.connection_state not in (connection.CONNECTION_CLOSED, connection.CONNECTION_CLOSING):
self._disconnect('reconnecting')
@@ -256,6 +268,8 @@ def _connect(self, endpoint):

self.on_connected(self.protocol)

# create a protocol.on_lost handler that stops the idle checker and calls our
# on_disconnected event if it is still our current protocol
def on_lost(reason):
protocol.on_lost -= on_lost
if self.protocol == protocol:
@@ -331,52 +345,67 @@ class AMQPConsumer(object, service.Service):
_working = None

def __init__(self, name, pipeline, connection, queue=None, qos=None,
- ack_before_processing=False, ack_after_successful_processing=True,
- nack_after_failed_processing=True, channel_reopen_interval=1):
+ no_ack = False, exclusive = False,
+ ack_after_failed_processing=False, ack_after_successful_processing=True,
+ nack_after_failed_processing=True, channel_reopen_interval=1,
+ log_processor_exceptions='warn'):
"""
:param name: Logical name of this consumer.
:param pipeline: The pipeline used to process the messages.
:param connection: Name of the connection.
:param queue: Either a name (string) or a queue declaration (dict).
See `queue_declare <http://www.rabbitmq.com/amqp-0-9-1-quickref.html#queue.declare>`_.
:param qos: Used to specify the QOS on the channel.
- :param ack_before_processing: If true, acks messages before they are processed.

:param no_ack: Informs the broker that we do not intend to ack consumed messages.
If no_ack is true, ack_after/nack_after settings are ignored.
:param ack_after_successful_processing: If true, acks messages if processing finishes
without errbacking.
:param nack_after_failed_processing: If true, rejects messages if processing errbacks.
:param ack_after_failed_processing: If true, acks messages even if processing errbacks.

:param exclusive: Request exclusive consumer access, meaning only this consumer can access the queue.

:param channel_reopen_interval: Time (in seconds) to wait before reopening the consuming
channel if it closes.
:param log_processor_exceptions: Log level for exceptions raised by our processor. Set to None
to disable.
"""
self.name = name
self.pipeline_name = pipeline
self.connection_name = connection

if isinstance(queue, basestring):
queue = dict(queue=queue, passive=True)

- self.queue_declare = queue or dict()
- self.queue_declare.setdefault('queue', '')
- self.queue_declare.setdefault('auto_delete', True)
- self.queue_declare.setdefault('durable', False)
-
- # default the queue to exclusive if the queue name is auto-generated
- self.queue_declare.setdefault('exclusive', True if not self.queue_declare['queue'] else False)
+ self.queue_declare = queue or dict(queue='')

self.qos = qos or dict()

- self.ack_before_processing = ack_before_processing
+ self.no_ack = no_ack
self.ack_after_successful_processing = ack_after_successful_processing
self.nack_after_failed_processing = nack_after_failed_processing
self.ack_after_failed_processing = ack_after_failed_processing

- if self.ack_before_processing and (self.ack_after_successful_processing or self.nack_after_failed_processing):
- e_msg = 'Cannot both ack before processing and ack/nack after processing.'
+ if self.ack_after_failed_processing and self.nack_after_failed_processing:
+ e_msg = 'Cannot both ack and nack after failed processing.'
raise exceptions.ConfigurationError(e_msg)

self.exclusive = exclusive
self.channel_reopen_interval = channel_reopen_interval

self.log_processor_exceptions = log_processor_exceptions
if log_processor_exceptions:
# make sure the provided log level actually exists:
available_log_levels = [key.lower() for key in log.level_value_by_name]
if log_processor_exceptions not in available_log_levels:
e_msg = 'Invalid log level {0!r}.'.format(log_processor_exceptions.lower())
hint = 'Available log levels: {0}'.format(available_log_levels)
raise exceptions.ConfigurationError(e_msg, hint)

def configure(self, runtime_environment):
dm = runtime_environment.dependency_manager

# start and stop consuming as our dependency counterpart becomes ready/lost
self.dependency = dm.as_dependency(self)
self.dependency.on_ready += lambda _: self._run()
self.dependency.on_lost += lambda _, reason: self._stop_working()
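
The constructor documented in the hunk above exposes quite a few knobs. As a reference point, here is a hypothetical sketch of a single basic_consumers entry written as a Python dict whose keys mirror the __init__ arguments; only the argument names and their defaults come from the diff, while the assumption that configuration keys map one-to-one onto them (and the pipeline, connection and queue names used) is mine.

```python
# Hypothetical sketch only: a basic_consumers entry using the arguments documented above.
example_consumer_config = {
    'my_consumer': {
        'pipeline': 'my_pipeline',
        'connection': 'my_connection',

        # either a queue name (string) or a queue.declare dict:
        'queue': {'queue': 'work', 'durable': True},
        # passed to basic.qos on the consuming channel:
        'qos': {'prefetch_count': 10},

        'no_ack': False,
        'exclusive': False,

        'ack_after_successful_processing': True,
        'nack_after_failed_processing': True,
        'ack_after_failed_processing': False,

        'channel_reopen_interval': 1,
        'log_processor_exceptions': 'warn',
    },
}
```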
@@ -406,20 +435,19 @@ def _run(self):
frame = yield currently(channel.queue_declare(**self.queue_declare))
queue_name = frame.method.queue

- queue, consumer_tag = yield currently(channel.basic_consume(queue=queue_name))
+ queue, consumer_tag = yield currently(channel.basic_consume(queue=queue_name, no_ack=self.no_ack, exclusive=self.exclusive))

while self.running:
channel, method, properties, body = yield currently(queue.get())

- if self.ack_before_processing:
- yield currently(channel.basic_ack(delivery_tag=method.delivery_tag))

self._process(channel, method, properties, body)

except pika_exceptions.ChannelClosed as cc:
log.warn()
yield util.wait(self.channel_reopen_interval)

except defer.CancelledError as ce:
return

except Exception as e:
log.warn()

@@ -428,11 +456,21 @@ def _process(self, channel, method, properties, body):
try:
yield self.process(channel=channel, method=method, properties=properties, body=body)
except Exception as e:
- log.warn()
+ if self.log_processor_exceptions:
+ logger = getattr(log, self.log_processor_exceptions.lower())
+ logger()

if self.no_ack:
return

if self.nack_after_failed_processing:
yield channel.basic_reject(delivery_tag=method.delivery_tag)

if self.ack_after_failed_processing:
yield channel.basic_ack(delivery_tag=method.delivery_tag)

else:
- if self.ack_after_successful_processing:
+ if not self.no_ack and self.ack_after_successful_processing:
yield channel.basic_ack(delivery_tag=method.delivery_tag)

@defer.inlineCallbacks
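
With no_ack, ack_after_successful_processing, nack_after_failed_processing and ack_after_failed_processing all in play, the post-processing behaviour of _process boils down to a small decision table. A tiny, purely illustrative helper that captures the same logic (the function name is not from the diff):

```python
def acknowledgement_action(failed, no_ack, ack_after_successful_processing,
                           nack_after_failed_processing, ack_after_failed_processing):
    """ Illustrative only: mirrors the post-processing decisions made in _process.

    Returns 'ack', 'reject' or None; the constructor already rejects configurations
    that enable both ack_after_failed_processing and nack_after_failed_processing. """
    if no_ack:
        # the broker was told not to expect acknowledgements at all
        return None

    if failed:
        if nack_after_failed_processing:
            return 'reject'
        if ack_after_failed_processing:
            return 'ack'
        return None

    return 'ack' if ack_after_successful_processing else None


# examples:
assert acknowledgement_action(failed=False, no_ack=False, ack_after_successful_processing=True,
                              nack_after_failed_processing=True, ack_after_failed_processing=False) == 'ack'
assert acknowledgement_action(failed=True, no_ack=False, ack_after_successful_processing=True,
                              nack_after_failed_processing=True, ack_after_failed_processing=False) == 'reject'
```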