Skip to content

Commit

Permalink
Move common pre_process_context code from child classes to BaseWorker…
Browse files Browse the repository at this point in the history
… parent
  • Loading branch information
psarma89 committed Apr 14, 2021
1 parent 0c49f79 commit 4bafcfa
Showing 1 changed file with 61 additions and 76 deletions.
137 changes: 61 additions & 76 deletions pyqs/worker.py
Original file line number Diff line number Diff line change
@@ -74,6 +74,33 @@ def _run_hooks(self, hook_name, context):
for hook in hooks:
hook(context)

def _create_pre_process_context(self, packed_message):
message = packed_message['message']
message_body = decode_message(message)
full_task_path = message_body['task']

pre_process_context = {
"task_name": full_task_path.split(".")[-1],
"args": message_body['args'],
"kwargs": message_body['kwargs'],
"full_task_path": full_task_path,
"fetch_time": packed_message['start_time'],
"queue_url": packed_message['queue'],
"timeout": packed_message['timeout'],
"receipt_handle": message['ReceiptHandle']
}

return pre_process_context

def _get_task(self, full_task_path):

task_name = full_task_path.split(".")[-1]
task_path = ".".join(full_task_path.split(".")[:-1])
task_module = importlib.import_module(task_path)
task = getattr(task_module, task_name)

return task


class ReadWorker(BaseWorker):

@@ -186,60 +213,41 @@ def process_message(self):
except Empty:
# Return False if we did not attempt to process any messages
return False
message = packed_message['message']
queue_url = packed_message['queue']
fetch_time = packed_message['start_time']
timeout = packed_message['timeout']
message_body = decode_message(message)
full_task_path = message_body['task']
args = message_body['args']
kwargs = message_body['kwargs']

task_name = full_task_path.split(".")[-1]
task_path = ".".join(full_task_path.split(".")[:-1])

task_module = importlib.import_module(task_path)

task = getattr(task_module, task_name)

pre_process_context = {
"task_name": task_name,
"args": args,
"kwargs": kwargs,
"full_task_path": full_task_path,
"fetch_time": fetch_time,
"queue_url": queue_url,
"timeout": timeout
}

# Modify the contexts separately so the original
# context isn't modified by later processing
post_process_context = copy.copy(pre_process_context)
pre_process_context = self._create_pre_process_context(packed_message)

current_time = time.time()
if int(current_time - fetch_time) >= timeout:
if int(current_time - pre_process_context["fetch_time"]) \
>= pre_process_context["timeout"]:
logger.warning(
"Discarding task {} with args: {} and kwargs: {} due to "
"exceeding visibility timeout".format( # noqa
full_task_path,
repr(args),
repr(kwargs),
pre_process_context["full_task_path"],
repr(pre_process_context["args"]),
repr(pre_process_context["kwargs"]),
)
)
return True

task = self._get_task(pre_process_context["full_task_path"])

# Modify the contexts separately so the original
# context isn't modified by later processing
post_process_context = copy.copy(pre_process_context)

try:
start_time = time.time()
self._run_hooks("pre_process", pre_process_context)
task(*args, **kwargs)
task(*pre_process_context["args"], **pre_process_context["kwargs"])
except Exception:
end_time = time.time()
logger.exception(
"Task {} raised error in {:.4f} seconds: with args: {} "
"and kwargs: {}: {}".format(
full_task_path,
pre_process_context["full_task_path"],
end_time - start_time,
args,
kwargs,
pre_process_context["args"],
pre_process_context["kwargs"],
traceback.format_exc(),
)
)
@@ -250,16 +258,16 @@ def process_message(self):
else:
end_time = time.time()
self._get_connection().delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
QueueUrl=pre_process_context["queue_url"],
ReceiptHandle=pre_process_context["receipt_handle"]
)
logger.info(
"Processed task {} in {:.4f} seconds with args: {} "
"and kwargs: {}".format(
full_task_path,
pre_process_context["full_task_path"],
end_time - start_time,
repr(args),
repr(kwargs),
pre_process_context["args"],
pre_process_context["kwargs"],
)
)
post_process_context["status"] = "success"
@@ -325,50 +333,27 @@ def read_message(self):

def process_message(self, packed_message):

message = packed_message['message']
queue_url = packed_message['queue']
fetch_time = packed_message['start_time']
timeout = packed_message['timeout']
message_body = decode_message(message)
full_task_path = message_body['task']
args = message_body['args']
kwargs = message_body['kwargs']

task_name = full_task_path.split(".")[-1]
task_path = ".".join(full_task_path.split(".")[:-1])
pre_process_context = self._create_pre_process_context(packed_message)

task_module = importlib.import_module(task_path)

task = getattr(task_module, task_name)

pre_process_context = {
"task_name": task_name,
"args": args,
"kwargs": kwargs,
"full_task_path": full_task_path,
"fetch_time": fetch_time,
"queue_url": queue_url,
"timeout": timeout
}
task = self._get_task(pre_process_context["full_task_path"])

# Modify the contexts separately so the original
# context isn't modified by later processing
post_process_context = copy.copy(pre_process_context)

start_time = time.time()

try:
start_time = time.time()
self._run_hooks("pre_process", pre_process_context)
task(*args, **kwargs)
task(*pre_process_context["args"], **pre_process_context["kwargs"])
except Exception:
end_time = time.time()
logger.exception(
"Task {} raised error in {:.4f} seconds: with args: {} "
"and kwargs: {}: {}".format(
full_task_path,
pre_process_context["full_task_path"],
end_time - start_time,
args,
kwargs,
pre_process_context["args"],
pre_process_context["kwargs"],
traceback.format_exc(),
)
)
@@ -379,16 +364,16 @@ def process_message(self, packed_message):
else:
end_time = time.time()
self._get_connection().delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
QueueUrl=pre_process_context["queue_url"],
ReceiptHandle=pre_process_context["receipt_handle"]
)
logger.info(
"Processed task {} in {:.4f} seconds with args: {} "
"and kwargs: {}".format(
full_task_path,
pre_process_context["full_task_path"],
end_time - start_time,
repr(args),
repr(kwargs),
pre_process_context["args"],
pre_process_context["kwargs"],
)
)
post_process_context["status"] = "success"

0 comments on commit 4bafcfa

Please sign in to comment.