Skip to content
This repository has been archived by the owner on Nov 30, 2022. It is now read-only.

Commit

Permalink
Add consumer thread to process the messages (not doing anything yet)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcfarlane committed Jul 21, 2008
1 parent 51386c1 commit 1b318b0
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions scripts/chula-mqueue
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ import os
import socket
import sys
import thread
import time

from chula import collection
from chula.queue import message

localhost = socket.gethostname()
localhost = 'localhost'
config = collection.Collection()
config.mqueue_db = 'sqlite:memory'
config.mqueue_db = 'sqlite:/tmp/mqueue-test.db'

POLL_INTERVAL = 30

class MessageQueueServer(object):
def worker(self, client):
def producer(self, client):
chars_left = 1
msg = ['']
msg_length = None
Expand Down Expand Up @@ -47,29 +50,45 @@ class MessageQueueServer(object):

# Do something with the data
print '[%s]' % ''.join(msg)
queue = message.MessageQueue(config)
queue = self.queue()
queue.add('testing', 'payload', 'type')
queue.close()

client.shutdown(0)
client.close()

def consumer(self):
while True:
queue = self.queue()
msg = queue.pop()
if not msg is None:
print msg
queue.close()
print 'Queue polled for messages'
time.sleep(POLL_INTERVAL)

def start(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((localhost, 8080))
s.listen(5)

# Startup the consumer thread
thread.start_new_thread(self.consumer, ())

# Serve forever
while True:
try:
(clientsocket, address) = s.accept()
thread.start_new_thread(self.worker, (clientsocket,))
thread.start_new_thread(self.producer, (clientsocket,))
except KeyboardInterrupt:
break

s.shutdown(0)
s.close()

def queue(self):
return message.MessageQueue(config)

if __name__ == '__main__':
print 'Running with pid: %s' % os.getpid()
daemon = sys.argv[0].rsplit('/', 1)[-1]
Expand Down

0 comments on commit 1b318b0

Please sign in to comment.