Skip to content

Commit

Permalink
[Serve] Update system_logging_config to logging_config (ray-project#4…
Browse files Browse the repository at this point in the history
…1749)

Signed-off-by: Sihan Wang <sihanwang41@gmail.com>
  • Loading branch information
sihanwang41 authored Dec 8, 2023
1 parent f783234 commit d6d2689
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 40 deletions.
4 changes: 2 additions & 2 deletions dashboard/modules/serve/serve_rest_api_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async def put_all_applications(self, req: Request) -> Response:
client = await serve_start_async(
http_options=full_http_options,
grpc_options=grpc_options,
system_logging_config=config.logging_config,
global_logging_config=config.logging_config,
)

# Serve ignores HTTP options if it was already running when
Expand All @@ -186,7 +186,7 @@ async def put_all_applications(self, req: Request) -> Response:

try:
if config.logging_config:
client.update_system_logging_config(config.logging_config)
client.update_global_logging_config(config.logging_config)
client.deploy_apps(config)
record_extra_usage_tag(TagKey.SERVE_REST_API_VERSION, "v2")
except RayTaskError as e:
Expand Down
20 changes: 10 additions & 10 deletions python/ray/serve/_private/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def _check_http_options(
def _start_controller(
http_options: Union[None, dict, HTTPOptions] = None,
grpc_options: Union[None, dict, gRPCOptions] = None,
system_logging_config: Union[None, dict, LoggingConfig] = None,
global_logging_config: Union[None, dict, LoggingConfig] = None,
**kwargs,
) -> Tuple[ActorHandle, str]:
"""Start Ray Serve controller.
Expand Down Expand Up @@ -157,16 +157,16 @@ def _start_controller(
if isinstance(grpc_options, dict):
grpc_options = gRPCOptions(**grpc_options)

if system_logging_config is None:
system_logging_config = LoggingConfig()
elif isinstance(system_logging_config, dict):
system_logging_config = LoggingConfig(**system_logging_config)
if global_logging_config is None:
global_logging_config = LoggingConfig()
elif isinstance(global_logging_config, dict):
global_logging_config = LoggingConfig(**global_logging_config)

controller = ServeController.options(**controller_actor_options).remote(
SERVE_CONTROLLER_NAME,
http_config=http_options,
grpc_options=grpc_options,
system_logging_config=system_logging_config,
global_logging_config=global_logging_config,
)

proxy_handles = ray.get(controller.get_proxies.remote())
Expand All @@ -186,7 +186,7 @@ def _start_controller(
async def serve_start_async(
http_options: Union[None, dict, HTTPOptions] = None,
grpc_options: Union[None, dict, gRPCOptions] = None,
system_logging_config: Union[None, dict, LoggingConfig] = None,
global_logging_config: Union[None, dict, LoggingConfig] = None,
**kwargs,
) -> ServeControllerClient:
"""Initialize a serve instance asynchronously.
Expand Down Expand Up @@ -216,7 +216,7 @@ async def serve_start_async(
controller, controller_name = (
await ray.remote(_start_controller)
.options(num_cpus=0)
.remote(http_options, grpc_options, system_logging_config, **kwargs)
.remote(http_options, grpc_options, global_logging_config, **kwargs)
)

client = ServeControllerClient(
Expand All @@ -231,7 +231,7 @@ async def serve_start_async(
def serve_start(
http_options: Union[None, dict, HTTPOptions] = None,
grpc_options: Union[None, dict, gRPCOptions] = None,
system_logging_config: Union[None, dict, LoggingConfig] = None,
global_logging_config: Union[None, dict, LoggingConfig] = None,
**kwargs,
) -> ServeControllerClient:
"""Initialize a serve instance.
Expand Down Expand Up @@ -289,7 +289,7 @@ def serve_start(
pass

controller, controller_name = _start_controller(
http_options, grpc_options, system_logging_config, **kwargs
http_options, grpc_options, global_logging_config, **kwargs
)

client = ServeControllerClient(
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,6 @@ def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo):
self._controller.record_multiplexed_replica_info.remote(info)

@_ensure_connected
def update_system_logging_config(self, logging_config: LoggingConfig):
def update_global_logging_config(self, logging_config: LoggingConfig):
"""Reconfigure the logging config for the controller & proxies."""
self._controller.reconfigure_system_logging_config.remote(logging_config)
self._controller.reconfigure_global_logging_config.remote(logging_config)
33 changes: 16 additions & 17 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def __init__(
controller_name: str,
*,
http_config: HTTPOptions,
system_logging_config: LoggingConfig,
global_logging_config: LoggingConfig,
grpc_options: Optional[gRPCOptions] = None,
):
self._controller_node_id = ray.get_runtime_context().get_node_id()
Expand All @@ -135,11 +135,11 @@ async def __init__(
# Try to read config from checkpoint
# logging config from checkpoint take precedence over the one passed in
# the constructor.
self.system_logging_config = None
self.global_logging_config = None
log_config_checkpoint = self.kv_store.get(LOGGING_CONFIG_CHECKPOINT_KEY)
if log_config_checkpoint is not None:
system_logging_config = pickle.loads(log_config_checkpoint)
self.reconfigure_system_logging_config(system_logging_config)
global_logging_config = pickle.loads(log_config_checkpoint)
self.reconfigure_global_logging_config(global_logging_config)

configure_component_memory_profiler(
component_name="controller", component_id=str(os.getpid())
Expand All @@ -163,7 +163,7 @@ async def __init__(
http_config,
self._controller_node_id,
self.cluster_node_info_cache,
self.system_logging_config,
self.global_logging_config,
grpc_options,
)

Expand Down Expand Up @@ -223,29 +223,29 @@ async def __init__(
description="The number of times that controller has started.",
).inc()

def reconfigure_system_logging_config(self, system_logging_config: LoggingConfig):
def reconfigure_global_logging_config(self, global_logging_config: LoggingConfig):
if (
self.system_logging_config
and self.system_logging_config == system_logging_config
self.global_logging_config
and self.global_logging_config == global_logging_config
):
return
self.kv_store.put(
LOGGING_CONFIG_CHECKPOINT_KEY, pickle.dumps(system_logging_config)
LOGGING_CONFIG_CHECKPOINT_KEY, pickle.dumps(global_logging_config)
)
self.system_logging_config = system_logging_config
self.global_logging_config = global_logging_config

self.long_poll_host.notify_changed(
LongPollNamespace.SYSTEM_LOGGING_CONFIG,
system_logging_config,
LongPollNamespace.GLOBAL_LOGGING_CONFIG,
global_logging_config,
)
configure_component_logger(
component_name="controller",
component_id=str(os.getpid()),
logging_config=system_logging_config,
logging_config=global_logging_config,
)
logger.debug(
"Configure the serve controller logger "
f"with logging config: {self.system_logging_config}"
f"with logging config: {self.global_logging_config}"
)

def check_alive(self) -> None:
Expand Down Expand Up @@ -513,7 +513,6 @@ def _read_config_checkpoint(

checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY)
if checkpoint is not None:

(
deployment_time,
target_capacity,
Expand Down Expand Up @@ -1081,7 +1080,7 @@ def _get_logging_config(self) -> Tuple:
for handler in logger.handlers:
if isinstance(handler, logging.handlers.RotatingFileHandler):
log_file_path = handler.baseFilename
return self.system_logging_config, log_file_path
return self.global_logging_config, log_file_path

def _get_target_capacity_direction(self) -> Optional[TargetCapacityDirection]:
"""Gets the controller's scale direction (for testing purposes)."""
Expand Down Expand Up @@ -1192,7 +1191,7 @@ def __init__(
).remote(
controller_name,
http_config=http_config,
system_logging_config=logging_config,
global_logging_config=logging_config,
)

def check_alive(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/long_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __repr__(self):

RUNNING_REPLICAS = auto()
ROUTE_TABLE = auto()
SYSTEM_LOGGING_CONFIG = auto()
GLOBAL_LOGGING_CONFIG = auto()


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ def __init__(
self.long_poll_client = long_poll_client or LongPollClient(
ray.get_actor(controller_name, namespace=SERVE_NAMESPACE),
{
LongPollNamespace.SYSTEM_LOGGING_CONFIG: self._update_logging_config,
LongPollNamespace.GLOBAL_LOGGING_CONFIG: self._update_logging_config,
},
call_in_event_loop=get_or_create_event_loop(),
)
Expand Down
6 changes: 3 additions & 3 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def start(
http_options: Union[None, dict, HTTPOptions] = None,
dedicated_cpu: bool = False,
grpc_options: Union[None, dict, gRPCOptions] = None,
system_logging_config: Union[None, dict, LoggingConfig] = None,
logging_config: Union[None, dict, LoggingConfig] = None,
**kwargs,
):
"""Start Serve on the cluster.
Expand All @@ -89,7 +89,7 @@ def start(
grpc_options: [EXPERIMENTAL] gRPC config options for the proxies. These can
be passed as an unstructured dictionary or the structured `gRPCOptions`
class See `gRPCOptions` for supported options.
system_logging_config: logging config options for the serve component (
logging_config: logging config options for the serve component (
controller & proxy).
"""

Expand Down Expand Up @@ -122,7 +122,7 @@ class See `gRPCOptions` for supported options.
_private_api.serve_start(
http_options=http_options,
grpc_options=grpc_options,
system_logging_config=system_logging_config,
global_logging_config=logging_config,
**kwargs,
)

Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def test_callback_fail(ray_instance):
handle = actor_def.remote(
"controller",
http_config={},
system_logging_config=LoggingConfig(),
global_logging_config=LoggingConfig(),
)
with pytest.raises(RayActorError, match="cannot be imported"):
ray.get(handle.check_alive.remote())
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def __call__(self):
client = serve_instance

# Update the logging config
client.update_system_logging_config(
client.update_global_logging_config(
LoggingConfig(encoding="JSON", log_level="DEBUG")
)

Expand Down
3 changes: 1 addition & 2 deletions python/ray/serve/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def check_log_file(log_file: str, expected_regex: list):

class TestLoggingAPI:
def test_start_serve_with_logging_config(self, serve_and_ray_shutdown):
serve.start(system_logging_config={"log_level": "DEBUG", "encoding": "JSON"})
serve.start(logging_config={"log_level": "DEBUG", "encoding": "JSON"})
serve_log_dir = get_serve_logs_dir()
# Check controller log
actors = state_api.list_actors()
Expand Down Expand Up @@ -434,7 +434,6 @@ def __call__(self, req: starlette.requests.Request):
check_log_file(resp["log_file"], expected_log_regex)

def test_logs_dir(self, serve_and_ray_shutdown):

logger = logging.getLogger("ray.serve")

@serve.deployment
Expand Down

0 comments on commit d6d2689

Please sign in to comment.