I ran into an issue where a dramatiq worker takes a DataFrame to do some processing. When more than one message is in the queue, the broker raises an exception because MessageProxy's __eq__ compares two messages containing the DataFrame argument using ==, which pandas doesn't like.
Reproducible example:
import pandas as pd
import dramatiq

dramatiq.set_encoder(dramatiq.PickleEncoder())


@dramatiq.actor
def buggy(df: pd.DataFrame):
    pass


if __name__ == "__main__":
    for _ in range(10):  # the bug does not appear if only one message is sent
        buggy.send(pd.DataFrame([[1, 2, 3], [4, 5, 6]], [1, 2]))
Starting the workers with dramatiq -t 1 -p 2 bug (to avoid out-of-memory issues) and the main program with python bug.py results in the following exception:
[2024-06-22 23:50:51,477] [PID 132100] [Thread-2] [dramatiq.worker.ConsumerThread(default)] [CRITICAL] Consumer encountered an unexpected error.
Traceback (most recent call last):
File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 267, in run
self.handle_message(message)
File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 328, in handle_message
self.work_queue.put((actor.priority, message))
File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 150, in put
self._put(item)
File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 236, in _put
heappush(self.queue, item)
File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\broker.py", line 394, in __eq__
return self._message == other._message
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 4, in __eq__
File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\pandas\core\generic.py", line 1527, in __nonzero__
raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
[2024-06-22 23:50:51,480] [PID 132100] [Thread-2] [dramatiq.worker.ConsumerThread(default)] [INFO] Restarting consumer in 3.00 seconds.
Immediately followed by
Exception in thread Thread-4:
Traceback (most recent call last):
File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
self.run()
File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 462, in run
self.process_message(message)
File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 525, in process_message
self.work_queue.task_done()
File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 75, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
What did you expect would happen?
I expected dramatiq not to raise an exception.
Not really solving your issue, but here is something we did that might work as a workaround: we don't pass DataFrames directly to Dramatiq (partially because my use case has massive DataFrames). Instead, we persist what we need to the DB, pass only the id of a row, and fetch it within the worker.
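A rough sketch of that workaround, using an in-memory SQLite table as a stand-in for the DB. The table name `payloads` and the helpers `make_store`, `save_payload`, `load_payload`, and `process` are hypothetical names made up for illustration. In a real setup `process` would be the body of a `@dramatiq.actor` and the sender would call `process.send(payload_id)`, so the message only ever carries an integer.

```python
import json
import sqlite3


def make_store():
    """Create the (hypothetical) payload table in an in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE payloads (id INTEGER PRIMARY KEY, body TEXT NOT NULL)")
    return conn


def save_payload(conn, rows):
    """Persist the data and return the row id that will be sent in the message."""
    cur = conn.execute("INSERT INTO payloads (body) VALUES (?)", (json.dumps(rows),))
    conn.commit()
    return cur.lastrowid


def load_payload(conn, payload_id):
    """Fetch the data inside the worker, given only its id."""
    row = conn.execute(
        "SELECT body FROM payloads WHERE id = ?", (payload_id,)
    ).fetchone()
    if row is None:
        raise KeyError(payload_id)
    return json.loads(row[0])


def process(conn, payload_id):
    """Stand-in for the actor body: load the rows by id, then do the work."""
    rows = load_payload(conn, payload_id)
    return sum(sum(r) for r in rows)
```

Because the message argument is a plain integer, comparing two queued messages for equality cannot reach pandas at all.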
In this example, I’m using PickleEncoder. IMHO, the problem lies within the broker that doesn’t handle exceptions when comparing two messages for equality.
Even when doing something stupid like in this example, it should either handle it gracefully or fail right away. As currently implemented, it seems to work at first, until more than one message is sent.
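To illustrate why it only fails once a second message is queued, here is a minimal sketch of the comparison chain without pandas or dramatiq. `AmbiguousTruth`, `FakeMessage`, `FakeProxy`, and `push_messages` are hypothetical stand-ins, not dramatiq classes; only the body of `FakeProxy.__eq__` mirrors the line shown in the traceback. The heap holds `(priority, message)` tuples, and when two priorities are equal, tuple comparison falls through to `==` on the messages.

```python
import heapq
from dataclasses import dataclass


class AmbiguousTruth:
    """Stand-in for a DataFrame: == returns an object that refuses bool()."""

    def __eq__(self, other):
        return self  # like an element-wise comparison result, not a bool

    def __bool__(self):
        raise ValueError("The truth value is ambiguous.")


@dataclass
class FakeMessage:
    args: tuple  # the generated __eq__ compares this field with ==


class FakeProxy:
    def __init__(self, message):
        self._message = message

    def __eq__(self, other):
        return self._message == other._message  # same shape as the traceback line


def push_messages(count):
    """Push `count` equal-priority items onto a heap and return its size."""
    queue = []
    for _ in range(count):
        item = (0, FakeProxy(FakeMessage(args=(AmbiguousTruth(),))))
        heapq.heappush(queue, item)
    return len(queue)
```

With these stand-ins, `push_messages(1)` succeeds because pushing onto an empty heap compares nothing, while `push_messages(2)` raises `ValueError` as soon as the second item is compared against the first.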
What OS are you using?
Windows 10
What version of Dramatiq are you using?
Version: 1.17.0