diff --git a/docs/release_notes/release_notes-0.15.0.rst b/docs/release_notes/release_notes-0.15.0.rst index 67d4718a2d4..606759add12 100644 --- a/docs/release_notes/release_notes-0.15.0.rst +++ b/docs/release_notes/release_notes-0.15.0.rst @@ -35,6 +35,7 @@ Key Features and Updates * * Developer API enhancements * FEAT-#4359: Add __dataframe__ method to the protocol dataframe (#4360) + * FIX-#4479: Prevent users from using a local filepath when performing a distributed write (#4484) * Update testing suite * TEST-#4363: Use Ray from pypi in CI (#4364) * FIX-#4422: get rid of case sensitivity for `warns_that_defaulting_to_pandas` (#4423) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index bcba828f42b..981bb7e9b86 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -42,6 +42,7 @@ from modin.core.execution.ray.common import RayTask, SignalActor from ..dataframe import PandasOnRayDataframe from ..partitioning import PandasOnRayDataframePartition +from modin.core.io.utils import is_local_path class PandasOnRayIO(RayIO): @@ -165,6 +166,18 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) + if len(ray.nodes()) > 1 and ( + not isinstance(kwargs["path_or_buf"], str) + or is_local_path(kwargs["path_or_buf"]) + ): + from modin.error_message import ErrorMessage + + ErrorMessage.single_warning( + "`path_or_buf` must point to a networked file or distributed filesystem (e.g. S3) " + + "when in cluster mode. Defaulting to pandas for `to_csv`" + ) + return RayIO.to_csv(qc, **kwargs) + signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) def func(df, **kw): @@ -277,6 +290,18 @@ def to_parquet(cls, qc, **kwargs): if not cls._to_parquet_check_support(kwargs): return RayIO.to_parquet(qc, **kwargs) + if len(ray.nodes()) > 1 and ( + not isinstance(kwargs["path_or_buf"], str) + or is_local_path(kwargs["path_or_buf"]) + ): + from modin.error_message import ErrorMessage + + ErrorMessage.single_warning( + "`path_or_buf` must point to a networked file or distributed filesystem (e.g. S3) " + + "when in cluster mode. Defaulting to pandas for `to_parquet`" + ) + return RayIO.to_parquet(qc, **kwargs) + def func(df, **kw): """ Dump a chunk of rows as parquet, then save them to target maintaining order. diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py new file mode 100644 index 00000000000..4e76594161b --- /dev/null +++ b/modin/core/io/utils.py @@ -0,0 +1,68 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +"""Collection of utility functions for distributed io.""" + +import os +import re +import fsspec + +IS_FILE_ONLY_REGEX = re.compile(f"[^\\{os.sep}]*\.\w+") # noqa: W605 + + +def is_local_path(path) -> bool: + """ + Return ``True`` if the specified `path` is a local path, ``False`` otherwise. + + Parameters + ---------- + path : str, path object or file-like object + The path to check. + + Returns + ------- + Whether the `path` points to a local file. + + Notes + ----- + If the filesystem corresponds to a `ZipFileSystem`, `TarFileSystem` or `CachingFileSystem`, + this code will return `False` even if it is local. + """ + try: + if IS_FILE_ONLY_REGEX.match(path) is not None: + # If we are passed just a filename, we will perform our check on the current working + # directory. + parent_dir = os.getcwd() + else: + # If we are passed a full path, we want to remove the filename from it. + parent_dir = os.sep.join(path.split(os.sep)[:-1]) + fs = fsspec.core.url_to_fs(parent_dir)[0] # Grab just the FileSystem object + if hasattr( + fs, "local_file" + ): # If the FS does not have the `local_file` attr, it is not local. + # We still need to check that it is not a mounted file - as fsspec treats mounted + # files the same as local ones, but we want to distinguish between local and mounted. + if os.name == "nt" and parent_dir[:3] == "D:\\": + # In Windows, os.path.abspath(os.sep) will give us the C Drive, but we want the + # D drive to also be marked as local. + local_device_id = os.stat("D:\\") + else: + local_device_id = os.stat(os.path.abspath(os.sep)).st_dev + path_device_id = os.stat(parent_dir).st_dev + return path_device_id == local_device_id + return False + except Exception: + # If an exception is raised, it means we tried to open a filesystem that requires additional + # dependencies. This means that it is definitely not a local filesystem, so we can return + # `False` here. + return False diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 3348c07e244..ac9b522e630 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -36,6 +36,7 @@ from modin.utils import to_pandas from modin.pandas.utils import from_arrow from modin.test.test_utils import warns_that_defaulting_to_pandas +from modin.core.io.utils import is_local_path import pyarrow as pa import os from scipy import sparse @@ -2378,3 +2379,25 @@ def test_to_period(): ) modin_df, pandas_df = create_test_dfs(TEST_DATA, index=index) df_equals(modin_df.to_period(), pandas_df.to_period()) + + +def test_is_local_path(): + s3_path = "s3://modin-example-bucket/modin-example-file" + assert not is_local_path(s3_path), "S3 Path incorrectly flagged as local!" + azure_blob_path = "https://modin-example-storage-account.blob.core.windows.net/modin-example-container/modin-example-file" + assert not is_local_path( + azure_blob_path + ), "Azure Blob Storage Path incorrectly flagged as local!" + gcs_path = "gs://modin-example-bucket/modin-example-file" + assert not is_local_path(gcs_path), "GCS Path incorrectly flagged as local!" + assert is_local_path( + os.getcwd() + ), "Current Working Directory incorrectly flagged as not local!" + new_file = os.getcwd() + "/modin-example-file.extension" + assert is_local_path( + new_file + ), "Non-existent file under current working directory incorrectly flagged as not local!" + new_file_in_curr_dir = "modin-example-file.extension" + assert is_local_path( + new_file_in_curr_dir, + ), "Non-existent file without absolute path incorrectly flagged as not local!"