Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add meaningful error for out of disk exception during write
Browse files Browse the repository at this point in the history
hendrikmakait committed Oct 8, 2024
1 parent 36020d6 commit 1ad108c
Showing 5 changed files with 71 additions and 7 deletions.
10 changes: 9 additions & 1 deletion distributed/shuffle/_core.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,11 @@
from distributed.protocol.serialize import ToPickle
from distributed.shuffle._comms import CommShardsBuffer
from distributed.shuffle._disk import DiskShardsBuffer
from distributed.shuffle._exceptions import P2PConsistencyError, ShuffleClosedError
from distributed.shuffle._exceptions import (
P2PConsistencyError,
P2POutOfDiskError,
ShuffleClosedError,
)
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._memory import MemoryShardsBuffer
from distributed.utils import run_in_executor_with_context, sync
@@ -508,6 +512,8 @@ def handle_transfer_errors(id: ShuffleId) -> Iterator[None]:
raise Reschedule()
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e

@@ -522,6 +528,8 @@ def handle_unpack_errors(id: ShuffleId) -> Iterator[None]:
raise Reschedule()
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"P2P shuffling {id} failed during unpack phase") from e

18 changes: 14 additions & 4 deletions distributed/shuffle/_disk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import errno
import pathlib
import shutil
import threading
@@ -12,7 +13,7 @@

from distributed.metrics import context_meter, thread_time
from distributed.shuffle._buffer import ShardsBuffer
from distributed.shuffle._exceptions import DataUnavailable
from distributed.shuffle._exceptions import DataUnavailable, P2POutOfDiskError
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._pickle import pickle_bytelist
from distributed.utils import Deadline, empty_context, log_errors, nbytes
@@ -177,12 +178,21 @@ async def _process(self, id: str, shards: list[Any]) -> None:
if self._closed:
raise RuntimeError("Already closed")

with open(self.directory / str(id), mode="ab") as f:
f.writelines(frames)

try:
self._write_frames(frames, id)
except OSError as e:
if e.errno == errno.ENOSPC:
raise P2POutOfDiskError from e
raise
context_meter.digest_metric("disk-write", 1, "count")
context_meter.digest_metric("disk-write", sum(map(nbytes, frames)), "bytes")

def _write_frames(
self, frames: Iterable[bytes | bytearray | memoryview], id: str
) -> None:
with open(self.directory / str(id), mode="ab") as f:
f.writelines(frames)

def read(self, id: str) -> Any:
"""Read a complete file back into memory"""
self.raise_on_exception()
9 changes: 9 additions & 0 deletions distributed/shuffle/_exceptions.py
Original file line number Diff line number Diff line change
@@ -15,3 +15,12 @@ class ShuffleClosedError(P2PConsistencyError):

class DataUnavailable(Exception):
"""Raised when data is not available in the buffer"""


class P2POutOfDiskError(OSError):
def __str__(self) -> str:
return (
"P2P ran out of available disk space while temporarily storing transferred data. "
"Please make sure that P2P has enough disk space available by increasing the number of "
"workers or the size of the attached disk."
)
8 changes: 7 additions & 1 deletion distributed/shuffle/_shuffle.py
Original file line number Diff line number Diff line change
@@ -48,7 +48,11 @@
handle_transfer_errors,
handle_unpack_errors,
)
from distributed.shuffle._exceptions import DataUnavailable, P2PConsistencyError
from distributed.shuffle._exceptions import (
DataUnavailable,
P2PConsistencyError,
P2POutOfDiskError,
)
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
from distributed.sizeof import sizeof
@@ -107,6 +111,8 @@ def shuffle_barrier(id: ShuffleId, run_ids: list[int]) -> int:
raise e
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e

33 changes: 32 additions & 1 deletion distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

import asyncio
import contextlib
import errno
import itertools
import logging
import os
@@ -21,6 +22,7 @@
from dask.utils import key_split

from distributed.shuffle._core import ShuffleId, ShuffleRun, barrier_key
from distributed.shuffle._disk import DiskShardsBuffer
from distributed.worker import Status

np = pytest.importorskip("numpy")
@@ -47,7 +49,7 @@
read_from_disk,
serialize_table,
)
from distributed.shuffle._exceptions import P2PConsistencyError
from distributed.shuffle._exceptions import P2PConsistencyError, P2POutOfDiskError
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
from distributed.shuffle._shuffle import (
@@ -2039,6 +2041,35 @@ async def _receive(self, data: list[tuple[int, bytes]]) -> None:
await asyncio.gather(*[s.close() for s in [sA, sB]])


@gen_cluster(client=True)
async def test_meaningful_out_of_disk_error(c, s, a, b):
class OutOfDiskShardsBuffer(DiskShardsBuffer):
def _write_frames(self, frames, id):
code = errno.ENOSPC
raise OSError(code, os.strerror(code))

df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-10",
dtypes={"x": float, "y": float},
freq="10 s",
)
with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": True}
):
shuffled = df.shuffle("x", npartitions=10)
with pytest.raises(P2POutOfDiskError, match="out of available disk space"):
with mock.patch(
"distributed.shuffle._core.DiskShardsBuffer",
OutOfDiskShardsBuffer,
):
await c.compute(shuffled)
await assert_worker_cleanup(a)
await assert_worker_cleanup(b)
await c.close()
await assert_scheduler_cleanup(s)


class BlockedShuffleReceiveShuffleWorkerPlugin(ShuffleWorkerPlugin):
def setup(self, worker: Worker) -> None:
super().setup(worker)

0 comments on commit 1ad108c

Please sign in to comment.