Skip to content

Commit

Permalink
[Serve] [Dashboard] Support nondetached and multiple Serve instances in cluster snapshot (ray-project#17747)
Browse files Browse the repository at this point in the history
  • Loading branch information
architkulkarni authored Aug 12, 2021
1 parent ce171f1 commit 00f6b30
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 30 deletions.
34 changes: 25 additions & 9 deletions dashboard/modules/snapshot/snapshot_head.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import Dict, Any, List

import ray
from ray.core.generated import gcs_service_pb2
from ray.core.generated import gcs_pb2
from ray.core.generated import gcs_service_pb2_grpc
from ray.experimental.internal_kv import _internal_kv_get
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_list

import ray.new_dashboard.utils as dashboard_utils

Expand All @@ -28,6 +31,8 @@ async def snapshot(self, req):
"actors": actor_data,
"deployments": serve_data,
"session_name": session_name,
"ray_version": ray.__version__,
"ray_commit": ray.__commit__
}
return dashboard_utils.rest_response(
success=True, message="hello", snapshot=snapshot)
Expand Down Expand Up @@ -93,19 +98,30 @@ async def get_serve_info(self):
try:
from ray.serve.controller import SNAPSHOT_KEY as SERVE_SNAPSHOT_KEY
from ray.serve.constants import SERVE_CONTROLLER_NAME
from ray.serve.storage.kv_store import get_storage_key
except Exception:
return "{}"

client = self._dashboard_head.gcs_client
gcs_client = self._dashboard_head.gcs_client

# Serve wraps Ray's internal KV store and specially formats the keys.
# TODO(architkulkarni): Use _internal_kv_list to get all Serve
# controllers. Currently we only get the detached one. Non-detached
# ones have name = SERVE_CONTROLLER_NAME + random letters.
key = get_storage_key(SERVE_CONTROLLER_NAME, SERVE_SNAPSHOT_KEY)
val_bytes = _internal_kv_get(key, client) or "{}".encode("utf-8")
return json.loads(val_bytes.decode("utf-8"))
# These are the keys we are interested in:
# SERVE_CONTROLLER_NAME(+ optional random letters):SERVE_SNAPSHOT_KEY

serve_keys = _internal_kv_list(SERVE_CONTROLLER_NAME, gcs_client)
serve_snapshot_keys = filter(lambda k: SERVE_SNAPSHOT_KEY in str(k),
serve_keys)

deployments_per_controller: List[Dict[str, Any]] = []
for key in serve_snapshot_keys:
val_bytes = _internal_kv_get(key,
gcs_client) or "{}".encode("utf-8")
deployments_per_controller.append(
json.loads(val_bytes.decode("utf-8")))
deployments: Dict[str, Any] = {
k: v
for d in deployments_per_controller for k, v in d.items()
}
return deployments

async def get_session_name(self):
encoded_name = await self._dashboard_head.aioredis_client.get(
Expand Down
6 changes: 6 additions & 0 deletions dashboard/modules/snapshot/snapshot_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
"snapshot": {
"type": "object",
"properties": {
"ray_version": {
"type": "string"
},
"ray_commit": {
"type": "string"
},
"jobs": {
"type": "object",
"patternProperties": {
Expand Down
63 changes: 50 additions & 13 deletions dashboard/modules/snapshot/tests/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import pytest
import requests

import ray
from ray import serve
from ray.test_utils import (
format_web_url,
run_string_as_driver,
Expand Down Expand Up @@ -71,10 +73,18 @@ def ping(self):
else:
assert entry["endTime"] > 0, entry
assert "runtimeEnv" in entry
assert data["data"]["snapshot"]["rayCommit"] == ray.__commit__
assert data["data"]["snapshot"]["rayVersion"] == ray.__version__


@pytest.mark.parametrize(
"ray_start_with_dashboard", [{
"num_cpus": 4
}], indirect=True)
def test_serve_snapshot(ray_start_with_dashboard):
driver_script = f"""
"""Test detached and nondetached Serve instances running concurrently."""

detached_serve_driver_script = f"""
import ray
from ray import serve
Expand All @@ -90,7 +100,21 @@ def my_func(request):
my_func.deploy()
"""
run_string_as_driver(driver_script)

run_string_as_driver(detached_serve_driver_script)
assert requests.get("http://127.0.0.1:8000/my_func").text == "hello"

# Use a new port to avoid clobbering the first Serve instance.
serve.start(http_options={"port": 8123})

@serve.deployment(version="v1")
def my_func_nondetached(request):
return "hello"

my_func_nondetached.deploy()

assert requests.get(
"http://127.0.0.1:8123/my_func_nondetached").text == "hello"

webui_url = ray_start_with_dashboard["webui_url"]
webui_url = format_web_url(webui_url)
Expand All @@ -103,17 +127,30 @@ def my_func(request):
pprint.pprint(data)
jsonschema.validate(instance=data, schema=json.load(open(schema_path)))

assert len(data["data"]["snapshot"]["deployments"]) == 1
for deployment_name, entry in data["data"]["snapshot"][
"deployments"].items():
assert entry["name"] == "my_func"
assert entry["version"] == "v1"
assert entry["httpRoute"] == "/my_func"
assert entry["className"] == "my_func"
assert entry["status"] == "RUNNING"
assert entry["rayJobId"] is not None
assert entry["startTime"] == 0
assert entry["endTime"] == 0
assert len(data["data"]["snapshot"]["deployments"]) == 2

entry = data["data"]["snapshot"]["deployments"]["myFunc"]
assert entry["name"] == "my_func"
assert entry["version"] == "v1"
assert entry["namespace"] == "serve"
assert entry["httpRoute"] == "/my_func"
assert entry["className"] == "my_func"
assert entry["status"] == "RUNNING"
assert entry["rayJobId"] is not None
assert entry["startTime"] == 0
assert entry["endTime"] == 0

entry_nondetached = data["data"]["snapshot"]["deployments"][
"myFuncNondetached"]
assert entry_nondetached["name"] == "my_func_nondetached"
assert entry_nondetached["version"] == "v1"
assert entry_nondetached["namespace"] == ""
assert entry_nondetached["httpRoute"] == "/my_func_nondetached"
assert entry_nondetached["className"] == "my_func_nondetached"
assert entry_nondetached["status"] == "RUNNING"
assert entry_nondetached["rayJobId"] is not None
assert entry_nondetached["startTime"] == 0
assert entry_nondetached["endTime"] == 0


if __name__ == "__main__":
Expand Down
16 changes: 12 additions & 4 deletions python/ray/experimental/internal_kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def _internal_kv_get(key: Union[str, bytes], gcs_client=None) -> bytes:
"""Fetch the value of a binary key.
Args:
gcs_client: The GCS client to use to fetch the key. If not provided,
gcs_client: The GCS client to use to fetch the value. If not provided,
the global worker's GCS client is used.
"""
if gcs_client:
Expand Down Expand Up @@ -80,9 +80,17 @@ def _internal_kv_del(key: Union[str, bytes]):


@client_mode_hook
def _internal_kv_list(prefix: Union[str, bytes]) -> List[bytes]:
"""List all keys in the internal KV store that start with the prefix."""
if redis:
def _internal_kv_list(prefix: Union[str, bytes],
gcs_client=None) -> List[bytes]:
"""List all keys in the internal KV store that start with the prefix.
Args:
gcs_client: The GCS client to use to fetch the keys. If not provided,
the global worker's GCS client is used.
"""
if gcs_client:
return gcs_client.kv_keys(prefix)
elif redis:
if isinstance(prefix, bytes):
pattern = prefix + b"*"
else:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def format_actor_name(actor_name, controller_name=None, *modifiers):
if controller_name is None:
name = actor_name
else:
name = "{}:{}".format(controller_name, actor_name)
name = "{}:{}".format(actor_name, controller_name)

for modifier in modifiers:
name += "-{}".format(modifier)
Expand Down
6 changes: 3 additions & 3 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ def _ray_start(**kwargs):
@pytest.fixture
def ray_start_with_dashboard(request):
param = getattr(request, "param", {})

with _ray_start(
num_cpus=1, include_dashboard=True, **param) as address_info:
if param.get("num_cpus") is None:
param["num_cpus"] = 1
with _ray_start(include_dashboard=True, **param) as address_info:
yield address_info


Expand Down

0 comments on commit 00f6b30

Please sign in to comment.