app: generic approach to prevent addition of duplicated api jobs #975
Conversation
We also need this for the case where we have a very large number of either sources or submissions: with the current logic, any jobs remaining in the queue when the next sync kicks off get added again, resulting in unnecessary repeated redownloads of items, potentially several times (with a 15 second sync time and 1000 sources with 4 items per source conversation, this happened multiple times).
Force-pushed from 3ae6d99 to e214328.
@creviera made an important point in standup the other day (h/t to @rmol for pointing me to the relevant bit of code after standup): for download jobs, the actual downloads won't happen because of this logic, although the download jobs do get added (which is what I was seeing in my comment above). As such, I was considering whether to close or defer this PR, even though imho having duplicate jobs like this in the network queue is not great design. Before doing so, I thought I'd do a quick investigation to determine just how many unnecessary jobs get added and how long it takes for them to get cleared out after the initial sync in a realistic scenario. For 1000 sources with 2 submissions and 2 replies each, here's what happens (note this is not over Tor and this is using the 0.1.6 tag):
So the queue was blocked for message/reply downloads for ~5 minutes (in my test, 21220 unnecessary jobs were processed) while all these duplicated jobs were cleared out. The queue will be blocked for longer if there are many more messages/replies, or if each object takes longer to download (as it would over Tor), since fewer jobs are processed per sync period and more duplicate jobs get added. As such, we should still enforce the constraint of disallowing duplicate jobs, otherwise legitimate message/reply downloads can be significantly delayed (potentially by many minutes) by duplicate jobs consuming resources.

For the interested observer, if you want to repeat this test I used this diff:

```diff
diff --git a/securedrop_client/api_jobs/downloads.py b/securedrop_client/api_jobs/downloads.py
index aab51d6..930491e 100644
--- a/securedrop_client/api_jobs/downloads.py
+++ b/securedrop_client/api_jobs/downloads.py
@@ -123,9 +123,11 @@ class DownloadJob(ApiJob):
         db_object = self.get_db_object(session)
 
         if db_object.is_decrypted:
+            logger.debug('OBJECT {} IS DECRYPTED, SKIPPING'.format(db_object.uuid))
             return db_object.uuid
 
         if db_object.is_downloaded:
+            logger.debug('OBJECT {} IS DOWNLOADED, SKIPPING'.format(db_object.uuid))
             self._decrypt(db_object.location(self.data_dir), db_object, session)
             return db_object.uuid
```
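If anyone wants to tally the skipped jobs the same way, here is a rough sketch of how the debug output could be counted; the log path below is just a placeholder (not the client's actual log location), so adjust it to wherever your dev client writes its DEBUG log:

```python
# Hypothetical helper, not part of the client: count how many download jobs
# were skipped because the object was already downloaded or decrypted.
from collections import Counter

LOG_PATH = "client.log"  # placeholder: point this at your dev client's DEBUG log

counts: Counter = Counter()
with open(LOG_PATH) as log_file:
    for line in log_file:
        if "IS DECRYPTED, SKIPPING" in line:
            counts["already decrypted"] += 1
        elif "IS DOWNLOADED, SKIPPING" in line:
            counts["already downloaded"] += 1

print(counts["already decrypted"], "skipped as decrypted")
print(counts["already downloaded"], "skipped as downloaded")
print(sum(counts.values()), "unnecessary jobs in total")
```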
Force-pushed from a60bf89 to 8e65cec.
Given the potential for negative side effects of duplicate jobs on instances with large source counts, we're still treating this as a stretch goal for the 0.2.0 release during the 4/22-5/6 sprint, with @redshiftzero continuing to lead improvement of this PR.
Force-pushed from 8a3f0d5 to c4ded3b.
(This is ready for review, but since it's a larger change, we should release 0.2.0 first)
^ Per the above, priority is on release (e.g. fixes for any issues discovered in QA); otherwise we've committed to a time-boxed (4 hour) review of this PR during the 5/20-6/3 sprint, which may or may not result in it getting merged.
did a first pass. plan to test and review more later after initial discussions begin.
```python
UUID of the item (source, reply, submission, etc.) that this item
corresponds to. We track this to prevent the addition of duplicate jobs.
'''
self.uuid = uuid
```
i see, since we have an api job that performs a task on many objects, i.e. `MetadataSyncJob`, which gets all remote source data, we can't just extend `ApiJob` to have a uuid. the rest of our jobs are "single object" jobs, so they'll each have a uuid associated with them. this does get me thinking about batch jobs and checking for duplicate lists of uuids to prevent duplicate batch jobs, but we don't have to worry about this yet.
also, since this change prevents any `SingleObjectApiJob` from entering the queue if we're already performing an operation on that object, i.e. if a job with that object's uuid is already in the queue, we have to make sure we only ever derive from this class when we are sure there are no other types of operations that could happen on the same object simultaneously.
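For readers following along, here is a minimal sketch of the shape being discussed; the `ApiJob` stub and the equality check are simplifications for illustration, not the PR's exact implementation:

```python
class ApiJob:
    '''Stand-in for the client's real ApiJob base class (assumption for this sketch).'''


class SingleObjectApiJob(ApiJob):
    def __init__(self, uuid: str) -> None:
        super().__init__()
        # UUID of the item (source, reply, submission, etc.) that this job
        # operates on; tracked so that duplicate jobs can be detected.
        self.uuid = uuid

    def __eq__(self, other: object) -> bool:
        # Treat two jobs as duplicates when they are the same type and
        # target the same UUID.
        return type(other) is type(self) and getattr(other, "uuid", None) == self.uuid
```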
Could you add log lines where we make sure we don't make an api call for jobs that have already been downloaded and decrypted:

securedrop-client/securedrop_client/api_jobs/downloads.py, lines 125 to 130 in 409db11:

```python
if db_object.is_decrypted:
    return db_object.uuid

if db_object.is_downloaded:
    self._decrypt(db_object.location(self.data_dir), db_object, session)
    return db_object.uuid
```
This will be useful for debugging #1095 later.
lgtm
ensures that duplicate jobs of the same type with the same uuid attribute cannot be added to the queue. this commit also updates mypy such that we can use ignore[override]

Thread behavior on master:

Sync thread A:
* A1: Sync
* A2: Signal thread B to run B1

[…A1 runs on schedule every 15 seconds…]

GUI thread B:
* B1: Add jobs for any items that are not downloaded by calling `ApiJobQueue.enqueue`, which calls `RunnableQueue.add_job`

Queue thread C:
* C1: Wait until we have at least one job in the queue, and then grab the highest priority job in the queue (`RunnableQueue.queue.get`)
* C2: If timeouts keep happening, add the job again (`RunnableQueue._re_add_job`, which calls `RunnableQueue.queue.put_nowait`)

Explanation:

While we are accessing the queue state from multiple threads (either B or C above), that's OK because we're using Python's synchronized queue class. If we want to prevent duplicate jobs, however, we need to ensure that two threads can't add the same job, even if both threads try to add the same job at the same time. In this diff, that is done by holding a mutex when we read or mutate the queue state. Here the queue state means:

* `RunnableQueue.queue.queue` ("yo dawg...") which has a list of `(priority, job)` tuples containing the jobs that are _not_ in progress,
* the job that's in progress, which is kept on `RunnableQueue.current_job`.

We hold this mutex when we call `RunnableQueue.add_job` / `RunnableQueue._re_add_job` and when we update `RunnableQueue.current_job`.
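A minimal, self-contained sketch of that locking scheme (illustrative names and simplified logic, not the client's actual `RunnableQueue`):

```python
import itertools
import queue
import threading
from typing import Any, Optional


class DedupQueueSketch:
    '''Illustration of the mutex-guarded duplicate check described above.'''

    def __init__(self) -> None:
        self.queue: queue.PriorityQueue = queue.PriorityQueue()
        self.current_job: Optional[Any] = None  # the job being processed, if any
        # One mutex guards both the pending jobs and current_job, so two
        # threads cannot both conclude a job is "new" and add it twice.
        self.mutex = threading.Lock()
        # Tie-breaker so jobs with equal priority are never compared directly.
        self._counter = itertools.count()

    def add_job(self, priority: int, job: Any) -> None:
        with self.mutex:
            pending = [item[-1] for item in self.queue.queue]
            in_progress = [self.current_job] if self.current_job is not None else []
            if any(job == existing for existing in pending + in_progress):
                print(f"Duplicate job {job}, skipping")  # the client logs this instead
                return
            self.queue.put_nowait((priority, next(self._counter), job))

    def mark_in_progress(self, job: Any) -> None:
        # Called by the worker thread once it pulls a job off the queue, under
        # the same mutex, so add_job always sees the job that is running.
        with self.mutex:
            self.current_job = job
```

In this sketch both the GUI thread's enqueue path and the worker's re-add path would go through `add_job`, which is what makes the duplicate check race-free.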
Force-pushed from 5e60c0d to f9e85f0.
lgtm
Description
This is a generic solution for preventing the addition of duplicate jobs, which we need in order to increase the sync speed later, ref #738.
The basic idea is a `SingleObjectApiJob`, which has an attribute `uuid`: the UUID that should be unique for that job. For source-related jobs like the source deletion job it's the source UUID, for the reply download job it's the reply UUID, and so on.

Test Plan
1. `NUM_SOURCES=200 make dev` (this is basically a number of sources that is large enough such that it will take more than 15 seconds to download all messages/replies, and duplicate jobs will be attempted to be added in the next 15 second sync period)
2. Run the client with debug logging (`LOGLEVEL=DEBUG ./run.sh`).

On `master`:

3. Monitor the logs: you should see `Added MessageDownloadJob('31ba1bc9-6687-4111-9826-776be899f45a', 5) to queue`. During the next sync period, a bunch more `Added MessageDownloadJob` lines will occur for UUIDs that have already been added.

On this branch:

3. Monitor the logs: you should not see duplicate jobs get added. You'll see `Added MessageDownloadJob('31ba1bc9-6687-4111-9826-776be899f45a', 5) to queue` during the first sync, and then the next time that UUID appears, it should either be the download processing OR `Duplicate job MessageDownloadJob('31ba1bc9-6687-4111-9826-776be899f45a', 5), skipping`.

Checklist
If these changes modify code paths involving cryptography, the opening of files in VMs or network (via the RPC service) traffic, Qubes testing in the staging environment is required. For fine tuning of the graphical user interface, testing in any environment in Qubes is required. Please check as applicable:
If these changes add or remove files other than client code, packaging logic (e.g., the AppArmor profile) may need to be updated. Please check as applicable: