
dask.delayed: Dataframe partition with map_overlap gets randomly converted to Pandas Dataframe or Pandas Series. #11200

Open
@rettigl

Description

Describe the issue:
In recent versions of Dask, a DataFrame partition passed as an argument to a dask.delayed function is randomly converted either into a Pandas DataFrame (expected) or into a Pandas Series (not expected). The behavior is not deterministically reproducible, i.e. the failure occurs at random iterations of a loop over partitions.
It seems to happen only under specific circumstances, one case being after applying two iterations of map_overlap (see below). I have, however, also seen it in situations without map_overlap. Remarkably, it also depends on the chunking of the iteration (it happens for n_cores=3, but apparently not for n_cores=2).

Minimal Complete Verifiable Example:

import numpy as np
import pandas as pd
import dask.dataframe as ddf
import dask
sample = np.array([np.arange(100000), 2 * np.arange(100000), 3 * np.arange(100000)]).T
columns = ["x", "y", "z"]
sample_pdf = pd.DataFrame(sample, columns=columns)
df = ddf.from_pandas(sample_pdf, npartitions=100)

n_cores = 3

def test(part, columns):
    col_id = [part.columns.get_loc(axis) for axis in columns]
    return col_id

def forward_fill_partition(df):
    df = df.ffill()
    return df

for _ in range(2):
    df = df.map_overlap(
        forward_fill_partition,
        before=2,
        after=0,
    )

for i in range(0, df.npartitions, n_cores):
    print(i)
    core_tasks = []  # Core-level jobs
    for j in range(0, n_cores):
        partition_index = i + j
        if partition_index >= df.npartitions:
            break

        df_partition = df.get_partition(partition_index)

        core_tasks.append(
            dask.delayed(test)(
                df_partition,
                columns,
            ),
        )

    if len(core_tasks) > 0:
        core_results = dask.compute(*core_tasks)

Example output:

0
3
6
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_19014/1120124549.py in ?()
     44     col_id = [part.columns.get_loc(axis) for axis in columns]
     45     return col_id
     46 
     47 def forward_fill_partition(df):
---> 48     df = df.ffill()
     49     return df
     50 
     51 for _ in range(2):

/mnt/pcshare/users/Laurenz/AreaB/sed/poetry_envs/virtualenvs/sed-processor-3qnpZCFI-py3.9/lib/python3.9/site-packages/dask/base.py in ?(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658         keys.append(x.__dask_keys__())
    659         postcomputes.append(x.__dask_postcompute__())
    660 
    661     with shorten_traceback():
--> 662         results = schedule(dsk, keys, **kwargs)
    663 
    664     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

/tmp/ipykernel_19014/1120124549.py in ?(part, columns)
     14 def test(part, columns):
---> 15     col_id = [part.columns.get_loc(axis) for axis in columns]
     16     return col_id

/tmp/ipykernel_19014/1120124549.py in ?(.0)
---> 15 def test(part, columns):
     16     col_id = [part.columns.get_loc(axis) for axis in columns]
     17     return col_id

/mnt/pcshare/users/Laurenz/AreaB/sed/poetry_envs/virtualenvs/sed-processor-3qnpZCFI-py3.9/lib/python3.9/site-packages/pandas/core/generic.py in ?(self, name)
   6295             and name not in self._accessors
   6296             and self._info_axis._can_hold_identifiers_and_holds_name(name)
   6297         ):
   6298             return self[name]
-> 6299         return object.__getattribute__(self, name)

AttributeError: 'Series' object has no attribute 'columns'

Anything else we need to know?:
This behavior is only seen in recent versions of dask (>= 2024.3); in previous versions this code worked flawlessly. I suspect it is related to the recently introduced query planning.
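If query planning is indeed the culprit, one way to check (an assumption on my part, not a verified fix) would be to disable the new query-planning backend via the dask configuration before dask.dataframe is first imported:

```python
import dask

# Must be set before the first import of dask.dataframe, because the
# query-planning backend (dask-expr) is selected at import time.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as ddf  # now uses the legacy implementation
```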

Environment:

  • Dask version: 2024.6.2
  • Python version: 3.9.19
  • Operating System: Ubuntu Linux
  • Install method (conda, pip, source): Python installed via anaconda, environment with virtualenv and pip
