[Data] Incorrect StageSummaryStats execution time calculated #37105
Closed
Description
What happened + What you expected to happen
The stats for a Dataset generated from Read->SplitBlocks(n)->MapBatches
contain an incorrectly duplicated execution time summary; see the example below.
The initial hypothesis is that this is caused by inheriting incorrect stats information during operator fusion / stats generation.
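If fusion is indeed the culprit, the TFRecord read should not be necessary to reproduce; a minimal sketch (untested; it assumes the duplication comes from stage fusion rather than from read_tfrecords specifically) would be:

import time

import ray


def slow_identity(batch):
    time.sleep(1)  # make the MapBatches stage measurably slower than the read
    return batch


ds = ray.data.range(1000, parallelism=8).map_batches(slow_identity)
for _ in ds.iter_batches():
    pass
print(ds.stats())  # check whether both stages report the same "executed in Xs"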
Versions / Dependencies
ray master
Reproduction script
import random
import tempfile
import time

import pyarrow as pa

import ray


def generate_random_tfrecords(
    num_rows: int,
    num_int: int,
) -> str:
    def generate_features(batch):
        # Build `num_int` random integers per input row.
        batch_size = len(batch["id"])
        features = {"int_features": []}
        lower_bound = -(2**32)
        upper_bound = 2**32
        for _ in range(batch_size):
            if num_int > 0:
                int_features = [
                    random.randint(lower_bound, upper_bound) for _ in range(num_int)
                ]
                features["int_features"].append(int_features)
        features = {k: v for (k, v) in features.items() if len(v) > 0}
        return pa.table(features)

    ds = ray.data.range(num_rows).map_batches(generate_features)
    assert ds.count() == num_rows, ds.count()
    tfrecords_dir = tempfile.mkdtemp()
    ds.write_tfrecords(tfrecords_dir)
    return tfrecords_dir


file_path = generate_random_tfrecords(
    num_rows=10000,
    num_int=1000,
)
print("===> created data file at", file_path)

records = ray.data.read_tfrecords(paths=file_path)


def fn(batch):
    time.sleep(5)  # make the MapBatches stage dominate the runtime
    return batch


processed = records.map_batches(fn)

num_batches = 0
for batch in processed.iter_batches():
    num_batches += 1
print(f"===> Iterated over {num_batches} batches")

stats = processed.stats()
print(stats)
The block execution summary line (the first line of each stage summary) is incorrectly duplicated across the two stages:
Stage 1 ReadTFRecord->SplitBlocks(4): 80/80 blocks executed in 1.92s
* Remote wall time: 160.21us min, 1.88s max, 413.09ms mean, 33.05s total
* Remote cpu time: 160.0us min, 1.18s max, 285.51ms mean, 22.84s total
* Peak heap memory usage (MiB): 467359.38 min, 485484.38 max, 472517 mean
* Output num rows: 125 min, 125 max, 125 mean, 10000 total
* Output size bytes: 1000500 min, 1000500 max, 1000500 mean, 80040000 total
* Tasks per node: 80 min, 80 max, 80 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 40020000, 'obj_store_mem_freed': 37572238, 'obj_store_mem_peak': 38790839, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
Stage 2 MapBatches(fn): 80/80 blocks executed in 1.92s
* Remote wall time: 798.54us min, 4.46ms max, 1.23ms mean, 98.51ms total
* Remote cpu time: 798.0us min, 2.37ms max, 1.14ms mean, 90.86ms total
* Peak heap memory usage (MiB): 467359.38 min, 487828.12 max, 474389 mean
* Output num rows: 125 min, 125 max, 125 mean, 10000 total
* Output size bytes: 1002000 min, 1002000 max, 1002000 mean, 80160000 total
* Tasks per node: 80 min, 80 max, 80 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 5010000, 'obj_store_mem_freed': 5002500, 'obj_store_mem_peak': 10005000, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
Dataset iterator time breakdown:
* Total time user code is blocked: 2.32s
* Total time in user code: 3.48ms
* Total time overall: 4.24s
* Num blocks local: 80
* Num blocks remote: 0
* Num blocks unknown location: 0
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): 248.0us min, 3.57ms max, 633.74us avg, 50.7ms total
* In batch creation: 103.5us min, 2.87ms max, 974.33us avg, 38.97ms total
* In batch formatting: 977.21us min, 19.83ms max, 11.47ms avg, 458.71ms total
We would expect Stage 2 to take close to 5 seconds, due to the time.sleep(5)
in the mapped function. However, it reports the same 1.92s execution time as Stage 1.
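One rough way to confirm the reported stage time is wrong is to replace the iteration loop in the script above with a timed one and compare the wall clock against the per-stage summary lines (the regex is an assumption based on the output format shown above):

import re
import time

start = time.perf_counter()
for batch in processed.iter_batches():
    pass
elapsed = time.perf_counter() - start

stats = processed.stats()
# Pull the per-stage "executed in Xs" figures out of the summary text.
stage_times = re.findall(r"executed in ([0-9.]+)s", stats)
print(f"wall clock: {elapsed:.2f}s, reported stage times: {stage_times}")
# With the sleep in fn, Stage 2's reported time should be at least 5s,
# yet both stages report the same 1.92s here.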
Issue Severity
None