Skip to content

Commit

Permalink
Merge pull request benoitc#4 from EnTeQuAk/unkillable
Browse files Browse the repository at this point in the history
Implemented workers without a timeout and added some simple AMQP example
  • Loading branch information
benoitc committed Sep 6, 2011
2 parents 71806d0 + 990dc6b commit 1a31961
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 3 deletions.
87 changes: 87 additions & 0 deletions examples/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#-*- coding: utf-8 -*-
"""
A simple worker with a AMQP consumer.
This example shows how to implement a simple AMQP consumer
based on `Kombu <http://github.com/ask/kombu>`_ and shows you
what different kind of workers you can put to a arbiter
to manage the worker lifetime, event handling and shutdown/reload szenarios.
"""
import sys
import time
import socket
import logging
from pistil.arbiter import Arbiter
from pistil.worker import Worker
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer


CONNECTION = ('localhost', 'guest', 'default', '/')

log = logging.getLogger(__name__)


class AMQPWorker(Worker):

queues = [
{'routing_key': 'test',
'name': 'test',
'handler': 'handle_test'
}
]

_connection = None

def handle_test(self, body, message):
log.debug("Handle message: %s" % body)
message.ack()

def handle(self):
log.debug("Start consuming")
exchange = Exchange('amqp.topic', type='direct', durable=True)
self._connection = BrokerConnection(*CONNECTION)
channel = self._connection.channel()

for entry in self.queues:
log.debug("prepare to consume %s" % entry['routing_key'])
queue = Queue(entry['name'], exchange=exchange,
routing_key=entry['routing_key'])
consumer = Consumer(channel, queue)
consumer.register_callback(getattr(self, entry['handler']))
consumer.consume()

log.debug("start consuming...")
while True:
try:
self._connection.drain_events()
except socket.timeout:
log.debug("nothing to consume...")
break
self._connection.close()

def run(self):
while self.alive:
try:
self.handle()
except Exception:
self.alive = False
raise

def handle_quit(self, sig, frame):
if self._connection is not None:
self._connection.close()
self.alive = False

def handle_exit(self, sig, frame):
if self._connection is not None:
self._connection.close()
self.alive = False
sys.exit(0)


if __name__ == "__main__":
conf = {}
specs = [(AMQPWorker, None, "worker", {}, "test")]
a = Arbiter(conf, specs)
a.run()
8 changes: 5 additions & 3 deletions pistil/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ def __init__(self, child_class, timeout, child_type,
self.child_type = child_type
self.args = args
self.name = name
print self.name


# chaine init worker:
# (WorkerClass, max_requests, timeout, type, args, name)
# types: supervisor, kill, brutal_kill, worker
# timeout: integer in seconds or None

class Arbiter(object):
"""
Expand Down Expand Up @@ -375,7 +375,7 @@ def reload(self):

# kill old workers
for wpid, (child, state) in OLD__WORKERS.items():
if state:
if state and child.timeout is not None:
if child.child_type == "supervisor":
# we only reload suprvisors.
sig = signal.SIGHUP
Expand All @@ -392,13 +392,15 @@ def murder_workers(self):
"""
for (pid, child_info) in self._WORKERS.items():
(child, state) = child_info
if state:
if state and child.timeout is not None:
try:
diff = time.time() - os.fstat(child.tmp.fileno()).st_ctime
if diff <= child.timeout:
continue
except ValueError:
continue
elif state and child.timeout is None:
continue

log.critical("WORKER TIMEOUT (pid:%s)", pid)
self.kill_worker(pid, signal.SIGKILL)
Expand Down

0 comments on commit 1a31961

Please sign in to comment.