[Data] Incorrect StageSummaryStats execution time calculated #37105

Closed
@scottjlee

Description

What happened + What you expected to happen

The stats for a Dataset generated from Read->SplitBlocks(n)->MapBatches contain an incorrectly duplicated execution time summary; see the example below.

Initial hypothesis: this is caused by inheriting incorrect stats information during operator fusion or stats generation.

Versions / Dependencies

ray master

Reproduction script

import ray
import tensorflow as tf
import numpy as np
import random
import pyarrow as pa
import tempfile
import os
import time

def generate_random_tfrecords(
    num_rows: int,
    num_int: int,
) -> str:
    def generate_features(batch):
        batch_size = len(batch["id"])
        features = {"int_features": []}
        lower_bound = -(2**32)
        upper_bound = 2**32
        for _ in range(batch_size):
            if num_int > 0:
                int_features = [
                    random.randint(lower_bound, upper_bound) for _ in range(num_int)
                ]
                features["int_features"].append(int_features)
        features = {k: v for (k, v) in features.items() if len(v) > 0}
        return pa.table(features)

    ds = ray.data.range(num_rows).map_batches(generate_features)
    assert ds.count() == num_rows, ds.count()

    tfrecords_dir = tempfile.mkdtemp()
    ds.write_tfrecords(tfrecords_dir)
    return tfrecords_dir


file_path = generate_random_tfrecords(
    num_rows=10000,
    num_int=1000,
)
print("===> created data file at", file_path)
records = ray.data.read_tfrecords(paths=file_path)

def fn(batch):
    time.sleep(5)
    return batch

processed = records.map_batches(fn)
num_batches = 0
for batch in processed.iter_batches():
    num_batches += 1
print(f"===> Iterated over {num_batches} batches")
stats = processed.stats()
print(stats)

The block execution summary string (first line) is duplicated incorrectly for the two stages:

Stage 1 ReadTFRecord->SplitBlocks(4): 80/80 blocks executed in 1.92s
* Remote wall time: 160.21us min, 1.88s max, 413.09ms mean, 33.05s total
* Remote cpu time: 160.0us min, 1.18s max, 285.51ms mean, 22.84s total
* Peak heap memory usage (MiB): 467359.38 min, 485484.38 max, 472517 mean
* Output num rows: 125 min, 125 max, 125 mean, 10000 total
* Output size bytes: 1000500 min, 1000500 max, 1000500 mean, 80040000 total
* Tasks per node: 80 min, 80 max, 80 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 40020000, 'obj_store_mem_freed': 37572238, 'obj_store_mem_peak': 38790839, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

Stage 2 MapBatches(fn): 80/80 blocks executed in 1.92s
* Remote wall time: 798.54us min, 4.46ms max, 1.23ms mean, 98.51ms total
* Remote cpu time: 798.0us min, 2.37ms max, 1.14ms mean, 90.86ms total
* Peak heap memory usage (MiB): 467359.38 min, 487828.12 max, 474389 mean
* Output num rows: 125 min, 125 max, 125 mean, 10000 total
* Output size bytes: 1002000 min, 1002000 max, 1002000 mean, 80160000 total
* Tasks per node: 80 min, 80 max, 80 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 5010000, 'obj_store_mem_freed': 5002500, 'obj_store_mem_peak': 10005000, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

Dataset iterator time breakdown:
* Total time user code is blocked: 2.32s
* Total time in user code: 3.48ms
* Total time overall: 4.24s
* Num blocks local: 80
* Num blocks remote: 0
* Num blocks unknown location: 0
* Batch iteration time breakdown (summed across prefetch threads):
    * In ray.get(): 248.0us min, 3.57ms max, 633.74us avg, 50.7ms total
    * In batch creation: 103.5us min, 2.87ms max, 974.33us avg, 38.97ms total
    * In batch formatting: 977.21us min, 19.83ms max, 11.47ms avg, 458.71ms total

We should expect Stage 2 to take close to 5 seconds, due to the time.sleep(5) in the mapped function. However, we see the same time as Stage 1.
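The duplication described above can also be checked programmatically. The following is a minimal sketch, not part of Ray: it assumes the stage summary lines keep the "Stage N <name>: X/Y blocks executed in <time>" layout shown in the output above (which may differ across Ray versions), and the helper names `stage_times` and `duplicated_times` are hypothetical.

```python
import re

# Assumed format, taken from the output in this report, e.g.
# "Stage 2 MapBatches(fn): 80/80 blocks executed in 1.92s".
STAGE_LINE = re.compile(
    r"^Stage (\d+) (.+?): (\d+)/(\d+) blocks executed in ([\d.]+)(us|ms|s)$"
)
UNIT_SECONDS = {"us": 1e-6, "ms": 1e-3, "s": 1.0}


def stage_times(stats_text):
    """Return {stage name: reported execution time in seconds}."""
    times = {}
    for line in stats_text.splitlines():
        m = STAGE_LINE.match(line.strip())
        if m:
            times[m.group(2)] = float(m.group(5)) * UNIT_SECONDS[m.group(6)]
    return times


def duplicated_times(times):
    """Group stage names that report exactly the same execution time."""
    by_time = {}
    for name, seconds in times.items():
        by_time.setdefault(seconds, []).append(name)
    return [names for names in by_time.values() if len(names) > 1]


# Summary lines copied from the output in this report.
sample = """\
Stage 1 ReadTFRecord->SplitBlocks(4): 80/80 blocks executed in 1.92s
* Remote wall time: 160.21us min, 1.88s max, 413.09ms mean, 33.05s total

Stage 2 MapBatches(fn): 80/80 blocks executed in 1.92s
* Remote wall time: 798.54us min, 4.46ms max, 1.23ms mean, 98.51ms total
"""

times = stage_times(sample)
print(times)
print(duplicated_times(times))
```

In the reproduction script, `stage_times(processed.stats())` could be used the same way, assuming `stats()` returns the string that is printed above. Identical times across stages are only a hint of the bug, since two stages can legitimately take the same time.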

Issue Severity

None

Labels

P0 (issues that should be fixed in short order), Ray-2.6, bug (something that is supposed to be working, but isn't), data (Ray Data-related issues), release-blocker (P0 issue that blocks the release)
