Commit 6cc57da

[CoreEngine] check if the endpoint is ready, debug the federate job on launch.
fedml-alex committed Jan 3, 2024
1 parent be499b3 commit 6cc57da
Showing 26 changed files with 352 additions and 110 deletions.
2 changes: 1 addition & 1 deletion devops/dockerfile/server-agent/Dockerfile-Dev
@@ -26,4 +26,4 @@ ENV MODE=normal FEDML_VERSION=${VERSION} ACCOUNT_ID=0 SERVER_AGENT_ID=0 \
     AWS_IAM_ACCESS_KEY=0 \
     AWS_REGION=0
 
-CMD bash ./start-redis.sh; ./set-aws-credentials.sh ${AWS_IAM_ACCESS_ID} ${AWS_IAM_ACCESS_KEY} ${AWS_REGION};python3 ./fedml-pip/fedml/computing/scheduler/master/server_daemon.py -t login -u ${ACCOUNT_ID} -v ${FEDML_VERSION} -r cloud_agent -id ${SERVER_AGENT_ID};bash ./runner.sh
+CMD bash ./start-redis.sh; ./set-aws-credentials.sh ${AWS_IAM_ACCESS_ID} ${AWS_IAM_ACCESS_KEY} ${AWS_REGION};python3 ./fedml-pip/fedml/computing/scheduler/master/server_daemon.py -t login -u ${ACCOUNT_ID} -k ${ACCOUNT_ID} -v ${FEDML_VERSION} -r cloud_agent -id ${SERVER_AGENT_ID};bash ./runner.sh
2 changes: 1 addition & 1 deletion devops/dockerfile/server-agent/Dockerfile-Release
@@ -26,4 +26,4 @@ ENV MODE=normal FEDML_VERSION=${VERSION} ACCOUNT_ID=0 SERVER_AGENT_ID=0 \
     AWS_IAM_ACCESS_KEY=0 \
     AWS_REGION=0
 
-CMD bash ./start-redis.sh; ./set-aws-credentials.sh ${AWS_IAM_ACCESS_ID} ${AWS_IAM_ACCESS_KEY} ${AWS_REGION};python3 ./fedml-pip/fedml/computing/scheduler/master/server_daemon.py -t login -u ${ACCOUNT_ID} -v ${FEDML_VERSION} -r cloud_agent -id ${SERVER_AGENT_ID}; bash ./runner.sh
+CMD bash ./start-redis.sh; ./set-aws-credentials.sh ${AWS_IAM_ACCESS_ID} ${AWS_IAM_ACCESS_KEY} ${AWS_REGION};python3 ./fedml-pip/fedml/computing/scheduler/master/server_daemon.py -t login -u ${ACCOUNT_ID} -k ${ACCOUNT_ID} -v ${FEDML_VERSION} -r cloud_agent -id ${SERVER_AGENT_ID}; bash ./runner.sh
2 changes: 1 addition & 1 deletion devops/dockerfile/server-agent/Dockerfile-Test
@@ -26,4 +26,4 @@ ENV MODE=normal FEDML_VERSION=${VERSION} ACCOUNT_ID=0 SERVER_AGENT_ID=0 \
     AWS_IAM_ACCESS_KEY=0 \
     AWS_REGION=0
 
-CMD bash ./start-redis.sh; ./set-aws-credentials.sh ${AWS_IAM_ACCESS_ID} ${AWS_IAM_ACCESS_KEY} ${AWS_REGION};python3 ./fedml-pip/fedml/computing/scheduler/master/server_daemon.py -t login -u ${ACCOUNT_ID} -v ${FEDML_VERSION} -r cloud_agent -id ${SERVER_AGENT_ID}; bash ./runner.sh
+CMD bash ./start-redis.sh; ./set-aws-credentials.sh ${AWS_IAM_ACCESS_ID} ${AWS_IAM_ACCESS_KEY} ${AWS_REGION};python3 ./fedml-pip/fedml/computing/scheduler/master/server_daemon.py -t login -u ${ACCOUNT_ID} -k ${ACCOUNT_ID} -v ${FEDML_VERSION} -r cloud_agent -id ${SERVER_AGENT_ID}; bash ./runner.sh
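All three server-agent images gain the same flag: the daemon login now passes `-k ${ACCOUNT_ID}` alongside `-u ${ACCOUNT_ID}`. A minimal sketch of the resulting invocation; the placeholder values standing in for the build-time variables are illustrative, not real settings:

```python
# Sketch of the server daemon login call after this change; argument values
# are placeholders for ACCOUNT_ID, FEDML_VERSION, and SERVER_AGENT_ID.
import subprocess

subprocess.run([
    "python3", "./fedml-pip/fedml/computing/scheduler/master/server_daemon.py",
    "-t", "login",
    "-u", "1234",          # account id
    "-k", "1234",          # newly added: key argument, reusing the account id
    "-v", "release",       # FEDML_VERSION
    "-r", "cloud_agent",
    "-id", "5678",         # server agent id
], check=True)
```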
6 changes: 4 additions & 2 deletions python/examples/launch/federated_job.yaml
@@ -26,6 +26,8 @@ server_job: |
     python3 torch_server.py --cf config/fedml_config.yaml --rank 0 --role server --run_id $FEDML_CURRENT_RUN_ID
 job_type: federate              # options: train, deploy, federate
+fedml_env:
+  federate_project_name: Cheetah_HelloWorld
 
 # train subtype: general_training, single_machine_training, cluster_distributed_training, cross_cloud_training
 # federate subtype: cross_silo, simulation, web, smart_phone
@@ -42,5 +44,5 @@ computing:
   minimum_num_gpus: 1           # minimum # of GPUs to provision
   maximum_cost_per_hour: $3000  # max cost per hour for your job per gpu card
   #allow_cross_cloud_resources: true # true, false
-  #device_type: CPU             # options: GPU, CPU, hybrid
-  resource_type: A100-80G       # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
+  device_type: CPU              # options: GPU, CPU, hybrid
+  #resource_type: A100-80G      # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
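For context, a job file like this is submitted through the launch API. A minimal sketch; the `fedml.api.launch_job` entry point and the fields on its result object are assumptions based on the `fedml.api` module, not shown in this diff:

```python
# Sketch of submitting the federated job file above (names assumed).
import fedml.api

result = fedml.api.launch_job(yaml_file="federated_job.yaml", api_key="YOUR_API_KEY")
# After this commit, inner_id carries the federated run id for federate-type jobs.
print(result.run_id, result.inner_id)
```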
5 changes: 2 additions & 3 deletions python/fedml/api/__init__.py
@@ -120,12 +120,11 @@ def run_stop(run_id: str, platform: str = "falcon", api_key: str = None) -> bool
     return run.stop(run_id=run_id, platform=platform, api_key=api_key)
 
 
-def run_list(run_name: str, run_id: str = None, platform: str = "falcon", api_key: str = None) -> FedMLRunModelList:
+def run_list(run_name: str = None, run_id: str = None, platform: str = "falcon", api_key: str = None) -> FedMLRunModelList:
     return run.list_run(run_name=run_name, run_id=run_id, platform=platform, api_key=api_key)
 
 
-def run_status(run_name: str, run_id: str = None, platform: str = "falcon", api_key: str = None) -> (
-        FedMLRunModelList, str):
+def run_status(run_name: str = None, run_id: str = None, platform: str = "falcon", api_key: str = None) -> (FedMLRunModelList, str):
     return run.status(run_name=run_name, run_id=run_id, platform=platform, api_key=api_key)
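With `run_name` now defaulting to `None`, both helpers can be called by run id alone. A usage sketch (assumes a valid API key):

```python
# run_name is no longer required; a run can be looked up by id only.
import fedml.api

runs = fedml.api.run_list(run_id="1234", api_key="YOUR_API_KEY")
run_models, status = fedml.api.run_status(run_id="1234", api_key="YOUR_API_KEY")
```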
19 changes: 16 additions & 3 deletions python/fedml/api/modules/launch.py
@@ -17,7 +17,7 @@
 
 class LaunchResult:
     def __init__(self, result_code: int, result_message: str, run_id: str = None, project_id: str = None,
-                 inner_id: str = None, result_object: FedMLRunStartedModel=None):
+                 inner_id: str = None, result_object: FedMLRunStartedModel = None):
         self.run_id = run_id
         self.project_id = project_id
         self.inner_id = inner_id
@@ -27,7 +27,8 @@ def __init__(self, result_code: int, result_message: str, run_id: str = None, project_id: str = None,
 
 
 def create_run(yaml_file, api_key: str, resource_id: str = None, device_server: str = None,
-               device_edges: List[str] = None, feature_entry_point: FeatureEntryPoint = None) -> (int, str, FedMLRunStartedModel):
+               device_edges: List[str] = None, feature_entry_point: FeatureEntryPoint = None) -> (
+        int, str, FedMLRunStartedModel):
     result_code, result_message = (ApiConstants.ERROR_CODE[ApiConstants.RESOURCE_MATCHED_STATUS_MATCHED],
                                    ApiConstants.RESOURCE_MATCHED_STATUS_MATCHED)
 
@@ -94,12 +95,24 @@ def run(create_run_result: FedMLRunStartedModel, api_key: str, device_server: str,
         device_edges: List[str] = None, feature_entry_point: FeatureEntryPoint = None):
     authenticate(api_key)
 
+    # Start a federated run when the job type is federate.
+    federate_launch_result = None
+    if create_run_result.job_type == Constants.JOB_TASK_TYPE_FEDERATE:
+        device_server = create_run_result.server_agent_id
+        device_edges = [int(gpu_machine.gpu_id) for gpu_machine in create_run_result.gpu_matched]
+        federate_launch_result = start(platform=SchedulerConstants.PLATFORM_TYPE_OCTOPUS,
+                                       create_run_result=create_run_result,
+                                       device_server=device_server, device_edges=device_edges, api_key=get_api_key(),
+                                       feature_entry_point=feature_entry_point)
+
     # Start the run
     launch_result = start(platform=SchedulerConstants.PLATFORM_TYPE_FALCON, create_run_result=create_run_result,
                           device_server=device_server, device_edges=device_edges, api_key=get_api_key(),
                           feature_entry_point=feature_entry_point)
+    if federate_launch_result is not None:
+        launch_result.inner_id = federate_launch_result.run_id
 
-    return launch_result
+    return federate_launch_result if federate_launch_result is not None else launch_result
 
 
 def job(
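The net effect of the new branch, reduced to a self-contained sketch. Names mirror the diff; the `start` helper is passed in as a parameter here only to keep the sketch runnable:

```python
# Illustrative reduction of run() above: federate-type jobs are first started
# on the Octopus platform against the matched GPU machines, then the regular
# Falcon launch runs; the federated result is returned when present.
def run_sketch(create_run_result, start, device_server=None, device_edges=None):
    federate_result = None
    if create_run_result.job_type == "federate":
        # Use the master agent as the server and the matched GPUs as edges.
        device_server = create_run_result.server_agent_id
        device_edges = [int(m.gpu_id) for m in create_run_result.gpu_matched]
        federate_result = start("octopus", create_run_result, device_server, device_edges)
    launch_result = start("falcon", create_run_result, device_server, device_edges)
    if federate_result is not None:
        launch_result.inner_id = federate_result.run_id  # link the two runs
    return federate_result if federate_result is not None else launch_result
```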
1 change: 1 addition & 0 deletions python/fedml/api/modules/utils.py
@@ -34,6 +34,7 @@ def _check_api_key(api_key=None):
 
 
 def authenticate(api_key):
+
     error_code, api_key = fedml_login(api_key)
 
     # Exit if not able to authenticate successfully
@@ -83,6 +83,7 @@ def restart_container(self, container_name, container_port=2345):
         try:
             client.api.restart(container=container_obj.id)
             inference_port = self.get_host_port(container_obj, container_port)
+            container_obj.reload()
             return container_obj.status == "running", inference_port
         except Exception as e:
             logging.error("Failed to restart container ")
@@ -121,7 +122,7 @@ def start_container(self, container_name, container_port=2345):
             client.api.start(container=container_obj.id)
             inference_port = self.get_host_port(container_obj, container_port)
             container_obj.reload()
-           return container_obj.status == "running", inference_port
+            return container_obj.status == "running", inference_port
         except Exception as e:
             logging.error(f"Failed to restart container {traceback.format_exc()}")
 
@@ -169,7 +170,7 @@ def get_container_rank_same_model(prefix:str):
         return -1
 
     try:
-        container_list = client.containers.list()
+        container_list = client.containers.list(all=True)
     except docker.errors.APIError:
         logging.error("The API cannot be accessed")
         return -1
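Two docker-py details here are easy to miss: container attributes are cached at fetch time, so `.status` goes stale after a restart unless `reload()` is called, and `containers.list()` omits stopped containers unless `all=True` is passed. A small demonstration (the container name is illustrative):

```python
import docker

client = docker.from_env()
container = client.containers.get("my-endpoint-container")  # illustrative name

client.api.restart(container=container.id)
container.reload()       # refresh cached attrs; .status would be stale otherwise
print(container.status)  # now reflects the post-restart state

running_only = client.containers.list()               # running containers only
including_stopped = client.containers.list(all=True)  # needed to count all replicas
```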
84 changes: 56 additions & 28 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -229,6 +229,10 @@ def monitor_slave_endpoint_status(self):
                 pass
         FedMLModelDatabase.get_instance().set_database_base_dir(device_client_constants.ClientConstants.get_database_dir())
         job_list = device_client_data_interface.FedMLClientDataInterface.get_instance().get_jobs_from_db()
+        agent_config = dict()
+        agent_config["mqtt_config"] = self.mqtt_config
+        endpoint_sync_protocol = FedMLEndpointSyncProtocol(agent_config=agent_config)
+        endpoint_sync_protocol.setup_client_mqtt_mgr()
         for job in job_list.job_list:
             count += 1
             if count >= 1000:
@@ -258,20 +262,36 @@ def monitor_slave_endpoint_status(self):
                             job.job_id, endpoint_name, model_name, job.edge_id)
                         if deployment_result is None:
                             continue
-                        generated_container_name = device_client_constants.ClientConstants.get_deployment_container_name(
-                            endpoint_name, model_name, model_version, job.job_id, model_id, edge_id=job.edge_id)
+
+                        status_result = FedMLModelDatabase.get_instance().get_deployment_status_with_device_id(
+                            job.job_id, endpoint_name, model_name, job.edge_id)
 
+                        # Check the endpoint status
                         is_endpoint_ready = self._check_and_reset_endpoint_status(
                             job.job_id, job.edge_id, deployment_result, only_check_inference_ready_status=True)
-                        if not is_endpoint_ready:
-                            started, inference_port = ContainerUtils.get_instance().restart_container(generated_container_name)
-                        else:
-                            started, inference_port = ContainerUtils.get_instance().start_container(generated_container_name)
-                        if started and inference_port != 0:
-                            FedMLEndpointSyncProtocol.send_sync_inference_info(
-                                device_ids[0], job.edge_id, job.job_id, endpoint_name, model_name,
-                                model_id, model_version, inference_port)
+
+                        # Get endpoint container name prefix
+                        endpoint_container_name_prefix = device_client_constants.ClientConstants.get_endpoint_container_name(
+                            endpoint_name, model_name, model_version, job.job_id, model_id, edge_id=job.edge_id)
+
+                        # Could be multiple containers for the same endpoint
+                        num_containers = ContainerUtils.get_instance().get_container_rank_same_model(
+                            endpoint_container_name_prefix)
+
+                        for i in range(num_containers):
+                            endpoint_container_name = endpoint_container_name_prefix + f"__{i}"
+                            if not is_endpoint_ready:
+                                started, inference_port = ContainerUtils.get_instance().restart_container(endpoint_container_name)
+                            else:
+                                started, inference_port = ContainerUtils.get_instance().start_container(endpoint_container_name)
+                            if started and inference_port != 0:
+                                endpoint_sync_protocol.send_sync_inference_info(
+                                    device_ids[0], job.edge_id, job.job_id, endpoint_name, model_name,
+                                    model_id, model_version, inference_port)
+
+                                endpoint_sync_protocol.set_local_deployment_status_result(
+                                    job.job_id, endpoint_name, model_name, model_version, job.edge_id,
+                                    inference_port, status_result, deployment_result)
                 elif job.status == device_client_constants.ClientConstants.MSG_MLOPS_CLIENT_STATUS_OFFLINE:
                     endpoint_json = json.loads(job.running_json)
                     model_config = endpoint_json.get("model_config", {})
@@ ... @@
                             job.job_id, endpoint_name, model_name, job.edge_id)
                         if deployment_result is None:
                             continue
-                        generated_container_name = device_client_constants.ClientConstants.get_deployment_container_name(
-                            endpoint_name, model_name, model_version, job.job_id, model_id, edge_id=job.edge_id)
+
+                        status_result = FedMLModelDatabase.get_instance().get_deployment_status_with_device_id(
+                            job.job_id, endpoint_name, model_name, job.edge_id)
 
                         is_endpoint_ready = self._check_and_reset_endpoint_status(
                             job.job_id, job.edge_id, deployment_result, only_check_inference_ready_status=True)
                         if is_endpoint_ready:
-                            started, inference_port = ContainerUtils.get_instance().start_container(generated_container_name)
-                            if started and inference_port != 0:
-                                FedMLEndpointSyncProtocol.send_sync_inference_info(
-                                    device_ids[0], job.edge_id, job.job_id, endpoint_name, model_name,
-                                    model_id, model_version, inference_port)
+                            # Get endpoint container name prefix
+                            endpoint_container_name_prefix = device_client_constants.ClientConstants.get_endpoint_container_name(
+                                endpoint_name, model_name, model_version, job.job_id, model_id, edge_id=job.edge_id)
+
+                            # Could be multiple containers for the same endpoint
+                            num_containers = ContainerUtils.get_instance().get_container_rank_same_model(
+                                endpoint_container_name_prefix)
+
+                            for i in range(num_containers):
+                                endpoint_container_name = endpoint_container_name_prefix + f"__{i}"
+                                started, inference_port = ContainerUtils.get_instance().start_container(endpoint_container_name)
+                                if started and inference_port != 0:
+                                    endpoint_sync_protocol.send_sync_inference_info(
+                                        device_ids[0], job.edge_id, job.job_id, endpoint_name, model_name,
+                                        model_id, model_version, inference_port)
+
+                                    endpoint_sync_protocol.set_local_deployment_status_result(
+                                        job.job_id, endpoint_name, model_name, model_version, job.edge_id,
+                                        inference_port, status_result, deployment_result)
                 elif job.status == device_client_constants.ClientConstants.MSG_MLOPS_CLIENT_STATUS_RUNNING:
                     started_time = int(float(job.started_time))
                     timeout = time.time() - started_time
@@ -603,16 +639,6 @@ def monitor_endpoint_logs(self):
                 model_version = model_config.get("model_version", None)
                 endpoint_name = endpoint_json.get("end_point_name", None)
 
-                # Get endpoint container name
-                endpoint_container_name = device_client_constants.ClientConstants.get_endpoint_container_name(
-                    endpoint_name, model_name, model_version, job.job_id, model_id, edge_id=job.edge_id
-                )
-
-                # Get endpoint logs from the container
-                endpoint_logs = ContainerUtils.get_instance().get_container_logs(endpoint_container_name)
-                if endpoint_logs is None:
-                    continue
-
                 log_file_path, program_prefix = MLOpsRuntimeLog.build_log_file_path_with_run_params(
                     job.job_id, job.edge_id, device_server_constants.ServerConstants.get_log_file_dir(), is_server=True,
                     log_file_prefix=JobMonitor.ENDPOINT_CONTAINER_LOG_PREFIX,
@@ -628,7 +654,7 @@ def monitor_endpoint_logs(self):
 
                 # Get endpoint container name
                 endpoint_container_name_prefix = device_client_constants.ClientConstants.get_endpoint_container_name(
-                    endpoint_name, model_name, model_version, job.job_id, model_id
+                    endpoint_name, model_name, model_version, job.job_id, model_id, edge_id=job.edge_id
                 )
 
                 # Could be multiple containers for the same endpoint
@@ -640,6 +666,8 @@ def monitor_endpoint_logs(self):
 
                 # Get endpoint logs from the container
                 endpoint_logs = ContainerUtils.get_instance().get_container_logs(endpoint_container_name)
+                if endpoint_logs is None:
+                    continue
 
                 # Write container logs to the log file
                 if i == 0:
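The recurring pattern in this file is that one endpoint may be served by several numbered containers sharing a name prefix. A sketch of the naming scheme (the prefix value is illustrative):

```python
# Replica containers share a prefix and get "__0", "__1", ... suffixes;
# get_container_rank_same_model() reports how many such replicas exist.
def replica_names(prefix, num_containers):
    return [f"{prefix}__{i}" for i in range(num_containers)]

print(replica_names("fedml-endpoint-demo", 3))
# ['fedml-endpoint-demo__0', 'fedml-endpoint-demo__1', 'fedml-endpoint-demo__2']
```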
13 changes: 13 additions & 0 deletions python/fedml/computing/scheduler/comm_utils/job_utils.py
@@ -30,12 +30,16 @@ def occupy_gpu_ids(self, run_id, request_gpu_num, device_id, inner_id=None,
                    model_master_device_id=None, model_slave_device_id=None):
         try:
             ComputeCacheManager.get_instance().set_redis_params()
+            original_run_id = run_id
             run_id = inner_id if inner_id is not None else run_id
             switchable_device_id = model_slave_device_id \
                 if inner_id is not None and model_slave_device_id is not None else device_id
             with ComputeCacheManager.get_instance().lock(
                 ComputeCacheManager.get_instance().get_gpu_cache().get_device_run_lock_key(switchable_device_id, run_id)
             ):
+                if inner_id is not None and str(original_run_id) != str(inner_id):
+                    ComputeCacheManager.get_instance().get_gpu_cache().set_endpoint_run_id_map(inner_id, original_run_id)
+
                 available_gpu_ids = self.get_available_gpu_id_list(device_id)
 
                 available_gpu_ids = JobRunnerUtils.search_and_refresh_available_gpu_ids(available_gpu_ids)
@@ -114,6 +118,8 @@ def trim_unavailable_gpu_ids(gpu_ids):
         return trimmed_gpu_ids.copy()
 
     def release_gpu_ids(self, run_id, device_id):
+        edge_device_id = None
+        original_run_id = None
         try:
             ComputeCacheManager.get_instance().set_redis_params()
             with ComputeCacheManager.get_instance().lock(
@@ -139,10 +145,17 @@ def release_gpu_ids(self, run_id, device_id):
                     ComputeCacheManager.get_instance().get_gpu_cache().set_device_available_gpu_ids(edge_device_id, available_gpu_ids)
 
                 ComputeCacheManager.get_instance().get_gpu_cache().set_device_run_gpu_ids(device_id, run_id, None)
+
+                original_run_id = ComputeCacheManager.get_instance().get_gpu_cache().get_endpoint_run_id_map(run_id)
+
         except Exception as e:
             logging.info(f"Exception {traceback.format_exc()}")
             pass
 
+        if edge_device_id is not None:
+            from fedml.core import mlops
+            mlops.release_resources(run_id if original_run_id is None else original_run_id, edge_device_id)
+
     def get_available_gpu_id_list(self, device_id):
         try:
             ComputeCacheManager.get_instance().set_redis_params()
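The new map lets an endpoint's inner run id be traced back to the original launcher run when GPUs are released. A dictionary-backed sketch of the idea; the real code keeps this map in Redis via ComputeCacheManager, not in a dict:

```python
# Sketch only: production code stores the mapping in Redis.
endpoint_run_id_map = {}

def occupy(run_id, inner_id=None):
    effective_id = inner_id if inner_id is not None else run_id
    if inner_id is not None and str(run_id) != str(inner_id):
        endpoint_run_id_map[inner_id] = run_id   # set_endpoint_run_id_map
    return effective_id

def release(run_id):
    # Report resource release against the launcher's run when a mapping exists.
    original = endpoint_run_id_map.get(run_id)   # get_endpoint_run_id_map
    return original if original is not None else run_id
```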
(Diff truncated: the remaining changed files are not shown.)