Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions gateway/api/management/commands/update_jobs_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,21 @@ def update_job_status(job: Job):
job_new_status = Job.PENDING
success = False
job_handler = get_job_handler(job.compute_resource.host)
ray_job_status = job_handler.status(job.ray_job_id) if job_handler else None

try:
ray_job_status = job_handler.status(job.ray_job_id) if job_handler else None
except RuntimeError as e:
# Handle case where Ray operator lost reference to the job
logger.error(
"Job [%s] with ray_job_id [%s] failed to get status from Ray cluster. "
"Marking as FAILED. Error: %s",
job.id,
job.ray_job_id,
str(e),
)
ray_job_status = None
job_new_status = Job.FAILED
success = False

if ray_job_status:
job_new_status = ray_job_status_to_model_job_status(ray_job_status)
Expand All @@ -59,16 +73,25 @@ def update_job_status(job: Job):
job.sub_status = None
job.env_vars = "{}"

if job_handler:
logs = job_handler.logs(job.ray_job_id)
job.logs = check_logs(logs, job)
# check if job is resource constrained
no_resources_log = "No available node types can fulfill resource request"
if no_resources_log in job.logs:
job_new_status = fail_job_insufficient_resources(job)
job.status = job_new_status
# cleanup env vars
job.env_vars = "{}"
if job_handler and ray_job_status is not None:
try:
logs = job_handler.logs(job.ray_job_id)
job.logs = check_logs(logs, job) or ""
# check if job is resource constrained
no_resources_log = "No available node types can fulfill resource request"
if no_resources_log in job.logs:
job_new_status = fail_job_insufficient_resources(job)
job.status = job_new_status
# cleanup env vars
job.env_vars = "{}"
except RuntimeError as e:
# Handle case where Ray operator lost reference to the job
logger.warning(
"Cannot fetch logs for job [%s] with ray_job_id [%s]. Error: %s",
job.id,
job.ray_job_id,
str(e),
)

try:
job.save()
Expand Down
8 changes: 5 additions & 3 deletions gateway/tests/api/management/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ def test_free_resources(self):
num_resources = ComputeResource.objects.count()
self.assertEqual(num_resources, 1)

@patch("api.ray.get_job_handler")
def test_update_jobs_statuses(self, get_job_handler):
@patch("api.ray.JobSubmissionClient")
def test_update_jobs_statuses(self, mock_job_submission_client):
"""Tests update of job statuses."""
# Test status change from PENDING to RUNNING
ray_client = MagicMock()
ray_client.get_job_status.return_value = JobStatus.RUNNING
ray_client.get_job_logs.return_value = "No logs yet."
ray_client.stop_job.return_value = True
ray_client.submit_job.return_value = "AwesomeJobId"
get_job_handler.return_value = JobHandler(ray_client)

# Mock JobSubmissionClient to return our mocked client
mock_job_submission_client.return_value = ray_client

# This new line is needed because if not the Job will timeout
job = Job.objects.get(id__exact="1a7947f9-6ae8-4e3d-ac1e-e7d608deec84")
Expand Down
Loading