
Feature/simple process worker #76

Merged: 26 commits into spulec:master from psarma89:feature/simple-process-worker on May 14, 2021

Conversation

@psarma89 (Contributor) commented Apr 7, 2021

About

  • This PR creates a SimpleProcessWorker and SimpleManagerWorker
  • Removes the internal queue
  • Combines the functionality of the ReadWorker and ProcessWorker into the SimpleProcessWorker (a rough sketch follows this list)
  • Adds a simple-worker flag to use the new SimpleManagerWorker
  • Keeps support for multiple queues, concurrency, batchsize, and interval; removes prefetch-multiplier support when using SimpleManagerWorker
  • Conducted throughput testing to make sure that SimpleProcessWorker does not lose any throughput when compared to ProcessWorker
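
For reviewers skimming, here is a rough sketch of the structural change only. It is not the actual implementation: the class shape, method names, and boto3 calls are assumptions based on the snippets quoted later in this thread.

```python
import boto3

LONG_POLLING_INTERVAL = 20  # seconds; the review below notes the current interval is 20s


class SimpleProcessWorker:
    """Sketch: read from SQS and process in the same step, with no internal
    queue handing messages from a ReadWorker to a ProcessWorker."""

    def __init__(self, queue_url, batchsize=1):
        self.queue_url = queue_url
        self.batchsize = batchsize
        self.connection = boto3.client("sqs")

    def run(self):
        while True:
            response = self.connection.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=self.batchsize,
                WaitTimeSeconds=LONG_POLLING_INTERVAL,
            )
            for message in response.get("Messages", []):
                # Process immediately after reading; nothing sits in a local
                # queue while its visibility timeout ticks down.
                self.process_message(message)
                self.connection.delete_message(
                    QueueUrl=self.queue_url,
                    ReceiptHandle=message["ReceiptHandle"],
                )

    def process_message(self, message):
        ...  # decode the task payload and invoke the registered task
```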

To Do

  • See if it makes sense to make things more DRY between ReadWorker, ProcessWorker and SimpleProcessWorker by subclassing from a common parent
  • See if it makes sense to make things more DRY between SimpleManagerWorker and ManagerWorker by subclassing from a common parent
  • Add tests for new classes
  • Update README

Decisions

  • Decision 1: For SimpleProcessWorker, do we want to keep track of the number of messages_processed for its memory implications, and is this still relevant?
    • Decision 1a: If we keep this, does a message that is grabbed and then discarded due to the visibility timeout count against messages_processed?
  • Decision 2: Should read_message in SimpleProcessWorker break, or skip the visibility timeout check entirely since we are doing it in process_message?
  • Decision 3: Should we immediately make messages visible that have failed, but have a long visibility timeout?
  • Decision 4: Do we support batchsize in SimpleProcessWorker?
  • Decision 5: Support multiple sqs queues for SimpleManagerWorker?
  • Decision 6: Support concurrency for SimpleManagerWorker?

@psarma89 (Contributor Author) commented Apr 7, 2021

@spulec @jimjshields @nadlerjessie Curious about your thoughts on the decision points we need to settle to finish SimpleProcessWorker, and on a general review of this PR.

@coveralls commented Apr 7, 2021

Coverage decreased (-0.1%) to 97.531% when pulling bcfdcc1 on psarma89:feature/simple-process-worker into d0c8e3d on spulec:master.

@nadlerjessie left a comment

I'm excited about this @psarma89!! Overall this looks good, and I agree there's an opportunity to refactor to avoid repeated code. My biggest sticking point is the idea of discarding messages because they exceed the visibility timeout; I think it will cause more trouble than it's worth. Removing batch size should remove the potential gotcha. I don't have a huge need for batch size, but I don't know about others.

I left comments inline which address some of your questions, but to be explicit, my answers follow each question:

  • Decision 1: For SimpleProcessWorker, do we want to keep track of the number of messages_processed for its memory implications, and is this still relevant? I don't see the harm in keeping it.
  • Decision 1a: If we keep this, does a message that is grabbed and then discarded due to the visibility timeout count against messages_processed? I don't think we should discard messages due to the visibility timeout; more comments in the PR.
  • Decision 2: Should read_message in SimpleProcessWorker break, or skip the visibility timeout check entirely since we are doing it in process_message? Same as above.
  • Decision 3: Should we immediately make messages visible that have failed, but have a long visibility timeout? I don't understand the question. Can you give an example?
  • Decision 4: Do we support batchsize in SimpleProcessWorker? I would rather have concurrency than batchsize. That might address some of my visibility timeout concerns too.
  • Decision 5: Support multiple sqs queues for SimpleManagerWorker? I don't think it's necessary.
  • Decision 6: Support concurrency for SimpleManagerWorker? I think it's nice to keep it.

pyqs/worker.py Outdated
Comment on lines 318 to 327
if int(end - start) >= self.visibility_timeout:
    # Don't add any more messages since they have re-appeared in the
    # sqs queue. Instead just reset and get fresh messages from the
    # sqs queue
    msg = (
        "Clearing Local messages since we exceeded "
        "their visibility_timeout"
    )
    logger.warning(msg)
    break


I know you asked about wanting both checks but I'm wondering - do we want either of them? My understanding about the initial motivation was that we wanted to avoid processing messages multiple times. The internal queue made it more likely that the message was pulled and the visibility timeout expired before we processed it. @spulec could speak to that more.

It's still possible to execute a message twice. Let's say we have a batchsize of 2, a vis timeout of 60 seconds, and the first message takes 65 seconds. Message 2 could exceed the vis timeout before we ever run it.

However, I don't know if the risk of running it multiple times is worse than the risk of pulling a message and never executing it. If we have a max receives of 1, that means message number 2 ends up in the DLQ without ever being run. IMO this is the largest pain point with pyqs today.

If we want to keep it, at least now there's a way around it that wasn't available with the internal queue, but it still sets up a potential trap.

@psarma89 (Contributor Author) replied:

That is my understanding as well. It is there to avoid double processing a message. Would this not be relevant, for example, when sending email campaigns in clementine, or are there application checks to avoid it? I am OK with not doing this check, adding better documentation to explain the implications, and putting the onus on the user.

@nadlerjessie replied:

I think most apps that have max receives > 1 should have logic to address a message getting picked up again. I suppose it's possible the checks wouldn't catch it if two are picked up at the exact same time. I think an at-least-once pattern is preferable to what we have here, which can lead to messages never being processed, but I don't know if that's true for all applications.

The real issue here is batchsize. If we remove batchsize I don't think this check does anything (we read the message and immediately start processing it). Then we can safely remove these checks and the possibility of something going straight to the DLQ without the task ever executing.

messages = self._get_connection().receive_message(
    QueueUrl=self.queue_url,
    MaxNumberOfMessages=self.batchsize,
    WaitTimeSeconds=LONG_POLLING_INTERVAL,
)


Do we want to keep the LONG_POLLING_INTERVAL? That means it could take 20 seconds after writing an item to SQS to pick it up. It would be nice if this were configurable, or if the wait time were 0 when batchsize = 1. It's more API calls, but I don't think that matters much.
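
A minimal sketch of that suggestion against the snippet quoted above; the conditional and surrounding names are illustrative only and are not part of this PR:

```python
# Use a zero wait time when batchsize is 1 instead of the fixed
# LONG_POLLING_INTERVAL, at the cost of more receive_message calls.
wait_time = 0 if self.batchsize == 1 else LONG_POLLING_INTERVAL

messages = self._get_connection().receive_message(
    QueueUrl=self.queue_url,
    MaxNumberOfMessages=self.batchsize,
    WaitTimeSeconds=wait_time,
)
```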

@psarma89 (Contributor Author) replied:

This is already tracked in issue #60, and yes, maybe we can make it configurable, but this generally applies to both SimpleProcessWorker and ReadWorker, so maybe we can deal with it in a separate PR.

Comment on lines +299 to +301
if self.messages_processed \
        >= self._messages_to_process_before_shutdown:
    self.shutdown()


You asked about whether or not we want to keep messages_processed. What are the shutdown implications if we don't? I'm not worried about the memory from keeping a counter.

@psarma89 (Contributor Author) replied:

In hindsight, it's probably safer to keep, so unless we all strongly disagree we will most likely just keep it, but I wanted to surface it.

@psarma89 (Contributor Author) commented Apr 8, 2021

@nadlerjessie regarding your point about Decision 3 and the comment below: I was thinking of this for situations where we have a moderate visibility timeout, let's say 20 mins, and max receives > 1. If something fails quickly but has a chance of succeeding if retried (say Databricks was busy, but 10 mins later it could be fine), do we want to wait the full 20 mins to retry that message, or make it immediately visible? I think this is an edge case, and interesting to think about, but maybe the safest thing we can do is nothing. I may be underestimating the complexity of making it immediately visible, and of taking that ownership away from the engineer, who understands their application and sets appropriate visibility timeouts. So I'm not opposed to doing nothing here.

Decision 3: Should we immediately make messages visible that have failed, but have a long visibility timeout? I don't understand the question. Can you give an example?
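
For concreteness, "making a failed message immediately visible" would mean resetting its remaining visibility timeout through the standard SQS API. A minimal sketch, assuming boto3; the function and its arguments are illustrative, not something this PR adds:

```python
import boto3

sqs = boto3.client("sqs")


def make_visible_now(queue_url, receipt_handle):
    # Zero out the remaining visibility timeout so SQS can re-deliver the
    # failed message right away instead of after the full timeout elapses.
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=0,
    )
```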

@psarma89 (Contributor Author) commented Apr 9, 2021

As an update, here is where we landed on the decisions:

Decisions

  • Decision 1: For SimpleProcessWorker, do we want to keep track of the number of messages_processed for its memory implications, and is this still relevant?
    • We want to keep this as a best practice to prevent potential memory leaks in the worker since you can't control what might actually be run on the worker
    • Decision 1a: If we keep this, does a message that is grabbed and then discarded due to the visibility timeout count against messages_processed?
      • We decided to remove the check for visibility timeout, so this becomes irrelevant
      • Removing the visibility timeout check and allowing the worker to process a message even past its visibility timeout means we solve the problem of never processing a message when max_receives=1 and the visibility timeout was set too short and gets exceeded
      • It increases the probability that we process a message more than once, especially if batchsize > 1, but the developer can handle this by checking whether the message has already been processed (see the sketch after this list). It shifts the onus to the developer to account for this, which is better than trying to prevent it in the library
  • Decision 2: Should read_message in SimpleProcessWorker break, or skip the visibility timeout check entirely since we are doing it in process_message?
    • See above explanation of Decision 1a, but this also becomes irrelevant
  • Decision 3: Should we immediately make messages visible that have failed, but have a long visibility timeout?
    • We defer addressing this concern at the moment, since it is the responsibility of the developer to understand their application code and set appropriate visibility timeouts
  • Decision 4: Do we support batchsize in SimpleProcessWorker?
    • We will continue to support batchsize but default to batchsize=1 when using the --simple-worker flag
    • Making batchsize 1 and not allowing it to be configurable would mean relying on the latency of sqs sending us one message at a time; for fast messages, that would increase latency disproportionately and reduce throughput
  • Decision 5: Support multiple sqs queues for SimpleManagerWorker?
    • We will continue to support multiple sqs queues
  • Decision 6: Support concurrency for SimpleManagerWorker?
    • We will continue to support concurrency
  • We will not make LONG_POLLING_INTERVAL configurable in this PR
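
As referenced under Decision 1a above, here is a sketch of the kind of application-side check that makes at-least-once delivery safe. The dedupe store and helper names are placeholders, not part of pyqs:

```python
processed_ids = set()  # stand-in for a durable store such as a database table


def do_work(message):
    ...  # the actual task body


def handle(message):
    # With the visibility timeout check removed, a message may be delivered
    # more than once, so the task checks before doing any work.
    message_id = message["MessageId"]
    if message_id in processed_ids:
        return  # already handled; safe to skip
    do_work(message)
    processed_ids.add(message_id)
```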

psarma89 added 20 commits April 9, 2021 13:28
@psarma89 (Contributor Author) commented:
@jimjshields @nadlerjessie @spulec I finished the refactoring work and added tests to this PR. It is ready for a full review. I will update the README once everyone has had a chance to review. In the meantime, I will be re-running a subset of my earlier throughput testing to confirm the behavior is the same.

psarma89 force-pushed the feature/simple-process-worker branch from 2f8b820 to b8d63ee on May 13, 2021 at 17:39
psarma89 force-pushed the feature/simple-process-worker branch from 8b4c6a2 to bcfdcc1 on May 14, 2021 at 18:24
spulec merged commit 44da356 into spulec:master on May 14, 2021
psarma89 deleted the feature/simple-process-worker branch on March 14, 2024