Skip to content

Commit

Permalink
[Datasets] Add local and S3 filesystem test coverage for file-based d…
Browse files Browse the repository at this point in the history
…atasources. (ray-project#17158)
  • Loading branch information
clarkzinzow authored Aug 12, 2021
1 parent e53aeca commit d6eeb5d
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 76 deletions.
12 changes: 7 additions & 5 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 PYTHON=3.7 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
# Because Python version changed, we need to re-install Ray here
- rm -rf ./python/ray/thirdparty_files; ./ci/travis/ci.sh build
- rm -rf ./python/ray/thirdparty_files; rm -rf ./python/ray/pickle5_files; ./ci/travis/ci.sh build
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=py37,-flaky,-client python/ray/tune/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/xgboost/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/horovod/...
Expand All @@ -369,18 +369,20 @@
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 PYTHON=3.7 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
- rm -rf ./python/ray/thirdparty_files; ./ci/travis/ci.sh build
- rm -rf ./python/ray/thirdparty_files; rm -rf ./python/ray/pickle5_files; ./ci/travis/ci.sh build
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client --test_env=RAY_CLIENT_MODE=1 python/ray/util/dask/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client,-flaky python/ray/tune/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client,-client_unit_tests python/ray/util/sgd/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client,-flaky python/ray/util/xgboost/...

- label: ":potable_water: Workflow & Dataset tests"
- label: ":potable_water: Workflow & Dataset tests (Python 3.7)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) python/ray/experimental/workflow/... python/ray/data/...
- DATA_PROCESSING_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh
# Because Python version changed, we need to re-install Ray here
- rm -rf ./python/ray/thirdparty_files; rm -rf ./python/ray/pickle5_files; ./ci/travis/ci.sh build
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/experimental/workflow/... python/ray/data/...

- label: ":book: Doc tests and examples"
conditions:
Expand Down
52 changes: 52 additions & 0 deletions python/ray/data/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
import shutil

import pytest
import pyarrow as pa

from ray.data.tests.mock_server import * # noqa


@pytest.fixture(scope="function")
def aws_credentials():
import os
old_env = os.environ
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
yield
os.environ = old_env


@pytest.fixture(scope="function")
def data_dir():
yield "test_data"


@pytest.fixture(scope="function")
def s3_path(data_dir):
yield "s3://" + data_dir


@pytest.fixture(scope="function")
def s3_fs(aws_credentials, s3_server, s3_path):
fs = pa.fs.S3FileSystem(region="us-west-2", endpoint_override=s3_server)
if s3_path.startswith("s3://"):
s3_path = s3_path[len("s3://"):]
fs.create_dir(s3_path)
yield fs
fs.delete_dir(s3_path)


@pytest.fixture(scope="function")
def local_path(tmp_path, data_dir):
path = os.path.join(tmp_path, data_dir)
os.mkdir(path)
yield path
shutil.rmtree(path)


@pytest.fixture(scope="function")
def local_fs():
yield pa.fs.LocalFileSystem()
68 changes: 68 additions & 0 deletions python/ray/data/tests/mock_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# extracted from aioboto3
# https://github.com/terrycain/aioboto3/blob/16a1a1085191ebe6d40ee45d9588b2173738af0c/tests/mock_server.py
import pytest
import requests
import shutil
import signal
import subprocess as sp
import time

_proxy_bypass = {
"http": None,
"https": None,
}


def start_service(service_name, host, port):
moto_svr_path = shutil.which("moto_server")
args = [moto_svr_path, service_name, "-H", host, "-p", str(port)]
# For debugging
# args = '{0} {1} -H {2} -p {3} 2>&1 | \
# tee -a /tmp/moto.log'.format(moto_svr_path, service_name, host, port)
process = sp.Popen(
args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) # shell=True
url = "http://{host}:{port}".format(host=host, port=port)

for i in range(0, 30):
output = process.poll()
if output is not None:
print("moto_server exited status {0}".format(output))
stdout, stderr = process.communicate()
print("moto_server stdout: {0}".format(stdout))
print("moto_server stderr: {0}".format(stderr))
pytest.fail("Can not start service: {}".format(service_name))

try:
# we need to bypass the proxies due to monkeypatches
requests.get(url, timeout=5, proxies=_proxy_bypass)
break
except requests.exceptions.ConnectionError:
time.sleep(0.5)
else:
stop_process(process) # pytest.fail doesn't call stop_process
pytest.fail("Can not start service: {}".format(service_name))

return process


def stop_process(process):
try:
process.send_signal(signal.SIGTERM)
process.communicate(timeout=20)
except sp.TimeoutExpired:
process.kill()
outs, errors = process.communicate(timeout=20)
exit_code = process.returncode
msg = "Child process finished {} not in clean way: {} {}" \
.format(exit_code, outs, errors)
raise RuntimeError(msg)


@pytest.fixture(scope="session")
def s3_server():
host = "localhost"
port = 5002
url = "http://{host}:{port}".format(host=host, port=port)
process = start_service("s3", host, port)
yield url
stop_process(process)
Loading

0 comments on commit d6eeb5d

Please sign in to comment.