Skip to content

Commit

Permalink
Restore the old behaviour of auto-glob given prefix/folders in io ada…
Browse files Browse the repository at this point in the history
…ptors (v6d-io#1693)

To allow users use paths like `oss://a/b/c/` as the folder path to
reading multiple files.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow authored Dec 25, 2023
1 parent e5995d3 commit 5b0ef1a
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 58 deletions.
5 changes: 2 additions & 3 deletions python/vineyard/drivers/io/adaptors/read_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from typing import Dict

import fsspec
from fsspec.core import get_fs_token_paths
from fsspec.utils import read_block

import vineyard
Expand All @@ -42,7 +41,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc")

from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402

# Note [Semantic of read_block with delimiter]:
#
Expand Down Expand Up @@ -161,7 +160,7 @@ def read_bytes( # noqa: C901, pylint: disable=too-many-statements

try:
# files would be empty if it's a glob pattern and globbed nothing.
fs, _, files = get_fs_token_paths(path, storage_options=storage_options)
fs, _, files = infer_fsspec_paths(path, storage_options=storage_options)
except Exception: # pylint: disable=broad-except
report_error(
f"Cannot initialize such filesystem for '{path}', "
Expand Down
5 changes: 2 additions & 3 deletions python/vineyard/drivers/io/adaptors/read_bytes_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from typing import Tuple # pylint: disable=unused-import

import fsspec
from fsspec.core import get_fs_token_paths
from fsspec.spec import AbstractFileSystem
from fsspec.utils import read_block

Expand All @@ -47,7 +46,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc")

from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402

CHUNK_SIZE = 1024 * 1024 * 128

Expand Down Expand Up @@ -178,7 +177,7 @@ def read_bytes_collection(
client = vineyard.connect(vineyard_socket)

# files would be empty if it's a glob pattern and globbed nothing.
fs, _, files = get_fs_token_paths(prefix, storage_options=storage_options)
fs, _, files = infer_fsspec_paths(prefix, storage_options=storage_options)
prefix_path = files[0]

worker_prefix = os.path.join(prefix_path, '%s-%s' % (proc_num, proc_index))
Expand Down
4 changes: 2 additions & 2 deletions python/vineyard/drivers/io/adaptors/read_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import cloudpickle
import fsspec
from fsspec.core import get_fs_token_paths

import vineyard
from vineyard.data.utils import str_to_bool
Expand All @@ -42,6 +41,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc.")
from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402


def make_empty_batch(schema):
Expand Down Expand Up @@ -127,7 +127,7 @@ def read_bytes( # noqa: C901, pylint: disable=too-many-statements

try:
# files would be empty if it's a glob pattern and globbed nothing.
fs, _, files = get_fs_token_paths(path, storage_options=storage_options)
fs, _, files = infer_fsspec_paths(path, storage_options=storage_options)
except Exception: # pylint: disable=broad-except
report_error(
f"Cannot initialize such filesystem for '{path}', "
Expand Down
4 changes: 2 additions & 2 deletions python/vineyard/drivers/io/adaptors/read_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import cloudpickle
import fsspec
import fsspec.implementations.arrow
from fsspec.core import get_fs_token_paths

import vineyard
from vineyard.data.utils import str_to_bool
Expand All @@ -43,6 +42,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc.")
from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402


def make_empty_batch(schema):
Expand Down Expand Up @@ -133,7 +133,7 @@ def read_bytes( # noqa: C901, pylint: disable=too-many-statements

try:
# files would be empty if it's a glob pattern and globbed nothing.
fs, _, files = get_fs_token_paths(path, storage_options=storage_options)
fs, _, files = infer_fsspec_paths(path, storage_options=storage_options)
except Exception: # pylint: disable=broad-except
report_error(
f"Cannot initialize such filesystem for '{path}', "
Expand Down
4 changes: 2 additions & 2 deletions python/vineyard/drivers/io/adaptors/write_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import sys

import fsspec
from fsspec.core import get_fs_token_paths

import vineyard
from vineyard.io.byte import ByteStream
Expand All @@ -36,6 +35,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc.")
from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402


def write_bytes(
Expand Down Expand Up @@ -75,7 +75,7 @@ def write_bytes(
try:
reader = instream.open_reader(client)

fs, _, _ = get_fs_token_paths(
fs, _, _ = infer_fsspec_paths(
f"{path}_{proc_index}", storage_options=storage_options
)
if hasattr(fs, 'auto_mkdir'):
Expand Down
6 changes: 3 additions & 3 deletions python/vineyard/drivers/io/adaptors/write_bytes_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from typing import Dict

import fsspec
from fsspec.core import get_fs_token_paths

import vineyard
from vineyard._C import ObjectID
Expand All @@ -45,6 +44,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc.")
from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402


def write_metadata(streams: StreamCollection, prefix: str, storage_options: Dict):
Expand All @@ -55,7 +55,7 @@ def write_metadata(streams: StreamCollection, prefix: str, storage_options: Dict
prefix, metadata[StreamCollection.KEY_OF_PATH], 'metadata.json'
)
logger.info('creating metadata for %r ...', metadata_path)
fs, _, _ = get_fs_token_paths(metadata_path, storage_options=storage_options)
fs, _, _ = infer_fsspec_paths(metadata_path, storage_options=storage_options)
if hasattr(fs, 'auto_mkdir'):
fs.auto_mkdir = True
with fs.open(metadata_path, 'wb', **storage_options) as fp:
Expand All @@ -67,7 +67,7 @@ def write_byte_stream(client, stream: ByteStream, prefix: str, storage_options:
try:
reader = stream.open_reader(client)

fs, _, _ = get_fs_token_paths(
fs, _, _ = infer_fsspec_paths(
os.path.join(prefix, path), storage_options=storage_options
)
if hasattr(fs, 'auto_mkdir'):
Expand Down
4 changes: 2 additions & 2 deletions python/vineyard/drivers/io/adaptors/write_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import cloudpickle
import fsspec
from fsspec.core import get_fs_token_paths

import vineyard
from vineyard.io.dataframe import DataframeStream
Expand All @@ -37,6 +36,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc.")
from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402


def write_orc(
Expand All @@ -60,7 +60,7 @@ def write_orc(
chunk_hook = write_options.get('chunk_hook', None)

writer = None
fs, _, _ = get_fs_token_paths(
fs, _, _ = infer_fsspec_paths(
f"{path}_{proc_index}", storage_options=storage_options
)
if hasattr(fs, 'auto_mkdir'):
Expand Down
4 changes: 2 additions & 2 deletions python/vineyard/drivers/io/adaptors/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import cloudpickle
import fsspec
from fsspec.core import get_fs_token_paths

import vineyard
from vineyard.io.dataframe import DataframeStream
Expand All @@ -35,6 +34,7 @@
from vineyard.drivers.io import fsspec_adaptors
except Exception: # pylint: disable=broad-except
logger.warning("Failed to import fsspec adaptors for hdfs, oss, etc.")
from vineyard.drivers.io.fsspec_adaptors import infer_fsspec_paths # noqa: E402


def write_parquet(
Expand All @@ -58,7 +58,7 @@ def write_parquet(
chunk_hook = write_options.get('chunk_hook', None)

writer = None
fs, _, _ = get_fs_token_paths(
fs, _, _ = infer_fsspec_paths(
f"{path}_{proc_index}", storage_options=storage_options
)
if hasattr(fs, 'auto_mkdir'):
Expand Down
102 changes: 63 additions & 39 deletions python/vineyard/drivers/io/fsspec_adaptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@
import pyarrow.fs

import fsspec
import fsspec.implementations.arrow

try:
from fsspec.implementations.arrow import ArrowFile as FSSpecArrowFile
except ImportError:
FSSpecArrowFile = None
try:
from fsspec.implementations.arrow import HadoopFileSystem as FSSpecHadoopFileSystem
except ImportError:
FSSpecHadoopFileSystem = None

from fsspec.core import get_fs_token_paths

# register OSS
try:
Expand All @@ -33,41 +43,55 @@
fsspec.register_implementation("oss", ossfs.OSSFileSystem, clobber=True)


class ArrowFile(fsspec.implementations.arrow.ArrowFile):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def size(self):
return self.stream.size()


class HDFSFileSystem(fsspec.implementations.arrow.HadoopFileSystem):
@fsspec.implementations.arrow.wrap_exceptions
def _open(self, path, mode="rb", block_size=None, **kwargs):
if mode == "rb":
method = self.fs.open_input_file
elif mode == "wb":
method = self.fs.open_output_stream
elif mode == "ab":
method = self.fs.open_append_stream
else:
raise ValueError(f"unsupported mode for Arrow filesystem: {mode!r}")

_kwargs = {}
pyarrow_version = fsspec.implementations.arrow.PYARROW_VERSION
if isinstance(pyarrow_version, str):
pyarrow_version_major = int(pyarrow_version.split(".")[0])
else:
pyarrow_version_major = pyarrow_version[0]
if pyarrow_version_major >= 4:
# disable compression auto-detection
_kwargs["compression"] = None
if mode == 'rb':
stream = method(path)
else:
stream = method(path, **_kwargs)

return ArrowFile(self, stream, path, mode, block_size, **kwargs)


fsspec.register_implementation("hdfs", HDFSFileSystem, clobber=True)
if FSSpecArrowFile:

class ArrowFile(FSSpecArrowFile):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def size(self):
return self.stream.size()


if FSSpecHadoopFileSystem:

class HDFSFileSystem(fsspec.implementations.arrow.HadoopFileSystem):
@fsspec.implementations.arrow.wrap_exceptions
def _open(self, path, mode="rb", block_size=None, **kwargs):
if mode == "rb":
method = self.fs.open_input_file
elif mode == "wb":
method = self.fs.open_output_stream
elif mode == "ab":
method = self.fs.open_append_stream
else:
raise ValueError(f"unsupported mode for Arrow filesystem: {mode!r}")

_kwargs = {}
pyarrow_version = fsspec.implementations.arrow.PYARROW_VERSION
if isinstance(pyarrow_version, str):
pyarrow_version_major = int(pyarrow_version.split(".")[0])
else:
pyarrow_version_major = pyarrow_version[0]
if pyarrow_version_major >= 4:
# disable compression auto-detection
_kwargs["compression"] = None
if mode == 'rb':
stream = method(path)
else:
stream = method(path, **_kwargs)

return ArrowFile(self, stream, path, mode, block_size, **kwargs)

fsspec.register_implementation("hdfs", HDFSFileSystem, clobber=True)


def infer_fsspec_paths(prefix, storage_options):
'''Infer the paths from the prefix and storage options.'''
fs, token, files = get_fs_token_paths(prefix, storage_options=storage_options)
if fs.isfile(prefix):
return fs, token, [prefix]
else:
if '*' not in prefix:
prefix = prefix + '*'
return fs, token, fs.glob(prefix)

0 comments on commit 5b0ef1a

Please sign in to comment.