Supporting inconsistent schemas in read_json #11595

Open
@praateekmahajan

Description

If you have two JSONL files, where one contains columns {"id", "text"} and the other contains {"text", "id", "meta1"}, and you try to read both with dd.read_json(["file1.jsonl", "file2.jsonl"], lines=True), you run into an error:

Metadata mismatch found in `from_delayed`.

Partition type: `pandas.core.frame.DataFrame` 
(or it is Partition type: `cudf.core.dataframe.DataFrame` when backend=='cudf')
+---------+-------+----------+
| Column  | Found | Expected |
+---------+-------+----------+
| 'meta1' | -     | object   |
+---------+-------+----------+ 

For what it's worth, this isn't an issue in read_parquet (CPU), and for GPU the fix is in the works: https://github.com/rapidsai/cudf/pull/17554/files

Guessing the root cause

IIUC, in both the pandas and cudf cases we call read_json_file (here).

In the pandas case, even if dtype is specified, pandas doesn't prune out the non-specified columns, so the pandas case continues to fail. cudf does prune them (assuming prune_columns=True), but it then fails on a column-order mismatch against the metadata (since one file has id, text, while the other has text, id).
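To illustrate the pandas half of this, a minimal sketch (plain pandas, no dask): even when dtype names only a subset of columns, pandas.read_json keeps every column present in the data.

```python
import io

import pandas as pd

# pandas keeps every column found in the data, even when `dtype` only
# names a subset -- unlisted columns are not pruned
line = '{"text": "bar", "id": 456, "meta1": [{"field1": "cat"}]}\n'
df = pd.read_json(io.StringIO(line), lines=True, dtype={"id": "str", "text": "str"})
print(df.columns.tolist())  # meta1 survives despite not being in dtype
```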

One possible hack could be supporting a columns arg and then performing engine(.....)[columns]. Another could be
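The columns idea can be sketched in plain pandas (read_jsonl_with_columns is a hypothetical helper, not an existing dask API): select/reindex each partition down to one fixed column list so every partition agrees with the metadata.

```python
import io

import pandas as pd

def read_jsonl_with_columns(buf, columns):
    """Hypothetical per-partition reader enforcing one column set and order."""
    df = pd.read_json(buf, lines=True)
    # reindex adds any missing columns (as NaN) and fixes the order,
    # roughly what an engine(...)[columns] step would need to guarantee
    return df.reindex(columns=columns)

part1 = read_jsonl_with_columns(io.StringIO('{"id": 123, "text": "foo"}\n'), ["text", "id"])
part2 = read_jsonl_with_columns(
    io.StringIO('{"text": "bar", "meta1": [{"field1": "cat"}], "id": 456}\n'),
    ["text", "id"],
)
print(part1.columns.equals(part2.columns))  # True
```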

MRE

import dask.dataframe as dd
import dask
import tempfile
import pandas as pd
import os

records = [
    {"id": 123, "text": "foo"},
    {
        "text": "bar",
        "meta1": [{"field1": "cat"}],
        "id": 456,
    },
]
columns = ["text", "id"]
with tempfile.TemporaryDirectory() as tmpdir:
    file1 = os.path.join(tmpdir, "part.0.jsonl")
    file2 = os.path.join(tmpdir, "part.1.jsonl")

    pd.DataFrame(records[:1]).to_json(file1, orient="records", lines=True)
    pd.DataFrame(records[1:]).to_json(file2, orient="records", lines=True)

    for backend in ["pandas", "cudf"]:
        read_kwargs = dict()
        if backend == "cudf":
            read_kwargs["dtype"] = {"id": "str", "text": "str"}
            read_kwargs["prune_columns"] = True
        print("="*30)
        print(f"==== {backend=} ====")
        print("="*30)
        try:
            with dask.config.set({"dataframe.backend": backend}):
                df = dd.read_json(
                    [file1, file2],
                    lines=True,
                    **read_kwargs,
                )
                print(f"{df.columns=}")
                result = df.compute()
                print(f"{result.columns=}")
                print(f"{type(result)=}")
                print(result)

        except Exception as e:
            print(f"{backend=} failed due to {e} \n")

cc @rjzamora
