forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Datasets] Add local and S3 filesystem test coverage for file-based datasources. (ray-project#17158)
- Loading branch information
1 parent
e53aeca
commit d6eeb5d
Showing
6 changed files
with
284 additions
and
76 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import os | ||
import shutil | ||
|
||
import pytest | ||
import pyarrow as pa | ||
|
||
from ray.data.tests.mock_server import * # noqa | ||
|
||
|
||
@pytest.fixture(scope="function")
def aws_credentials():
    """Install dummy AWS credentials in the environment for one test.

    The four credential variables are set to ``"testing"`` while the test
    runs, and their previous state (value or absence) is put back afterwards.
    """
    fake_credentials = {
        "AWS_ACCESS_KEY_ID": "testing",
        "AWS_SECRET_ACCESS_KEY": "testing",
        "AWS_SECURITY_TOKEN": "testing",
        "AWS_SESSION_TOKEN": "testing",
    }
    # Snapshot real values. Binding ``os.environ`` itself to a name would only
    # alias the live mapping, so "restoring" it later would be a no-op and the
    # fake credentials would leak into every subsequent test.
    previous = {name: os.environ.get(name) for name in fake_credentials}
    os.environ.update(fake_credentials)
    try:
        yield
    finally:
        for name, value in previous.items():
            if value is None:
                # The variable did not exist before the test; remove it again.
                os.environ.pop(name, None)
            else:
                os.environ[name] = value
|
||
|
||
@pytest.fixture(scope="function")
def data_dir():
    """Provide the directory/bucket name shared by the path fixtures."""
    directory_name = "test_data"
    yield directory_name
|
||
|
||
@pytest.fixture(scope="function")
def s3_path(data_dir):
    """Provide the ``s3://`` URI built from ``data_dir``."""
    uri = "s3://" + data_dir
    yield uri
|
||
|
||
@pytest.fixture(scope="function")
def s3_fs(aws_credentials, s3_server, s3_path):
    """Provide a pyarrow ``S3FileSystem`` pointed at the mock S3 endpoint.

    The directory named by ``s3_path`` is created before the test and deleted
    after it.
    """
    scheme = "s3://"
    # create_dir/delete_dir are given the path without its URI scheme.
    if s3_path.startswith(scheme):
        bucket = s3_path[len(scheme):]
    else:
        bucket = s3_path
    filesystem = pa.fs.S3FileSystem(
        endpoint_override=s3_server, region="us-west-2")
    filesystem.create_dir(bucket)
    yield filesystem
    filesystem.delete_dir(bucket)
|
||
|
||
@pytest.fixture(scope="function")
def local_path(tmp_path, data_dir):
    """Provide a fresh local directory named ``data_dir`` under ``tmp_path``.

    The directory is created before the test and removed, with its contents,
    after it.
    """
    target_dir = os.path.join(tmp_path, data_dir)
    os.mkdir(target_dir)
    yield target_dir
    shutil.rmtree(target_dir)
|
||
|
||
@pytest.fixture(scope="function")
def local_fs():
    """Provide a pyarrow ``LocalFileSystem`` instance."""
    filesystem = pa.fs.LocalFileSystem()
    yield filesystem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
# extracted from aioboto3 | ||
# https://github.com/terrycain/aioboto3/blob/16a1a1085191ebe6d40ee45d9588b2173738af0c/tests/mock_server.py | ||
import pytest | ||
import requests | ||
import shutil | ||
import signal | ||
import subprocess as sp | ||
import time | ||
|
||
_proxy_bypass = { | ||
"http": None, | ||
"https": None, | ||
} | ||
|
||
|
||
def start_service(service_name, host, port):
    """Launch ``moto_server`` for one service and wait until it answers HTTP.

    Args:
        service_name: Service name passed to ``moto_server`` (e.g. ``"s3"``).
        host: Interface the server binds to (``-H``).
        port: TCP port the server listens on (``-p``).

    Returns:
        The ``subprocess.Popen`` handle of the running server.

    The current test is failed via ``pytest.fail`` if the executable cannot be
    found, if the server exits during startup, or if it never becomes
    reachable within the polling budget.
    """
    moto_svr_path = shutil.which("moto_server")
    if moto_svr_path is None:
        # Without this guard Popen would receive None as argv[0] and raise an
        # opaque TypeError instead of a readable test failure.
        pytest.fail(
            "Can not start service: {} (moto_server executable not found "
            "on PATH)".format(service_name))

    args = [moto_svr_path, service_name, "-H", host, "-p", str(port)]
    # For debugging, the server can instead be run through a shell with its
    # output appended to a log file:
    # args = '{0} {1} -H {2} -p {3} 2>&1 | \
    # tee -a /tmp/moto.log'.format(moto_svr_path, service_name, host, port)
    process = sp.Popen(
        args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)  # shell=True
    url = "http://{host}:{port}".format(host=host, port=port)

    for _ in range(30):
        output = process.poll()
        if output is not None:
            # The server died during startup; surface its output to help
            # diagnose why.
            print("moto_server exited status {0}".format(output))
            stdout, stderr = process.communicate()
            print("moto_server stdout: {0}".format(stdout))
            print("moto_server stderr: {0}".format(stderr))
            pytest.fail("Can not start service: {}".format(service_name))

        try:
            # we need to bypass the proxies due to monkeypatches
            requests.get(url, timeout=5, proxies=_proxy_bypass)
            break
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout):
            # A read timeout is not a ConnectionError; treating it as "not
            # ready yet" keeps it from escaping the loop and leaking the
            # child process.
            time.sleep(0.5)
    else:
        stop_process(process)  # pytest.fail doesn't call stop_process
        pytest.fail("Can not start service: {}".format(service_name))

    return process
|
||
|
||
def stop_process(process):
    """Terminate a child process, escalating to a kill if it does not exit.

    SIGTERM is sent first and the process is given 20 seconds to finish. If
    it does not, it is killed, its remaining output is collected, and a
    ``RuntimeError`` describing the unclean shutdown is raised.

    Args:
        process: A ``subprocess.Popen``-like handle.

    Raises:
        RuntimeError: If the process had to be killed.
    """
    grace_period = 20
    process.send_signal(signal.SIGTERM)
    try:
        process.communicate(timeout=grace_period)
    except sp.TimeoutExpired:
        # The polite request was ignored; force termination and report it.
        process.kill()
        captured_out, captured_err = process.communicate(timeout=grace_period)
        raise RuntimeError(
            "Child process finished {} not in clean way: {} {}".format(
                process.returncode, captured_out, captured_err))
|
||
|
||
@pytest.fixture(scope="session")
def s3_server():
    """Run one mock S3 server for the test session and provide its URL.

    The server is started on ``localhost:5002`` and stopped once the session's
    tests have finished with it.
    """
    endpoint = {"host": "localhost", "port": 5002}
    url = "http://{host}:{port}".format(**endpoint)
    server = start_service("s3", endpoint["host"], endpoint["port"])
    yield url
    stop_process(server)
Oops, something went wrong.