Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expanded Job Queue #1636

Merged
merged 48 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9b9b670
working expanded queue
mraheja Jan 26, 2023
c099057
added event
mraheja Jan 26, 2023
281d798
updated testing and bug fixes
mraheja Jan 26, 2023
5ad959d
fixed missed space
mraheja Jan 26, 2023
440fc68
formatting fix
mraheja Jan 26, 2023
d92da6a
fixed setting up status + enabled spot
mraheja Feb 15, 2023
0fed010
format
mraheja Feb 15, 2023
2cb6d19
fixed setting up test
mraheja Feb 17, 2023
42ce456
addressed comments
mraheja Feb 23, 2023
496284e
change stop lock range
mraheja Feb 28, 2023
21c3836
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Feb 28, 2023
cbf8ab3
error message for needing update
mraheja Feb 28, 2023
afedc83
removed wandb.py
mraheja Mar 3, 2023
68dea21
added skylet restart
mraheja Mar 3, 2023
fdc68e2
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Mar 3, 2023
5af3146
updates
mraheja Mar 10, 2023
052a579
addressed comments
mraheja Mar 17, 2023
6c29f09
formatting + minor comment addressing
mraheja Mar 24, 2023
be36c35
more comments addressed
mraheja Mar 24, 2023
a12577e
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
Michaelvll Mar 26, 2023
21919c5
Fix rich status
Michaelvll Mar 26, 2023
0b7141a
fixed stalling issue
mraheja Apr 11, 2023
2e46f14
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Apr 11, 2023
356c04e
format
mraheja Apr 11, 2023
3223b74
small test bug
mraheja Apr 11, 2023
b6a8325
fixed skylet launch issue
mraheja Apr 28, 2023
47ccba0
formatting + forgetten file
mraheja Apr 28, 2023
1bc0478
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Apr 28, 2023
3dc81c4
more formatting + remove check
mraheja Apr 28, 2023
35a6469
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja May 20, 2023
3876881
addressed comments and removed pkill usage
mraheja May 20, 2023
4a7ef25
schedule after setup too
mraheja May 20, 2023
4ac4335
Update sky/backends/cloud_vm_ray_backend.py
mraheja Jun 1, 2023
f1556e6
addressed more comments
mraheja Jun 1, 2023
c4e21d1
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Jun 1, 2023
b7d13b1
Merge branch 'expanded-job-queue' of https://github.com/skypilot-org/…
mraheja Jun 1, 2023
d305d26
formatting
mraheja Jun 1, 2023
88dd953
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Jun 5, 2023
cb0d66f
Address comments
Michaelvll Jun 6, 2023
8b28637
faster job scheduling
Michaelvll Jun 6, 2023
c5881b7
format
Michaelvll Jun 6, 2023
cc7b7fd
Fix cancellation logic
Michaelvll Jun 6, 2023
4e45030
Don't schedule a job after reboot
Michaelvll Jun 6, 2023
6903de7
add comment
Michaelvll Jun 6, 2023
a6e82eb
revert changes in test
Michaelvll Jun 6, 2023
46b7c11
Add test for cancelling pending jobs
Michaelvll Jun 6, 2023
505f9e3
Make update job status more readable
Michaelvll Jun 6, 2023
f4a2b80
schedule more frequently for job cancelling
Michaelvll Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/job_queue/job_multinode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ run: |
conda env list
for i in {1..360}; do
echo "$timestamp $i"
sleep 1
sleep 2
done
127 changes: 72 additions & 55 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,11 @@ def __init__(self):
def add_prologue(self,
job_id: int,
spot_task: Optional['task_lib.Task'] = None,
setup_cmd: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
setup_log_path: Optional[str] = None,
is_local: bool = False) -> None:
assert not self._has_prologue, 'add_prologue() called twice?'
self._has_prologue = True
self.job_id = job_id
self.is_local = is_local
# Should use 'auto' or 'ray://<internal_head_ip>:10001' rather than
# 'ray://localhost:10001', or 'ray://127.0.0.1:10001', for public cloud.
# Otherwise, it will a bug of ray job failed to get the placement group
Expand Down Expand Up @@ -235,7 +233,6 @@ def add_prologue(self,
inspect.getsource(log_lib.add_ray_env_vars),
inspect.getsource(log_lib.run_bash_command_with_log),
'run_bash_command_with_log = ray.remote(run_bash_command_with_log)',
f'setup_cmd = {setup_cmd!r}',
]
# Currently, the codegen program is/can only be submitted to the head
# node, due to using job_lib for updating job statuses, and using
Expand All @@ -247,46 +244,6 @@ def add_prologue(self,
if hasattr(autostop_lib, 'set_last_active_time_to_now'):
autostop_lib.set_last_active_time_to_now()
"""))
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
_SETUP_CPUS = 0.0001
# The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the
# requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string.
# We unset it so that user setup command may properly use this env var.
setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
total_num_nodes = len(ray.nodes())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
ray.get(setup_pg.ready())
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
getpass.getuser(),
job_id={self.job_id},
env_vars={envs!r},
stream_logs=True,
with_ray=True,
use_sudo={is_local},
) for i in range(total_num_nodes)]
setup_returncodes = ray.get(setup_workers)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
time.sleep(1)
print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with '
'return code list:{colorama.Style.RESET_ALL}',
setup_returncodes,
file=sys.stderr,
flush=True)
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
""")
]
self._code += [
f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)',
]
Expand All @@ -304,6 +261,9 @@ def add_gang_scheduling_placement_group(
num_nodes: int,
accelerator_dict: Dict[str, int],
stable_cluster_internal_ips: List[str],
setup_cmd: Optional[str] = None,
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
setup_log_path: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
) -> None:
"""Create the gang scheduling placement group for a Task.

Expand Down Expand Up @@ -345,7 +305,7 @@ def add_gang_scheduling_placement_group(
plural = 's' if {num_nodes} > 1 else ''
node_str = f'{num_nodes} node{{plural}}'

message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message = {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.'
print(message,
file=sys.stderr,
Expand All @@ -357,10 +317,58 @@ def add_gang_scheduling_placement_group(
print('INFO: All task resources reserved.',
file=sys.stderr,
flush=True)
job_lib.set_job_started({self.job_id!r})
""")
]

job_id = self.job_id
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
setup_cmd = {setup_cmd!r}
_SETUP_CPUS = 0.0001
# The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the
# requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string.
# We unset it so that user setup command may properly use this env var.
setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
mraheja marked this conversation as resolved.
Show resolved Hide resolved
total_num_nodes = len(ray.nodes())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
getpass.getuser(),
job_id={self.job_id},
env_vars={envs!r},
stream_logs=True,
with_ray=True,
use_sudo={self.is_local},
) for i in range(total_num_nodes)]
setup_returncodes = ray.get(setup_workers)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
time.sleep(1)
print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with '
'return code list:{colorama.Style.RESET_ALL}',
setup_returncodes,
file=sys.stderr,
flush=True)
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
""")
]

self._code += [
textwrap.dedent(f"""\
job_lib.set_job_started({self.job_id!r})
job_lib.scheduler.schedule_step()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this line be moved to after L316, as we may want to schedule the next job as soon as the current job is fulfilled?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump @mraheja, I am not sure why this is marked as resolved, as we may want to start scheduling the next job as soon as the placement group is fulfilled for the job, instead of after the setup is done?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we schedule the next job immediately after the pg is fulfilled, this line should be able to be removed to reduce the overhead?

"""),
]

# Export IP and node rank to the environment variables.
self._code += [
textwrap.dedent(f"""\
Expand Down Expand Up @@ -2441,11 +2449,17 @@ def _exec_code_on_head(
handle, ray_command, ray_job_id)
else:
job_submit_cmd = (
f'{cd} && mkdir -p {remote_log_dir} && ray job submit '
f'{cd} && ray job submit '
f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} '
'--no-wait '
f'"{executable} -u {script_path} > {remote_log_path} 2>&1"')

mkdir_code = (
f'{cd} && mkdir -p {remote_log_dir} && touch '
f'{remote_log_path} && echo START > {remote_log_path} 2>&1')
mraheja marked this conversation as resolved.
Show resolved Hide resolved
code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd)
job_submit_cmd = mkdir_code + ' && ' + code

returncode, stdout, stderr = self.run_on_head(handle,
job_submit_cmd,
stream_logs=False,
Expand Down Expand Up @@ -3373,12 +3387,15 @@ def _execute_task_one_node(self, handle: ResourceHandle,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(
1, accelerator_dict, stable_cluster_internal_ips=internal_ips)
1,
accelerator_dict,
stable_cluster_internal_ips=internal_ips,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
envs=task.envs,
)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
Expand Down Expand Up @@ -3436,14 +3453,14 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(
num_actual_nodes,
accelerator_dict,
stable_cluster_internal_ips=internal_ips)
stable_cluster_internal_ips=internal_ips,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
envs=task.envs)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
Expand Down
15 changes: 3 additions & 12 deletions sky/skylet/events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""skylet events"""
import getpass
import math
import os
import re
Expand Down Expand Up @@ -52,20 +51,12 @@ def _run(self):
raise NotImplementedError


class JobUpdateEvent(SkyletEvent):
"""Skylet event for updating job status."""
class JobSchedulerEvent(SkyletEvent):
"""Skylet event for scheduling jobs"""
EVENT_INTERVAL_SECONDS = 300

# Only update status of the jobs after this many seconds of job submission,
# to avoid race condition with `ray job` to make sure it job has been
# correctly updated.
# TODO(zhwu): This number should be tuned based on heuristics.
_SUBMITTED_GAP_SECONDS = 60

def _run(self):
job_owner = getpass.getuser()
job_lib.update_status(job_owner,
submitted_gap_sec=self._SUBMITTED_GAP_SECONDS)
job_lib.scheduler.schedule_step()


class SpotJobUpdateEvent(SkyletEvent):
Expand Down
Loading