dask.delayed: DataFrame partition with map_overlap gets randomly converted to pandas DataFrame or pandas Series #11200
Description
Describe the issue:
In recent versions of Dask, a DataFrame partition passed as an argument to a dask.delayed function is randomly converted into either a pandas DataFrame (expected) or a pandas Series (not expected). The failure is non-deterministic: it occurs at a different stage of the loop over partitions on every run.
This seems to happen only under specific circumstances, one case being after applying two iterations of map_overlap (see below). I have, however, also seen it in situations without map_overlap. Remarkably, it also depends on the chunking of the iteration: it happens for n_cores=3, but apparently not for n_cores=2.
Minimal Complete Verifiable Example:
import numpy as np
import pandas as pd
import dask.dataframe as ddf
import dask
sample = np.array([np.arange(100000), 2 * np.arange(100000), 3 * np.arange(100000)]).T
columns = ["x", "y", "z"]
sample_pdf = pd.DataFrame(sample, columns=columns)
df = ddf.from_pandas(sample_pdf, npartitions=100)
n_cores = 3

def test(part, columns):
    col_id = [part.columns.get_loc(axis) for axis in columns]
    return col_id

def forward_fill_partition(df):
    df = df.ffill()
    return df

for _ in range(2):
    df = df.map_overlap(
        forward_fill_partition,
        before=2,
        after=0,
    )

for i in range(0, df.npartitions, n_cores):
    print(i)
    core_tasks = []  # Core-level jobs
    for j in range(0, n_cores):
        partition_index = i + j
        if partition_index >= df.npartitions:
            break
        df_partition = df.get_partition(partition_index)
        core_tasks.append(
            dask.delayed(test)(
                df_partition,
                columns,
            ),
        )
    if len(core_tasks) > 0:
        core_results = dask.compute(*core_tasks)
Example output:
0
3
6
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/tmp/ipykernel_19014/1120124549.py in ?()
44 col_id = [part.columns.get_loc(axis) for axis in columns]
45 return col_id
46
47 def forward_fill_partition(df):
---> 48 df = df.ffill()
49 return df
50
51 for _ in range(2):
/mnt/pcshare/users/Laurenz/AreaB/sed/poetry_envs/virtualenvs/sed-processor-3qnpZCFI-py3.9/lib/python3.9/site-packages/dask/base.py in ?(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 keys.append(x.__dask_keys__())
659 postcomputes.append(x.__dask_postcompute__())
660
661 with shorten_traceback():
--> 662 results = schedule(dsk, keys, **kwargs)
663
664 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
/tmp/ipykernel_19014/1120124549.py in ?(part, columns)
14 def test(part, columns):
---> 15 col_id = [part.columns.get_loc(axis) for axis in columns]
16 return col_id
/tmp/ipykernel_19014/1120124549.py in ?(.0)
---> 15 def test(part, columns):
16 col_id = [part.columns.get_loc(axis) for axis in columns]
17 return col_id
/mnt/pcshare/users/Laurenz/AreaB/sed/poetry_envs/virtualenvs/sed-processor-3qnpZCFI-py3.9/lib/python3.9/site-packages/pandas/core/generic.py in ?(self, name)
6295 and name not in self._accessors
6296 and self._info_axis._can_hold_identifiers_and_holds_name(name)
6297 ):
6298 return self[name]
-> 6299 return object.__getattribute__(self, name)
AttributeError: 'Series' object has no attribute 'columns'
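For reference, a defensive variant of the test function (my own diagnostic sketch, not part of the original code) turns the sporadic Series conversion into an immediate, explicit failure, which makes the non-determinism easier to catch:

def test(part, columns):
    # Diagnostic guard (my addition): the partition should always arrive as a
    # pandas DataFrame, so fail loudly if it was converted to a Series.
    if not isinstance(part, pd.DataFrame):
        raise TypeError(f"expected a DataFrame, got {type(part).__name__}")
    return [part.columns.get_loc(axis) for axis in columns]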
Anything else we need to know?:
This behavior is only seen in recent versions of Dask (>= 2024.3); in previous versions the same code worked flawlessly. I suspect it is related to the recently introduced query planning.
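To test that suspicion, query planning can be disabled before dask.dataframe is first imported (a diagnostic sketch based on my assumption above, not a confirmed fix):

import dask
# Must be set before the first import of dask.dataframe; reverts to the
# legacy (non-query-planning) DataFrame implementation.
dask.config.set({"dataframe.query-planning": False})
import dask.dataframe as ddf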
Environment:
- Dask version: 2024.6.2
- Python version: 3.9.19
- Operating System: Ubuntu Linux
- Install method (conda, pip, source): Python installed via anaconda, environment with virtualenv and pip
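A possible workaround on my side (a sketch, not verified against all affected versions): fetch the partitions via to_delayed() instead of passing the result of get_partition() into dask.delayed, so that test receives the materialized pandas DataFrame for each partition:

delayed_parts = df.to_delayed()  # one delayed pandas DataFrame per partition
for i in range(0, df.npartitions, n_cores):
    core_tasks = [
        dask.delayed(test)(delayed_parts[i + j], columns)
        for j in range(n_cores)
        if i + j < df.npartitions
    ]
    if core_tasks:
        core_results = dask.compute(*core_tasks)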