Description
If you have two JSON Lines (jsonl) files where one contains the columns {"id", "text"}
and the other contains {"text", "id", "meta1"},
and you read both files using dd.read_json([file1.jsonl, file2.jsonl], lines=True),
you run into the following error:
Metadata mismatch found in `from_delayed`.
Partition type: `pandas.core.frame.DataFrame`
(or it is Partition type: `cudf.core.dataframe.DataFrame` when backend=='cudf')
+---------+-------+----------+
| Column | Found | Expected |
+---------+-------+----------+
| 'meta1' | - | object |
+---------+-------+----------+
For what it's worth, this isn't an issue with read_parquet on CPU, and for GPU a fix is in the works: https://github.com/rapidsai/cudf/pull/17554/files
Guessing the root cause
IIUC, in both the pandas and cudf cases we call read_json_file (here).
In the pandas case, even if dtype is specified, pandas doesn't prune out the non-specified columns, while cudf does (assuming prune_columns=True). Therefore the pandas case continues to fail, while the cudf case fails on a mismatch between the partition's column order and the metadata's column order (since one file has id, text while the other has text, id).
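The pandas half of this guess can be checked without Dask. The snippet below is a minimal sketch that feeds the same two records used in the MRE to pd.read_json through in-memory buffers (the names file1_text and file2_text are only for illustration) and passes a dtype mapping that mentions just id and text. It only exercises plain pandas, not the Dask read_json_file code path.

```python
from io import StringIO

import pandas as pd

# Same two records as in the MRE, one JSON line per "file".
file1_text = '{"id": 123, "text": "foo"}\n'
file2_text = '{"text": "bar", "meta1": [{"field1": "cat"}], "id": 456}\n'

# dtype mentions only the two columns we care about.
dtype = {"id": "str", "text": "str"}

df1 = pd.read_json(StringIO(file1_text), lines=True, dtype=dtype)
df2 = pd.read_json(StringIO(file2_text), lines=True, dtype=dtype)

# dtype only controls type conversion; columns missing from the mapping
# (here "meta1") are still returned by pandas.
print(list(df1.columns))
print(list(df2.columns))
```

The second frame still carries meta1, so the two partitions end up with different schemas even though dtype was given.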
One possible hack could be supporting a columns arg and then performing engine(.....)[columns]. Another could be
MRE
import dask.dataframe as dd
import dask
import tempfile
import pandas as pd
import os

# Two records with different key order; only the second has "meta1".
records = [
    {"id": 123, "text": "foo"},
    {
        "text": "bar",
        "meta1": [{"field1": "cat"}],
        "id": 456,
    },
]
columns = ["text", "id"]

with tempfile.TemporaryDirectory() as tmpdir:
    file1 = os.path.join(tmpdir, "part.0.jsonl")
    file2 = os.path.join(tmpdir, "part.1.jsonl")

    # Write one record per file so the two files have different schemas.
    pd.DataFrame(records[:1]).to_json(file1, orient="records", lines=True)
    pd.DataFrame(records[1:]).to_json(file2, orient="records", lines=True)

    for backend in ["pandas", "cudf"]:
        read_kwargs = dict()
        if backend == "cudf":
            read_kwargs["dtype"] = {"id": "str", "text": "str"}
            read_kwargs["prune_columns"] = True

        print("=" * 30)
        print(f"==== {backend=} ====")
        print("=" * 30)
        try:
            with dask.config.set({"dataframe.backend": backend}):
                df = dd.read_json(
                    [file1, file2],
                    lines=True,
                    **read_kwargs,
                )
                print(f"{df.columns=}")
                print(f"{df.compute().columns=}")
                print(f"{type(df.compute())=}")
                # print instead of display so this also runs outside a notebook
                print(df.compute())
        except Exception as e:
            print(f"{backend=} failed due to {e} \n")
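For reference, the columns hack suggested above can be sketched at the pandas level. This is only an illustration of the engine(...)[columns] idea, not how dd.read_json is implemented: read_jsonl_columns is a hypothetical helper, pd.read_json stands in for the engine, and the two records are read from in-memory buffers rather than files.

```python
from io import StringIO

import pandas as pd

# Same two records as in the MRE, one JSON line per "file".
file1_text = '{"id": 123, "text": "foo"}\n'
file2_text = '{"text": "bar", "meta1": [{"field1": "cat"}], "id": 456}\n'


def read_jsonl_columns(source, columns, **kwargs):
    """Hypothetical helper: call the engine, then keep only `columns`, in that order."""
    return pd.read_json(source, lines=True, **kwargs)[columns]


columns = ["text", "id"]

# Each "partition" is projected to the same columns in the same order,
# so the extra "meta1" column and the differing key order no longer matter.
parts = [
    read_jsonl_columns(StringIO(text), columns)
    for text in (file1_text, file2_text)
]
combined = pd.concat(parts, ignore_index=True)

print([list(part.columns) for part in parts])
print(combined)
```

With every partition projected to the same ordered column list, the per-partition schema would match a metadata frame built from columns, which is the property the metadata check in the error above is enforcing.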
cc @rjzamora