Debug the queue module
firefoxbug committed Jun 4, 2014
1 parent afc04d8 commit 86cf348
Showing 3 changed files with 132 additions and 53 deletions.
142 changes: 97 additions & 45 deletions lib/purge.py
@@ -14,60 +14,112 @@
http://node_ip:node_port/ocdn/purge/purge?token=node_token&domain=node_domain
"""

class Purge(object):
    """OpenCDN Domain cache Purge"""
    def __init__(self, arg):
        super(Purge, self).__init__()
        self.arg = arg
import sys
import os
import time
try:
    import json
except Exception, e:
    import simplejson as json

parent, bindir = os.path.split(os.path.dirname(os.path.abspath(sys.argv[0])))
if os.path.exists(os.path.join(parent, 'lib')):
    sys.path.insert(0, os.path.join(parent, 'lib'))

from OcdnQueue import Consumer
from OcdnQueue import Producer
from OcdnLogger import init_logger
from OcdnJob import JobManager

class Purge(Consumer):
    """OpenCDN Domain cache Purge
    1. Register task module into Consumer
    """
    def __init__(self, queue_ip='103.6.222.21', queue_port=4730):
        self.queue_ip = queue_ip
        self.queue_port = queue_port
        super(Purge, self).__init__(queue_ip, queue_port)
        self.CURRENT_TASK_MODULE = 'OCDN_PURGE'
        purge_queue.init()

    def purge_loop():
        self.logfile = os.path.join(parent,'logs','purge.log')
        self.logger = init_logger(logfile=self.logfile, stdout=True)
        self.logger.info('Connect to gearman server %s:%s'%(queue_ip, queue_port))

    def run(self):
        """Register the callback module and start a worker loop to do tasks"""
        self.register_task_callback(self.CURRENT_TASK_MODULE, self.do_task)
        self.logger.info('Register TaskModule:%s'%(self.CURRENT_TASK_MODULE))
        self.start_worker()

    def do_task(self, gearman_worker, job):
        """Running a task can lead to 3 results:
        1. the task failed
        2. the task succeeded and the job is finished
        3. the task succeeded but the job is unfinished
        """
        while True:
            current_task_json_str = Queue.get_task_from_queue(self.CURRENT_TASK_MODULE)
            jobjson = JobJSON(current_task_json)

            # Get task and parameters to run
            (call_module, **arg) = jobjson.get_current_task_to_run()

            # Run task
            if self.purge_node(**arg) == False:
                self.purge_node_failure()
                continue

            # Job is over
            if jobjson.is_job_finished():
                self.purge_job_success()
                continue

            # Job still has tasks to dispatch
            next_task_json = jobjson.set_next_task_to_run(parameters)
            next_task_module = jobjson.get_next_task_module_name()
            if next_task_module:
                Queue.put_task_into_queue(next_task_json)

    def purge_node():
        data = job.data
        current_job_json = json.loads(data)
        jobmanager = JobManager(current_job_json)
        if not jobmanager.check_job_json_is_ok():
            self.logger.error('Parse job json failed.%s'%(job.data))
            return "False"

        # Get task and parameters to run
        task_name, task_args = jobmanager.get_current_task_to_run()

        # Run task
        if self.purge_node(task_args) == False:
            self.purge_node_failure(jobmanager)
            return "False"

        # Job is over
        if jobmanager.is_job_finished():
            self.purge_job_success()
            return "True"
        # Job still has tasks to dispatch
        next_task_json = jobmanager.set_next_task_to_run()
        if not next_task_json:
            self.logger.error('Job is unfinished but no more task to do')
            return "False"
        self.put_task_into_queue(next_task_json['CurrentTask'], next_task_json)
        return "True"

    def purge_node(self, task_args):
        """Run the purge task against one node's cache.
        return False: the job failed
        return True: the job succeeded
        """

    def purge_node_failure():
        """Handle a failed purge of one node's cache; try to dispatch the task again.
        """

        next_task_json = jobjson.try_run_current_task_again()
        if next_task_json:
            Queue.put_task_into_queue(self.CURRENT_TASK_MODULE)

    def purge_job_success():
        """The purge job executed successfully
        """
        for instance in task_args:
            print instance
        return True

    def purge_node_failure(self, jobmanager):
        """Handle a failed purge of one node's cache; try to dispatch the task again."""

        next_task_json = jobmanager.try_run_current_task_again()

        if not next_task_json:
            error_msg = 'TaskModule:%s Exceed MaxRunTimes, no more dispatch'%(self.CURRENT_TASK_MODULE)
        else:
            error_msg = 'Failed do task TaskModule:%s. try to redo task. AlreadyRunTimes=%s'%(self.CURRENT_TASK_MODULE, next_task_json['AlreadyRunTimes'])
            self.put_task_into_queue(next_task_json['CurrentTask'], next_task_json)
        self.logger.error(error_msg)

    def purge_job_success(self):
        """The purge job executed successfully"""
        message = 'Success do task TaskModule:%s'%(self.CURRENT_TASK_MODULE)
        self.logger.info(message)

    def put_task_into_queue(self, queue_name, task_json):
        """Put a new task json into task queue"""
        result = self.push_task(self.queue_ip, self.queue_port, queue_name, task_json)
        if result:
            self.logger.info("Put task into TaskModule:%s"%(queue_name))
        else:
            self.logger.error("Put task into TaskModule:%s Data:%s"%(queue_name,str(task_json)))

if __name__ == '__main__':
    purge = Purge()
    purge.run()
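
Note: the module docstring at the top of lib/purge.py documents the purge request format (http://node_ip:node_port/ocdn/purge/purge?token=node_token&domain=node_domain), but in this debugging commit the new purge_node body is still a stub that only prints each node entry. A minimal sketch of what the real per-node call could look like, assuming each entry in task_args is a dict with the ip/port/domain/token fields built in test/producer.py, and using urllib2 purely as an illustration rather than the project's actual HTTP client:

import urllib
import urllib2

def purge_one_node(instance, timeout=5):
    """Send a purge request to one cache node (illustrative sketch).

    `instance` is assumed to look like the dicts built in test/producer.py:
    {'ip': ..., 'port': ..., 'domain': ..., 'token': ...}.
    Returns True only if the node answers with HTTP 200.
    """
    query = urllib.urlencode({'token': instance['token'], 'domain': instance['domain']})
    url = 'http://%s:%s/ocdn/purge/purge?%s' % (instance['ip'], instance['port'], query)
    try:
        return urllib2.urlopen(url, timeout=timeout).getcode() == 200
    except Exception:
        return False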
4 changes: 3 additions & 1 deletion test/consumer.py
@@ -18,13 +18,15 @@
if os.path.exists(os.path.join(parent, 'lib')):
    sys.path.insert(0, os.path.join(parent, 'lib'))

from ocdn_queue import Consumer
from OcdnQueue import Consumer
from OcdnLogger import init_logger

class ConsumerTest(Consumer):
    """docstring for Consumer"""
    def __init__(self, queue_ip='103.6.222.21', queue_port=4730):
        super(ConsumerTest, self).__init__(queue_ip, queue_port)
        self.task_name = 'consumer'
        self.logger = init_logger(logfile='consumer_test.log', stdout=True)

    def run(self):
        self.register_task_callback('OCDNQUEUE', self.do_task)
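
Note: the Consumer base class imported from OcdnQueue is not part of this diff; judging from the register_task_callback/start_worker calls in lib/purge.py and the "Connect to gearman server" log line, it presumably wraps the python-gearman worker API. A rough sketch of that pattern, with class and method names chosen to mirror the calls above (this is an assumption about OcdnQueue, not its actual code):

import gearman

class Consumer(object):
    """Illustrative gearman worker wrapper (assumed shape of OcdnQueue.Consumer)."""
    def __init__(self, queue_ip, queue_port):
        self.worker = gearman.GearmanWorker(['%s:%s' % (queue_ip, queue_port)])

    def register_task_callback(self, task_module, callback):
        # callback receives (gearman_worker, gearman_job), matching Purge.do_task
        self.worker.register_task(task_module, callback)

    def start_worker(self):
        # Block and dispatch incoming jobs to the registered callbacks
        self.worker.work()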
39 changes: 32 additions & 7 deletions test/producer.py
@@ -15,19 +15,44 @@
if os.path.exists(os.path.join(parent, 'lib')):
    sys.path.insert(0, os.path.join(parent, 'lib'))

from ocdn_queue import Producer
from OcdnQueue import Producer
from OcdnQueue import Consumer
from OcdnLogger import init_logger
from OcdnJob import OcdnJSON

import pprint
pp = pprint.PrettyPrinter()

class ProducerTest(Producer):
    """docstring for queue_test"""
    def __init__(self, queue_ip='103.6.222.21', queue_port=4730):
        super(ProducerTest, self).__init__(queue_ip, queue_port)
        self.task_name = 'ProducerTest'
        self.logger = init_logger(logfile='producer_test.log', stdout=True)
        self.logger.info('Connect to gearman server %s:%s'%(queue_ip, queue_port))

    def produce_job_loop(self):
        self.producer_connect_queue()
        job2do = {'id':'1', 'data':'hello'}
        self.put_task_into_queue('OCDNQUEUE', job2do, Background=False)
    def produce_job_loop(self, TaskModule, job2do):
        self.connect_queue()
        self.logger.info('Put task into TaskModule:%s'%(TaskModule))
        self.put_task_into_queue(TaskModule, job2do, Background=False)

def produce_job_tes():
    Consumer.push_task(queue_ip='103.6.222.21', queue_port=4730, queue_name='OCDNQUEUE', data='hello')

def purge_test():
    TaskName = 'OCDN_PURGE'
    TaskList = ['OCDN_PURGE']
    Parameters = []
    Parameters.append({'ip':'192.168.1.1','port':'80','domain':'www.firefoxbug.com','token':'821e57c57e8455e3e809e23df7bb6ce9'})
    Parameters.append({'ip':'192.168.1.2','port':'80','domain':'www.firefoxbug.com','token':'821e57c57e8455e3e809e23df7bb6ce9'})

    test = OcdnJSON()
    job2do = test.create_job_json(TaskName, TaskList, Parameters)
    pp.pprint(job2do)

if __name__ == '__main__':
    producer = ProducerTest()
    producer.produce_job_loop()
    producer.produce_job_loop('OCDN_PURGE', job2do)


if __name__ == '__main__':
    purge_test()
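
Note: on the producer side, Consumer.push_task / put_task_into_queue likewise look like thin wrappers over the gearman client. A hedged sketch of submitting one background task, again assuming the standard gearman package and JSON-serialized job data (the real OcdnQueue.Producer may differ):

import json
import gearman

def push_task(queue_ip, queue_port, queue_name, data, background=True):
    """Submit one task to a gearman queue; returns True if the submit did not raise."""
    client = gearman.GearmanClient(['%s:%s' % (queue_ip, queue_port)])
    try:
        client.submit_job(queue_name, json.dumps(data), background=background)
        return True
    except Exception:
        return False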
