Feature/simple process worker #76
Conversation
@spulec @jimjshields @nadlerjessie Curious about your thoughts on the different decision points we have to finish.
I'm excited about this @psarma89!! Overall this looks good, and I agree there's an opportunity to refactor to avoid repeated code. My biggest sticking point is the idea of discarding messages because they exceed the visibility timeout. I think it will cause more trouble than it's worth. Removing batch size should remove the potential gotcha; I don't have a huge need for batch size, but I don't know about others.
I left comments inline which address some of your questions, but to be explicit (answers in bold):
- Decision 1: For SimpleProcessWorker, do we want to keep track of the number of `messages_processed` due to memory implications? Is this still relevant? **I don't see the harm in keeping it**
- Decision 1a: If we keep this, do we count a message grabbed and discarded due to visibility timeout as going against the count, i.e. `messages_processed`? **I don't think we should discard messages due to visibility timeout; more comments in the PR**
- Decision 2: Should `read_message` in SimpleProcessWorker break, or not do the visibility timeout check here at all, since we are doing it in `process_message`? **Same as above**
- Decision 3: Should we immediately make messages visible that have failed but have a long visibility timeout? **I don't understand the question. Can you give an example?**
- Decision 4: Do we support `batchsize` in SimpleProcessWorker? **Would rather have concurrency over batchsize. That might address some of my visibility timeout concerns too**
- Decision 5: Support multiple SQS queues for SimpleManagerWorker? **I don't think it's necessary**
- Decision 6: Support `concurrency` for SimpleManagerWorker? **I think it's nice to keep**
pyqs/worker.py
```python
if int(end - start) >= self.visibility_timeout:
    # Don't add any more messages since they have
    # re-appeared in the sqs queue. Instead just reset and get
    # fresh messages from the sqs queue
    msg = (
        "Clearing Local messages since we exceeded "
        "their visibility_timeout"
    )
    logger.warning(msg)
    break
```
I know you asked about wanting both checks, but I'm wondering: do we want either of them? My understanding of the initial motivation was that we wanted to avoid processing messages multiple times. The internal queue made it more likely that a message was pulled and its visibility timeout expired before we processed it. @spulec could speak to that more.
It's still possible to execute a message twice. Let's say we have a batchsize of 2, a visibility timeout of 60 seconds, and the first message takes 65 seconds. Message 2 could exceed the visibility timeout without ever running.
However, I don't know if the risk of running a message multiple times is worse than the risk of pulling a message and never executing it. If we have a max receives of 1, that means message 2 ends up in the DLQ without ever running. IMO this is the largest pain point with pyqs today.
If we want to keep it, at least now there's a way around it that wasn't available with the internal queue, but it still sets up a potential trap.
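To make the trap concrete, here's a small simulation of the batchsize-2 scenario above (the function and names are mine, not from the PR): the first message runs for 65 seconds against a 60-second visibility timeout, so the second message re-appears in SQS before we ever start it.

```python
# Hypothetical sketch: which messages in a batch exceed the visibility
# timeout before we even begin processing them?
def expired_before_run(durations, visibility_timeout):
    expired = []
    elapsed = 0
    for index, duration in enumerate(durations):
        if elapsed >= visibility_timeout:
            # SQS has already made this message visible again, so
            # another worker (or eventually the DLQ) may receive it.
            expired.append(index)
        elapsed += duration
    return expired

# batchsize=2, 60s visibility timeout, first task takes 65s:
# message 1 expires without ever running.
print(expired_before_run([65, 10], visibility_timeout=60))  # [1]
```

With a batchsize of 1 the list is always empty, which is why removing batchsize removes the gotcha.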
That is my understanding as well. It is there to avoid double processing a message. Would this not be relevant, for example, in sending email campaigns in clementine, or are there application checks to avoid it? I am OK with not doing this check, adding better documentation to explain the implications, and putting the onus on the user.
I think most apps that have max receives > 1 should have logic to address a message getting picked up again. I suppose it's possible the checks wouldn't catch two messages picked up at the exact same time. I think an at-least-once pattern is preferable to what we have here, which could lead to messages never being processed, but I don't know if that's true for all applications.
The real issue here is batchsize. If we remove batchsize, I don't think this check does anything (we read the message and immediately start processing it). Then we can safely remove these checks and the possibility of something going straight to the DLQ without the task ever executing.
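For reference, the kind of application-level logic I mean is an idempotency guard. A minimal sketch, with hypothetical names and an in-memory set standing in for what would really be a durable store:

```python
# Minimal at-least-once guard: skip a message we've already handled.
processed_ids = set()  # hypothetical; use a durable store in practice

def handle_once(message_id, handler):
    if message_id in processed_ids:
        return False  # duplicate delivery (max receives > 1); skip it
    handler()
    processed_ids.add(message_id)
    return True

ran = []
handle_once("msg-1", lambda: ran.append("sent"))  # runs the handler
handle_once("msg-1", lambda: ran.append("sent"))  # skipped: duplicate
print(ran)  # ['sent']
```

As noted above, a check like this doesn't close the race where two workers pick up the same message at the exact same time; only marking the message processed atomically (e.g. a conditional write) would.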
```python
messages = self._get_connection().receive_message(
    QueueUrl=self.queue_url,
    MaxNumberOfMessages=self.batchsize,
    WaitTimeSeconds=LONG_POLLING_INTERVAL,
)
```
Do we want to keep the LONG_POLLING_INTERVAL? That means it could take 20 seconds after writing an item to SQS to pick it up. It would be nice if this were configurable, or if the wait time were 0 when batchsize = 1. It's more API calls, but I don't think that matters much.
This is already an issue (#60), and yes, maybe we can make it configurable, but this would generally apply to SimpleProcessWorker or ReadWorker, so maybe we can deal with it in a separate PR.
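A sketch of what the configurable wait could look like (the function and parameter names are hypothetical, not from this PR): default to no waiting when batchsize is 1, otherwise fall back to the current 20-second interval, with an explicit override winning in either case.

```python
LONG_POLLING_INTERVAL = 20  # seconds; the current hard-coded value

def wait_time_seconds(batchsize, long_polling_interval=None):
    """Pick the WaitTimeSeconds value to pass to receive_message."""
    if long_polling_interval is not None:
        return long_polling_interval  # explicit user override
    # With a batch of one there's nothing to amortize, so poll eagerly
    # at the cost of more API calls.
    return 0 if batchsize == 1 else LONG_POLLING_INTERVAL

print(wait_time_seconds(1))      # 0
print(wait_time_seconds(10))     # 20
print(wait_time_seconds(1, 5))   # 5
```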
```python
if self.messages_processed \
        >= self._messages_to_process_before_shutdown:
    self.shutdown()
```
You asked about whether or not we want to keep `messages_processed`. What are the shutdown implications if we don't? I'm not worried about the memory from keeping a counter.
In hindsight, it's probably safer to keep, so unless we all strongly disagree, we will most likely just keep it, but I wanted to surface it.
@nadlerjessie regarding your point about Decision 3 and the comment below: I was thinking of this for situations where we have a moderate visibility timeout, let's say 20 mins, and max receives > 1. If something fails quickly but has a chance of succeeding if retried (say Databricks was busy, but 10 mins later it could be fine), do we want to wait the 20 mins to retry that message, or make it immediately visible? I think this is an edge case, and interesting to think about, but maybe the safest thing we can do is nothing. I may be underestimating the complexity of making it immediately visible, and of taking that ownership away from the engineer and their understanding of their application and appropriate visibility timeouts. So I am not opposed to doing nothing for this.
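For what it's worth, if we ever did want the "immediately visible on failure" behavior, SQS supports it via ChangeMessageVisibility with a timeout of 0. A hypothetical helper (not part of this PR; the function name is mine) might look like:

```python
# Hypothetical helper for Decision 3: on failure, zero out the
# message's visibility timeout so SQS can redeliver it right away
# instead of waiting out a long queue-level timeout.
def release_failed_message(sqs_client, queue_url, receipt_handle):
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=0,  # message becomes visible again immediately
    )
```

The downside is exactly the ownership question above: the worker would be overriding the visibility timeout the engineer configured on the queue.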
As an update, here is where we landed on the decisions: Decisions
@jimjshields @nadlerjessie @spulec I finished the refactoring work and added tests to this PR. This is ready for a full review. I will update the
About
- Adds `SimpleProcessWorker` and `SimpleManagerWorker`
- Combines `ReadWorker` and `ProcessWorker` into the `SimpleProcessWorker`
- Adds a `simple-worker` flag to use the new `SimpleManagerWorker`
- Supports `concurrency`, `batchsize`, and `interval`; removes `prefetch-multiplier` support when using `SimpleManagerWorker`
- `SimpleProcessWorker` does not lose any throughput when compared to `ProcessWorker`
To Do
- Refactor `ReadWorker`, `ProcessWorker` and `SimpleProcessWorker` by subclassing from a common parent
- Refactor `SimpleManagerWorker` and `ManagerWorker` by subclassing from a common parent

Decisions
- Decision 1: For `SimpleProcessWorker`, do we want to keep track of the number of `messages_processed` due to memory implications? Is this still relevant?
- Decision 1a: If we keep this, do we count a message grabbed and discarded due to visibility timeout as going against `messages_processed`?
- Decision 2: Should `read_message` in `SimpleProcessWorker` break, or not do the visibility timeout check here at all, since we are doing it in `process_message`?
- Decision 3: Should we immediately make messages visible that have failed but have a long visibility timeout?
- Decision 4: Do we support `batchsize` in `SimpleProcessWorker`?
- Decision 5: Support multiple SQS queues for `SimpleManagerWorker`?
- Decision 6: Support `concurrency` for `SimpleManagerWorker`?