From 4813fdd3287cc7a98434af6bbbdf104d7a251d0c Mon Sep 17 00:00:00 2001 From: Daniel Thom Date: Fri, 5 Jun 2026 10:32:58 -0600 Subject: [PATCH] Add server-side Slurm-job correlation endpoint `torc slurm diagnose-logs` correlated Slurm job IDs to the Torc jobs they ran by fetching four full lists (scheduled compute nodes, compute nodes, results, jobs) and joining them through three HashMaps in `build_slurm_to_jobs_map` -- the heaviest client-side join in the codebase. Add `GET /workflows/{id}/slurm_job_correlations`, which performs the whole join in one SQL query: scheduled_compute_node (scheduler_id = Slurm job ID) -> compute_node (linked via the scheduler JSON's scheduler_id) -> result -> job, grouped/ordered by (slurm_job_id, job_id) and covering all runs (matching the prior all_runs=true behavior). Every table is narrowed by its workflow_id index before joining; the query plan is index-only with no table scans. `build_slurm_to_jobs_map` now makes a single call and rebuilds the same `HashMap` (keyed by Slurm job ID, with the Torc jobs each one ran as values), so all consumers are unchanged. The now-unused pagination imports are removed. Adds integration tests for the correlation chain and the 404 path. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- api/openapi.codegen.yaml | 65 +++++ api/openapi.yaml | 65 +++++ .../Torc/src/api/apis/api_WorkflowsApi.jl | 31 ++ julia_client/Torc/src/api/modelincludes.jl | 2 + .../models/model_SlurmJobCorrelationModel.jl | 53 ++++ .../model_SlurmJobCorrelationsResponse.jl | 38 +++ julia_client/julia_client/README.md | 3 + .../docs/SlurmJobCorrelationModel.md | 14 + .../docs/SlurmJobCorrelationsResponse.md | 12 + .../julia_client/docs/WorkflowsApi.md | 29 ++ .../src/torc/openapi_client/__init__.py | 4 + .../torc/openapi_client/api/workflows_api.py | 267 ++++++++++++++++++ .../torc/openapi_client/models/__init__.py | 2 + .../models/slurm_job_correlation_model.py | 91 ++++++ .../models/slurm_job_correlations_response.py | 95 +++++++ src/client/apis/workflows_api.rs | 66 +++++ src/client/commands/slurm.rs | 142 ++-------- src/lib.rs | 4 +- src/models.rs | 17 ++ src/openapi_spec.rs | 47 ++- src/server/api/workflows.rs | 90 +++++- src/server/api_contract.rs | 7 + src/server/api_responses.rs | 13 + src/server/http_server.rs | 10 + src/server/http_server/workflows_transport.rs | 11 + src/server/http_transport/response_mapping.rs | 5 + src/server/live_router.rs | 28 ++ src/server/response_types.rs | 8 +- tests/test_workflows.rs | 76 +++++ 29 files changed, 1151 insertions(+), 144 deletions(-) create mode 100644 julia_client/Torc/src/api/models/model_SlurmJobCorrelationModel.jl create mode 100644 julia_client/Torc/src/api/models/model_SlurmJobCorrelationsResponse.jl create mode 100644 julia_client/julia_client/docs/SlurmJobCorrelationModel.md create mode 100644 julia_client/julia_client/docs/SlurmJobCorrelationsResponse.md create mode 100644 python_client/src/torc/openapi_client/models/slurm_job_correlation_model.py create mode 100644 python_client/src/torc/openapi_client/models/slurm_job_correlations_response.py diff --git a/api/openapi.codegen.yaml b/api/openapi.codegen.yaml index a24a9420..fa4624fa 100644 --- a/api/openapi.codegen.yaml 
+++ b/api/openapi.codegen.yaml @@ -5313,6 +5313,44 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' + /workflows/{id}/slurm_job_correlations: + get: + tags: + - workflows + operationId: get_slurm_job_correlations + parameters: + - name: id + in: path + description: Workflow ID + required: true + schema: + type: integer + format: int64 + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/SlurmJobCorrelationsResponse' + '403': + description: User does not have access + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Workflow not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' /workflows/{id}/status: get: tags: @@ -7117,6 +7155,33 @@ components: workflow_id: type: integer format: int64 + SlurmJobCorrelationModel: + type: object + description: |- + One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given + Torc job, derived from scheduled_compute_node -> compute_node -> result. + required: + - slurm_job_id + - job_id + - job_name + properties: + job_id: + type: integer + format: int64 + job_name: + type: string + slurm_job_id: + type: string + SlurmJobCorrelationsResponse: + type: object + description: All Slurm-job-to-Torc-job correlations for a workflow, computed server-side. 
+ required: + - items + properties: + items: + type: array + items: + $ref: '#/components/schemas/SlurmJobCorrelationModel' SlurmSchedulerModel: type: object required: diff --git a/api/openapi.yaml b/api/openapi.yaml index a24a9420..fa4624fa 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -5313,6 +5313,44 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' + /workflows/{id}/slurm_job_correlations: + get: + tags: + - workflows + operationId: get_slurm_job_correlations + parameters: + - name: id + in: path + description: Workflow ID + required: true + schema: + type: integer + format: int64 + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/SlurmJobCorrelationsResponse' + '403': + description: User does not have access + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Workflow not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' /workflows/{id}/status: get: tags: @@ -7117,6 +7155,33 @@ components: workflow_id: type: integer format: int64 + SlurmJobCorrelationModel: + type: object + description: |- + One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given + Torc job, derived from scheduled_compute_node -> compute_node -> result. + required: + - slurm_job_id + - job_id + - job_name + properties: + job_id: + type: integer + format: int64 + job_name: + type: string + slurm_job_id: + type: string + SlurmJobCorrelationsResponse: + type: object + description: All Slurm-job-to-Torc-job correlations for a workflow, computed server-side. 
+ required: + - items + properties: + items: + type: array + items: + $ref: '#/components/schemas/SlurmJobCorrelationModel' SlurmSchedulerModel: type: object required: diff --git a/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl b/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl index 4ef33a86..c6796edc 100644 --- a/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl +++ b/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl @@ -273,6 +273,36 @@ function get_ready_job_requirements(_api::WorkflowsApi, response_stream::Channel return OpenAPI.Clients.exec(_ctx, response_stream) end +const _returntypes_get_slurm_job_correlations_WorkflowsApi = Dict{Regex,Type}( + Regex("^" * replace("200", "x"=>".") * "\$") => SlurmJobCorrelationsResponse, + Regex("^" * replace("403", "x"=>".") * "\$") => ErrorResponse, + Regex("^" * replace("404", "x"=>".") * "\$") => ErrorResponse, + Regex("^" * replace("500", "x"=>".") * "\$") => ErrorResponse, +) + +function _oacinternal_get_slurm_job_correlations(_api::WorkflowsApi, id::Int64; _mediaType=nothing) + _ctx = OpenAPI.Clients.Ctx(_api.client, "GET", _returntypes_get_slurm_job_correlations_WorkflowsApi, "/workflows/{id}/slurm_job_correlations", []) + OpenAPI.Clients.set_param(_ctx.path, "id", id) # type Int64 + OpenAPI.Clients.set_header_accept(_ctx, ["application/json", ]) + OpenAPI.Clients.set_header_content_type(_ctx, (_mediaType === nothing) ? 
[] : [_mediaType]) + return _ctx +end + +@doc raw"""Params: +- id::Int64 (required) + +Return: SlurmJobCorrelationsResponse, OpenAPI.Clients.ApiResponse +""" +function get_slurm_job_correlations(_api::WorkflowsApi, id::Int64; _mediaType=nothing) + _ctx = _oacinternal_get_slurm_job_correlations(_api, id; _mediaType=_mediaType) + return OpenAPI.Clients.exec(_ctx) +end + +function get_slurm_job_correlations(_api::WorkflowsApi, response_stream::Channel, id::Int64; _mediaType=nothing) + _ctx = _oacinternal_get_slurm_job_correlations(_api, id; _mediaType=_mediaType) + return OpenAPI.Clients.exec(_ctx, response_stream) +end + const _returntypes_get_workflow_WorkflowsApi = Dict{Regex,Type}( Regex("^" * replace("200", "x"=>".") * "\$") => WorkflowModel, ) @@ -777,6 +807,7 @@ export create_workflow export delete_workflow export get_active_task_for_workflow export get_ready_job_requirements +export get_slurm_job_correlations export get_workflow export get_workflow_status export initialize_jobs diff --git a/julia_client/Torc/src/api/modelincludes.jl b/julia_client/Torc/src/api/modelincludes.jl index d61e112d..df9259a7 100644 --- a/julia_client/Torc/src/api/modelincludes.jl +++ b/julia_client/Torc/src/api/modelincludes.jl @@ -76,6 +76,8 @@ include("models/model_ResourceRequirementsModel.jl") include("models/model_ResultModel.jl") include("models/model_RoCrateEntityModel.jl") include("models/model_ScheduledComputeNodesModel.jl") +include("models/model_SlurmJobCorrelationModel.jl") +include("models/model_SlurmJobCorrelationsResponse.jl") include("models/model_SlurmSchedulerModel.jl") include("models/model_SlurmStatsModel.jl") include("models/model_SpawnJobModel.jl") diff --git a/julia_client/Torc/src/api/models/model_SlurmJobCorrelationModel.jl b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationModel.jl new file mode 100644 index 00000000..f8423f89 --- /dev/null +++ b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationModel.jl @@ -0,0 +1,53 @@ +# This file was 
generated by the Julia OpenAPI Code Generator +# Do not modify this file directly. Modify the OpenAPI specification instead. + + +@doc raw"""SlurmJobCorrelationModel +One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given Torc job, derived from scheduled_compute_node -> compute_node -> result. + + SlurmJobCorrelationModel(; + job_id=nothing, + job_name=nothing, + slurm_job_id=nothing, + ) + + - job_id::Int64 + - job_name::String + - slurm_job_id::String +""" +Base.@kwdef mutable struct SlurmJobCorrelationModel <: OpenAPI.APIModel + job_id::Union{Nothing, Int64} = nothing + job_name::Union{Nothing, String} = nothing + slurm_job_id::Union{Nothing, String} = nothing + + function SlurmJobCorrelationModel(job_id, job_name, slurm_job_id, ) + o = new(job_id, job_name, slurm_job_id, ) + OpenAPI.validate_properties(o) + return o + end +end # type SlurmJobCorrelationModel + +const _property_types_SlurmJobCorrelationModel = Dict{Symbol,String}(Symbol("job_id")=>"Int64", Symbol("job_name")=>"String", Symbol("slurm_job_id")=>"String", ) +OpenAPI.property_type(::Type{ SlurmJobCorrelationModel }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_SlurmJobCorrelationModel[name]))} + +function OpenAPI.check_required(o::SlurmJobCorrelationModel) + o.job_id === nothing && (return false) + o.job_name === nothing && (return false) + o.slurm_job_id === nothing && (return false) + true +end + +function OpenAPI.validate_properties(o::SlurmJobCorrelationModel) + OpenAPI.validate_property(SlurmJobCorrelationModel, Symbol("job_id"), o.job_id) + OpenAPI.validate_property(SlurmJobCorrelationModel, Symbol("job_name"), o.job_name) + OpenAPI.validate_property(SlurmJobCorrelationModel, Symbol("slurm_job_id"), o.slurm_job_id) +end + +function OpenAPI.validate_property(::Type{ SlurmJobCorrelationModel }, name::Symbol, val) + + if name === Symbol("job_id") + OpenAPI.validate_param(name, "SlurmJobCorrelationModel", :format, val, "int64") + end + + +end diff --git 
a/julia_client/Torc/src/api/models/model_SlurmJobCorrelationsResponse.jl b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationsResponse.jl new file mode 100644 index 00000000..8a7acc7a --- /dev/null +++ b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationsResponse.jl @@ -0,0 +1,38 @@ +# This file was generated by the Julia OpenAPI Code Generator +# Do not modify this file directly. Modify the OpenAPI specification instead. + + +@doc raw"""SlurmJobCorrelationsResponse +All Slurm-job-to-Torc-job correlations for a workflow, computed server-side. + + SlurmJobCorrelationsResponse(; + items=nothing, + ) + + - items::Vector{SlurmJobCorrelationModel} +""" +Base.@kwdef mutable struct SlurmJobCorrelationsResponse <: OpenAPI.APIModel + items::Union{Nothing, Vector} = nothing # spec type: Union{ Nothing, Vector{SlurmJobCorrelationModel} } + + function SlurmJobCorrelationsResponse(items, ) + o = new(items, ) + OpenAPI.validate_properties(o) + return o + end +end # type SlurmJobCorrelationsResponse + +const _property_types_SlurmJobCorrelationsResponse = Dict{Symbol,String}(Symbol("items")=>"Vector{SlurmJobCorrelationModel}", ) +OpenAPI.property_type(::Type{ SlurmJobCorrelationsResponse }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_SlurmJobCorrelationsResponse[name]))} + +function OpenAPI.check_required(o::SlurmJobCorrelationsResponse) + o.items === nothing && (return false) + true +end + +function OpenAPI.validate_properties(o::SlurmJobCorrelationsResponse) + OpenAPI.validate_property(SlurmJobCorrelationsResponse, Symbol("items"), o.items) +end + +function OpenAPI.validate_property(::Type{ SlurmJobCorrelationsResponse }, name::Symbol, val) + +end diff --git a/julia_client/julia_client/README.md b/julia_client/julia_client/README.md index a4d39d32..84837962 100644 --- a/julia_client/julia_client/README.md +++ b/julia_client/julia_client/README.md @@ -133,6 +133,7 @@ Class | Method *WorkflowsApi* | 
[**delete_workflow**](docs/WorkflowsApi.md#delete_workflow)
**DELETE** /workflows/{id}
*WorkflowsApi* | [**get_active_task_for_workflow**](docs/WorkflowsApi.md#get_active_task_for_workflow)
**GET** /workflows/{id}/active_task
*WorkflowsApi* | [**get_ready_job_requirements**](docs/WorkflowsApi.md#get_ready_job_requirements)
**GET** /workflows/{id}/ready_job_requirements
+*WorkflowsApi* | [**get_slurm_job_correlations**](docs/WorkflowsApi.md#get_slurm_job_correlations)
**GET** /workflows/{id}/slurm_job_correlations
*WorkflowsApi* | [**get_workflow**](docs/WorkflowsApi.md#get_workflow)
**GET** /workflows/{id}
*WorkflowsApi* | [**get_workflow_status**](docs/WorkflowsApi.md#get_workflow_status)
**GET** /workflows/{id}/status
*WorkflowsApi* | [**initialize_jobs**](docs/WorkflowsApi.md#initialize_jobs)
**POST** /workflows/{id}/initialize_jobs
@@ -228,6 +229,8 @@ Class | Method - [ResultModel](docs/ResultModel.md) - [RoCrateEntityModel](docs/RoCrateEntityModel.md) - [ScheduledComputeNodesModel](docs/ScheduledComputeNodesModel.md) + - [SlurmJobCorrelationModel](docs/SlurmJobCorrelationModel.md) + - [SlurmJobCorrelationsResponse](docs/SlurmJobCorrelationsResponse.md) - [SlurmSchedulerModel](docs/SlurmSchedulerModel.md) - [SlurmStatsModel](docs/SlurmStatsModel.md) - [SpawnJobModel](docs/SpawnJobModel.md) diff --git a/julia_client/julia_client/docs/SlurmJobCorrelationModel.md b/julia_client/julia_client/docs/SlurmJobCorrelationModel.md new file mode 100644 index 00000000..e17c92d9 --- /dev/null +++ b/julia_client/julia_client/docs/SlurmJobCorrelationModel.md @@ -0,0 +1,14 @@ +# SlurmJobCorrelationModel + + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**job_id** | **Int64** | | [default to nothing] +**job_name** | **String** | | [default to nothing] +**slurm_job_id** | **String** | | [default to nothing] + + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + + diff --git a/julia_client/julia_client/docs/SlurmJobCorrelationsResponse.md b/julia_client/julia_client/docs/SlurmJobCorrelationsResponse.md new file mode 100644 index 00000000..d14d768b --- /dev/null +++ b/julia_client/julia_client/docs/SlurmJobCorrelationsResponse.md @@ -0,0 +1,12 @@ +# SlurmJobCorrelationsResponse + + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**items** | [**Vector{SlurmJobCorrelationModel}**](SlurmJobCorrelationModel.md) | | [default to nothing] + + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + + diff --git a/julia_client/julia_client/docs/WorkflowsApi.md b/julia_client/julia_client/docs/WorkflowsApi.md index 41d2417e..bda0c98a 100644 --- 
a/julia_client/julia_client/docs/WorkflowsApi.md +++ b/julia_client/julia_client/docs/WorkflowsApi.md @@ -13,6 +13,7 @@ Method | HTTP request | Description [**delete_workflow**](WorkflowsApi.md#delete_workflow) | **DELETE** /workflows/{id} | [**get_active_task_for_workflow**](WorkflowsApi.md#get_active_task_for_workflow) | **GET** /workflows/{id}/active_task | [**get_ready_job_requirements**](WorkflowsApi.md#get_ready_job_requirements) | **GET** /workflows/{id}/ready_job_requirements | +[**get_slurm_job_correlations**](WorkflowsApi.md#get_slurm_job_correlations) | **GET** /workflows/{id}/slurm_job_correlations | [**get_workflow**](WorkflowsApi.md#get_workflow) | **GET** /workflows/{id} | [**get_workflow_status**](WorkflowsApi.md#get_workflow_status) | **GET** /workflows/{id}/status | [**initialize_jobs**](WorkflowsApi.md#initialize_jobs) | **POST** /workflows/{id}/initialize_jobs | @@ -305,6 +306,34 @@ No authorization required [[Back to top]](#) [[Back to API list]](../README.md#api-endpoints) [[Back to Model list]](../README.md#models) [[Back to README]](../README.md) +# **get_slurm_job_correlations** +> get_slurm_job_correlations(_api::WorkflowsApi, id::Int64; _mediaType=nothing) -> SlurmJobCorrelationsResponse, OpenAPI.Clients.ApiResponse
+> get_slurm_job_correlations(_api::WorkflowsApi, response_stream::Channel, id::Int64; _mediaType=nothing) -> Channel{ SlurmJobCorrelationsResponse }, OpenAPI.Clients.ApiResponse + + + +### Required Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **_api** | **WorkflowsApi** | API context | +**id** | **Int64** | Workflow ID | + +### Return type + +[**SlurmJobCorrelationsResponse**](SlurmJobCorrelationsResponse.md) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: Not defined + - **Accept**: application/json + +[[Back to top]](#) [[Back to API list]](../README.md#api-endpoints) [[Back to Model list]](../README.md#models) [[Back to README]](../README.md) + # **get_workflow** > get_workflow(_api::WorkflowsApi, id::Int64; _mediaType=nothing) -> WorkflowModel, OpenAPI.Clients.ApiResponse
> get_workflow(_api::WorkflowsApi, response_stream::Channel, id::Int64; _mediaType=nothing) -> Channel{ WorkflowModel }, OpenAPI.Clients.ApiResponse diff --git a/python_client/src/torc/openapi_client/__init__.py b/python_client/src/torc/openapi_client/__init__.py index c9386eba..4d15d9c9 100644 --- a/python_client/src/torc/openapi_client/__init__.py +++ b/python_client/src/torc/openapi_client/__init__.py @@ -121,6 +121,8 @@ "ResultModel", "RoCrateEntityModel", "ScheduledComputeNodesModel", + "SlurmJobCorrelationModel", + "SlurmJobCorrelationsResponse", "SlurmSchedulerModel", "SlurmStatsModel", "SpawnJobModel", @@ -248,6 +250,8 @@ from torc.openapi_client.models.result_model import ResultModel as ResultModel from torc.openapi_client.models.ro_crate_entity_model import RoCrateEntityModel as RoCrateEntityModel from torc.openapi_client.models.scheduled_compute_nodes_model import ScheduledComputeNodesModel as ScheduledComputeNodesModel +from torc.openapi_client.models.slurm_job_correlation_model import SlurmJobCorrelationModel as SlurmJobCorrelationModel +from torc.openapi_client.models.slurm_job_correlations_response import SlurmJobCorrelationsResponse as SlurmJobCorrelationsResponse from torc.openapi_client.models.slurm_scheduler_model import SlurmSchedulerModel as SlurmSchedulerModel from torc.openapi_client.models.slurm_stats_model import SlurmStatsModel as SlurmStatsModel from torc.openapi_client.models.spawn_job_model import SpawnJobModel as SpawnJobModel diff --git a/python_client/src/torc/openapi_client/api/workflows_api.py b/python_client/src/torc/openapi_client/api/workflows_api.py index 46f12751..84c36812 100644 --- a/python_client/src/torc/openapi_client/api/workflows_api.py +++ b/python_client/src/torc/openapi_client/api/workflows_api.py @@ -38,6 +38,7 @@ from torc.openapi_client.models.list_workflows_response import ListWorkflowsResponse from torc.openapi_client.models.process_changed_job_inputs_response import ProcessChangedJobInputsResponse from 
torc.openapi_client.models.reset_job_status_response import ResetJobStatusResponse +from torc.openapi_client.models.slurm_job_correlations_response import SlurmJobCorrelationsResponse from torc.openapi_client.models.workflow_model import WorkflowModel from torc.openapi_client.models.workflow_status_response import WorkflowStatusResponse @@ -2562,6 +2563,272 @@ def _get_ready_job_requirements_serialize( + @validate_call + def get_slurm_job_correlations( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> SlurmJobCorrelationsResponse: + """get_slurm_job_correlations + + + :param id: Workflow ID (required) + :type id: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. 
+ """ # noqa: E501 + + _param = self._get_slurm_job_correlations_serialize( + id=id, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "SlurmJobCorrelationsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + + @validate_call + def get_slurm_job_correlations_with_http_info( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[SlurmJobCorrelationsResponse]: + """get_slurm_job_correlations + + + :param id: Workflow ID (required) + :type id: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. 
+ :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_slurm_job_correlations_serialize( + id=id, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "SlurmJobCorrelationsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + + @validate_call + def get_slurm_job_correlations_without_preload_content( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """get_slurm_job_correlations + + + :param id: Workflow ID (required) + :type id: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. 
+ :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_slurm_job_correlations_serialize( + id=id, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "SlurmJobCorrelationsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + return response_data.response + + + def _get_slurm_job_correlations_serialize( + self, + id, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = { + } + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[ + str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] + ] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + if id is not None: + _path_params['id'] = id + # process the query parameters + # process the header parameters + # process the form parameters + # process the body parameter + + + # set the HTTP header `Accept` + if 'Accept' not in _header_params: + _header_params['Accept'] = self.api_client.select_header_accept( + [ + 'application/json' + ] + ) + + + # authentication setting + 
_auth_settings: List[str] = [ + ] + + return self.api_client.param_serialize( + method='GET', + resource_path='/workflows/{id}/slurm_job_correlations', + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth + ) + + + + @validate_call def get_workflow( self, diff --git a/python_client/src/torc/openapi_client/models/__init__.py b/python_client/src/torc/openapi_client/models/__init__.py index 18e6d41d..8b63d24d 100644 --- a/python_client/src/torc/openapi_client/models/__init__.py +++ b/python_client/src/torc/openapi_client/models/__init__.py @@ -88,6 +88,8 @@ from torc.openapi_client.models.result_model import ResultModel from torc.openapi_client.models.ro_crate_entity_model import RoCrateEntityModel from torc.openapi_client.models.scheduled_compute_nodes_model import ScheduledComputeNodesModel +from torc.openapi_client.models.slurm_job_correlation_model import SlurmJobCorrelationModel +from torc.openapi_client.models.slurm_job_correlations_response import SlurmJobCorrelationsResponse from torc.openapi_client.models.slurm_scheduler_model import SlurmSchedulerModel from torc.openapi_client.models.slurm_stats_model import SlurmStatsModel from torc.openapi_client.models.spawn_job_model import SpawnJobModel diff --git a/python_client/src/torc/openapi_client/models/slurm_job_correlation_model.py b/python_client/src/torc/openapi_client/models/slurm_job_correlation_model.py new file mode 100644 index 00000000..872ed69a --- /dev/null +++ b/python_client/src/torc/openapi_client/models/slurm_job_correlation_model.py @@ -0,0 +1,91 @@ +# coding: utf-8 + +""" + torc + + Rust-owned OpenAPI surface for Torc. + + The version of the OpenAPI document: 0.20.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. 
+""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + +from pydantic import BaseModel, ConfigDict, StrictInt, StrictStr +from typing import Any, ClassVar, Dict, List +from typing import Optional, Set +from typing_extensions import Self + +class SlurmJobCorrelationModel(BaseModel): + """ + One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given Torc job, derived from scheduled_compute_node -> compute_node -> result. + """ # noqa: E501 + job_id: StrictInt + job_name: StrictStr + slurm_job_id: StrictStr + __properties: ClassVar[List[str]] = ["job_id", "job_name", "slurm_job_id"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationModel from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. 
+ """ + excluded_fields: Set[str] = set([ + ]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationModel from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "job_id": obj.get("job_id"), + "job_name": obj.get("job_name"), + "slurm_job_id": obj.get("slurm_job_id") + }) + return _obj + + diff --git a/python_client/src/torc/openapi_client/models/slurm_job_correlations_response.py b/python_client/src/torc/openapi_client/models/slurm_job_correlations_response.py new file mode 100644 index 00000000..b7f80824 --- /dev/null +++ b/python_client/src/torc/openapi_client/models/slurm_job_correlations_response.py @@ -0,0 +1,95 @@ +# coding: utf-8 + +""" + torc + + Rust-owned OpenAPI surface for Torc. + + The version of the OpenAPI document: 0.20.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + +from pydantic import BaseModel, ConfigDict +from typing import Any, ClassVar, Dict, List +from torc.openapi_client.models.slurm_job_correlation_model import SlurmJobCorrelationModel +from typing import Optional, Set +from typing_extensions import Self + +class SlurmJobCorrelationsResponse(BaseModel): + """ + All Slurm-job-to-Torc-job correlations for a workflow, computed server-side. 
+ """ # noqa: E501 + items: List[SlurmJobCorrelationModel] + __properties: ClassVar[List[str]] = ["items"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationsResponse from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. 
+ """ + excluded_fields: Set[str] = set([ + ]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + # override the default output from pydantic by calling `to_dict()` of each item in items (list) + _items = [] + if self.items: + for _item_items in self.items: + if _item_items: + _items.append(_item_items.to_dict()) + _dict['items'] = _items + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationsResponse from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "items": [SlurmJobCorrelationModel.from_dict(_item) for _item in obj["items"]] if obj.get("items") is not None else None + }) + return _obj + + diff --git a/src/client/apis/workflows_api.rs b/src/client/apis/workflows_api.rs index f2094778..7b63dd07 100644 --- a/src/client/apis/workflows_api.rs +++ b/src/client/apis/workflows_api.rs @@ -86,6 +86,16 @@ pub enum GetReadyJobRequirementsError { UnknownValue(serde_json::Value), } +/// struct for typed errors of method [`get_slurm_job_correlations`] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum GetSlurmJobCorrelationsError { + Status403(models::ErrorResponse), + Status404(models::ErrorResponse), + Status500(models::ErrorResponse), + UnknownValue(serde_json::Value), +} + /// struct for typed errors of method [`get_workflow`] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] @@ -745,6 +755,62 @@ pub fn get_ready_job_requirements( } } +pub fn get_slurm_job_correlations( + configuration: &configuration::Configuration, + id: i64, +) -> Result> { + // add a prefix to parameters to efficiently prevent name collisions + let p_path_id = id; + + let uri_str = format!( + "{}/workflows/{id}/slurm_job_correlations", + configuration.base_path, + id = p_path_id + ); + let mut req_builder = 
configuration.client.request(reqwest::Method::GET, &uri_str); + + if let Some(ref user_agent) = configuration.user_agent { + req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone()); + } + req_builder = configuration.apply_auth(req_builder); + + let req = req_builder.build()?; + let resp = configuration.client.execute(req)?; + + let status = resp.status(); + let content_type = resp + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("application/octet-stream"); + let content_type = super::ContentType::from(content_type); + + if !status.is_client_error() && !status.is_server_error() { + let content = resp.text()?; + match content_type { + ContentType::Json => serde_json::from_str(&content).map_err(Error::from), + ContentType::Text => { + return Err(Error::from(serde_json::Error::custom( + "Received `text/plain` content type response that cannot be converted to `models::SlurmJobCorrelationsResponse`", + ))); + } + ContentType::Unsupported(unknown_type) => { + return Err(Error::from(serde_json::Error::custom(format!( + "Received `{unknown_type}` content type response that cannot be converted to `models::SlurmJobCorrelationsResponse`" + )))); + } + } + } else { + let content = resp.text()?; + let entity: Option = serde_json::from_str(&content).ok(); + Err(Error::ResponseError(ResponseContent { + status, + content, + entity, + })) + } +} + pub fn get_workflow( configuration: &configuration::Configuration, id: i64, diff --git a/src/client/commands/slurm.rs b/src/client/commands/slurm.rs index 59af0516..c6eeff4b 100644 --- a/src/client/commands/slurm.rs +++ b/src/client/commands/slurm.rs @@ -47,10 +47,10 @@ use crate::client::apis::configuration::Configuration; use crate::client::commands::get_env_user_name; use crate::client::commands::hpc::create_registry_with_config_public; use crate::client::commands::pagination::{ - ComputeNodeListParams, JobListParams, ResourceRequirementsListParams, ResultListParams, - 
ScheduledComputeNodeListParams, SlurmSchedulersListParams, paginate_compute_nodes, - paginate_jobs, paginate_resource_requirements, paginate_results, - paginate_scheduled_compute_nodes, paginate_slurm_schedulers, + JobListParams, ResourceRequirementsListParams, ResultListParams, + ScheduledComputeNodeListParams, SlurmSchedulersListParams, paginate_jobs, + paginate_resource_requirements, paginate_results, paginate_scheduled_compute_nodes, + paginate_slurm_schedulers, }; use crate::client::commands::{ print_error, select_workflow_interactively, @@ -2240,131 +2240,27 @@ fn build_slurm_to_jobs_map( ) -> HashMap> { let mut slurm_to_jobs: HashMap> = HashMap::new(); - // Step 1: Get all scheduled compute nodes (they have scheduler_id = Slurm job ID) - let scheduled_nodes = match paginate_scheduled_compute_nodes( - config, - workflow_id, - ScheduledComputeNodeListParams::new(), - ) { - Ok(nodes) => nodes, - Err(e) => { - warn!( - "Could not fetch scheduled compute nodes for job correlation: {}", - e - ); - return slurm_to_jobs; - } - }; - - // Build scn_id -> slurm_job_id map - let scn_to_slurm: HashMap = scheduled_nodes - .iter() - .filter(|scn| scn.scheduler_type == "slurm") - .filter_map(|scn| scn.id.map(|id| (id, scn.scheduler_id.to_string()))) - .collect(); - - if scn_to_slurm.is_empty() { - return slurm_to_jobs; - } - - // Step 2: Get all compute nodes and build slurm_job_id -> compute_node_ids map - let compute_nodes = - match paginate_compute_nodes(config, workflow_id, ComputeNodeListParams::new()) { - Ok(nodes) => nodes, - Err(e) => { - warn!("Could not fetch compute nodes for job correlation: {}", e); - return slurm_to_jobs; - } - }; - - // Build slurm_job_id -> Vec map using SCN relationship - let mut slurm_to_compute_nodes: HashMap> = HashMap::new(); - for node in &compute_nodes { - if node.compute_node_type != "slurm" { - continue; - } - if let Some(scheduler) = &node.scheduler { - // Get the SCN ID from the scheduler JSON - if let Some(scn_id) = 
scheduler.get("scheduler_id").and_then(|v| v.as_i64()) { - // Look up the Slurm job ID from our SCN map - if let Some(slurm_job_id) = scn_to_slurm.get(&scn_id) - && let Some(node_id) = node.id - { - slurm_to_compute_nodes - .entry(slurm_job_id.clone()) - .or_default() - .push(node_id); - } - } - } - } - - if slurm_to_compute_nodes.is_empty() { - return slurm_to_jobs; - } - - // Step 3: Get all results and build compute_node_id -> Vec map - let results = match paginate_results( - config, - workflow_id, - ResultListParams::new().with_all_runs(true), - ) { - Ok(results) => results, + // The server correlates scheduled_compute_node -> compute_node -> result -> + // job in a single query (covering all runs), replacing what used to be four + // full-list fetches joined in memory here. Rows arrive grouped/sorted by + // (slurm_job_id, job_id), so each per-Slurm-job Vec is already deduplicated + // and ordered. + let response = match apis::workflows_api::get_slurm_job_correlations(config, workflow_id) { + Ok(response) => response, Err(e) => { - warn!("Could not fetch results for job correlation: {}", e); + warn!("Could not fetch Slurm job correlations: {}", e); return slurm_to_jobs; } }; - let mut compute_node_to_jobs: HashMap> = HashMap::new(); - for result in &results { - compute_node_to_jobs - .entry(result.compute_node_id) + for item in response.items { + slurm_to_jobs + .entry(item.slurm_job_id) .or_default() - .push(result.job_id); - } - - // Step 4: Get all jobs and build job_id -> job_name map - let jobs = match paginate_jobs(config, workflow_id, JobListParams::new()) { - Ok(jobs) => jobs, - Err(e) => { - warn!("Could not fetch jobs for job correlation: {}", e); - return slurm_to_jobs; - } - }; - - let job_id_to_name: HashMap = jobs - .iter() - .filter_map(|j| j.id.map(|id| (id, j.name.clone()))) - .collect(); - - // Step 5: Build the final slurm_job_id -> Vec map - for (slurm_id, compute_node_ids) in &slurm_to_compute_nodes { - let mut affected_jobs: Vec = 
Vec::new(); - let mut seen_job_ids: std::collections::HashSet = std::collections::HashSet::new(); - - for compute_node_id in compute_node_ids { - if let Some(job_ids) = compute_node_to_jobs.get(compute_node_id) { - for job_id in job_ids { - if seen_job_ids.insert(*job_id) { - let job_name = job_id_to_name - .get(job_id) - .cloned() - .unwrap_or_else(|| format!("job_{}", job_id)); - affected_jobs.push(AffectedJob { - job_id: *job_id, - job_name, - }); - } - } - } - } - - if !affected_jobs.is_empty() { - // Sort by job_id for consistent output - affected_jobs.sort_by_key(|j| j.job_id); - slurm_to_jobs.insert(slurm_id.clone(), affected_jobs); - } + .push(AffectedJob { + job_id: item.job_id, + job_name: item.job_name, + }); } slurm_to_jobs diff --git a/src/lib.rs b/src/lib.rs index b9829e6f..4d999822 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,8 +79,8 @@ pub use models::{ ListResourceRequirementsResponse, ListResultsResponse, ListScheduledComputeNodesResponse, ListSlurmSchedulersResponse, ListUserDataResponse, ListWorkflowsResponse, LocalSchedulerModel, ProcessChangedJobInputsResponse, ResourceRequirementsModel, ResultModel, - ScheduledComputeNodesModel, SlurmSchedulerModel, UserDataModel, WorkflowActionModel, - WorkflowModel, WorkflowStatusResponse, + ScheduledComputeNodesModel, SlurmJobCorrelationModel, SlurmJobCorrelationsResponse, + SlurmSchedulerModel, UserDataModel, WorkflowActionModel, WorkflowModel, WorkflowStatusResponse, }; // Re-export client types when client feature is enabled diff --git a/src/models.rs b/src/models.rs index 11d9cd20..256b7c1c 100644 --- a/src/models.rs +++ b/src/models.rs @@ -2388,6 +2388,23 @@ pub struct WorkflowStatusResponse { pub is_canceled: bool, } +/// One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given +/// Torc job, derived from scheduled_compute_node -> compute_node -> result. 
+#[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SlurmJobCorrelationModel { + pub slurm_job_id: String, + pub job_id: i64, + pub job_name: String, +} + +/// All Slurm-job-to-Torc-job correlations for a workflow, computed server-side. +#[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SlurmJobCorrelationsResponse { + pub items: Vec, +} + #[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ResetJobStatusResponse { diff --git a/src/openapi_spec.rs b/src/openapi_spec.rs index 9e2acf98..a250b4e8 100644 --- a/src/openapi_spec.rs +++ b/src/openapi_spec.rs @@ -24,8 +24,9 @@ use crate::models::{ ListUserDataResponse, ListUserGroupMembershipsResponse, ListWorkflowsResponse, LocalSchedulerModel, MessageResponse, ProcessChangedJobInputsResponse, ReloadAuthResponse, RemoteWorkerModel, ResetJobStatusResponse, ResourceRequirementsModel, ResultModel, - RoCrateEntityModel, ScheduledComputeNodesModel, SlurmSchedulerModel, SlurmStatsModel, - SpawnJobModel, SpawnJobsRequest, SpawnJobsResponse, UserDataListModel, UserDataModel, + RoCrateEntityModel, ScheduledComputeNodesModel, SlurmJobCorrelationModel, + SlurmJobCorrelationsResponse, SlurmSchedulerModel, SlurmStatsModel, SpawnJobModel, + SpawnJobsRequest, SpawnJobsResponse, UserDataListModel, UserDataModel, UserGroupMembershipModel, WorkflowAccessGroupModel, WorkflowActionModel, WorkflowModel, WorkflowStatusResponse, }; @@ -153,20 +154,21 @@ mod openapi_workflow_paths { __path_archive_workflow, __path_batch_complete_jobs, __path_cancel_workflow, __path_claim_jobs_based_on_resources, __path_claim_next_jobs, __path_create_workflow, __path_delete_workflow, __path_get_active_task_for_workflow, - __path_get_ready_job_requirements, __path_get_workflow, __path_get_workflow_status, - 
__path_initialize_jobs, __path_is_workflow_complete, __path_is_workflow_uninitialized, - __path_list_job_dependencies, __path_list_job_file_relationships, __path_list_job_ids, + __path_get_ready_job_requirements, __path_get_slurm_job_correlations, __path_get_workflow, + __path_get_workflow_status, __path_initialize_jobs, __path_is_workflow_complete, + __path_is_workflow_uninitialized, __path_list_job_dependencies, + __path_list_job_file_relationships, __path_list_job_ids, __path_list_job_user_data_relationships, __path_list_missing_user_data, __path_list_required_existing_files, __path_list_workflows, __path_process_changed_job_inputs, __path_reset_job_status, __path_reset_workflow_status, __path_update_workflow, archive_workflow, batch_complete_jobs, cancel_workflow, claim_jobs_based_on_resources, claim_next_jobs, create_workflow, delete_workflow, - get_active_task_for_workflow, get_ready_job_requirements, get_workflow, - get_workflow_status, initialize_jobs, is_workflow_complete, is_workflow_uninitialized, - list_job_dependencies, list_job_file_relationships, list_job_ids, - list_job_user_data_relationships, list_missing_user_data, list_required_existing_files, - list_workflows, process_changed_job_inputs, reset_job_status, reset_workflow_status, - update_workflow, + get_active_task_for_workflow, get_ready_job_requirements, get_slurm_job_correlations, + get_workflow, get_workflow_status, initialize_jobs, is_workflow_complete, + is_workflow_uninitialized, list_job_dependencies, list_job_file_relationships, + list_job_ids, list_job_user_data_relationships, list_missing_user_data, + list_required_existing_files, list_workflows, process_changed_job_inputs, reset_job_status, + reset_workflow_status, update_workflow, }; } @@ -537,6 +539,7 @@ fn resolve_schema_properties<'a>( openapi_workflow_paths::delete_workflow, openapi_workflow_paths::get_workflow, openapi_workflow_paths::get_workflow_status, + openapi_workflow_paths::get_slurm_job_correlations, 
openapi_workflow_paths::update_workflow, openapi_workflow_paths::cancel_workflow, openapi_workflow_paths::initialize_jobs, @@ -650,6 +653,8 @@ fn resolve_schema_properties<'a>( IsUninitializedResponse, JobStatusCounts, WorkflowStatusResponse, + SlurmJobCorrelationModel, + SlurmJobCorrelationsResponse, ResetJobStatusResponse, UserDataModel, ListUserDataResponse @@ -2115,6 +2120,14 @@ pub fn parity_report(source: &str) -> Result, Box Result, Box { context: &C, ) -> Result; + /// Return Slurm-job-to-Torc-job correlations for the workflow. + async fn get_slurm_job_correlations( + &self, + id: i64, + context: &C, + ) -> Result; + /// Return true if all jobs in the workflow are uninitialized or disabled. async fn is_workflow_uninitialized( &self, @@ -1432,6 +1440,80 @@ where )) } + /// Return Slurm-job-to-Torc-job correlations for the workflow. + /// + /// Joins scheduled_compute_node (the Slurm allocation, whose `scheduler_id` + /// is the Slurm job ID) -> compute_node (linked via the scheduler JSON's + /// `scheduler_id`) -> result (`compute_node_id`) -> job. Replaces the former + /// client-side correlation that fetched four full lists and joined them in + /// memory. Covers all runs, matching the previous `all_runs=true` behavior. + /// Each workflow filter propagates through the join conditions so every + /// table is narrowed by its `workflow_id` index before the join. + async fn get_slurm_job_correlations( + &self, + id: i64, + context: &C, + ) -> Result { + debug!( + "get_slurm_job_correlations({}) - X-Span-ID: {:?}", + id, + context.get().0.clone() + ); + + let pool = self.context.pool.as_ref(); + + // 404 if the workflow does not exist. 
+ let exists: Option = sqlx::query_scalar("SELECT 1 FROM workflow WHERE id = ?") + .bind(id) + .fetch_optional(pool) + .await + .map_err(|e| database_error_with_msg(e, "Failed to get workflow"))?; + if exists.is_none() { + return Ok(GetSlurmJobCorrelationsResponse::NotFoundErrorResponse( + resource_not_found_response("Workflow", id), + )); + } + + let rows = sqlx::query( + r#" + SELECT + CAST(scn.scheduler_id AS TEXT) AS slurm_job_id, + j.id AS job_id, + j.name AS job_name + FROM scheduled_compute_node scn + JOIN compute_node cn + ON cn.workflow_id = scn.workflow_id + AND json_extract(cn.scheduler, '$.scheduler_id') = scn.id + JOIN result r + ON r.compute_node_id = cn.id + AND r.workflow_id = scn.workflow_id + JOIN job j ON j.id = r.job_id + WHERE scn.workflow_id = ? + AND scn.scheduler_type = 'slurm' + AND cn.compute_node_type = 'slurm' + GROUP BY slurm_job_id, j.id, j.name + ORDER BY slurm_job_id, j.id + "#, + ) + .bind(id) + .fetch_all(pool) + .await + .map_err(|e| database_error_with_msg(e, "Failed to correlate Slurm jobs"))?; + + let items = rows + .iter() + .map(|row| models::SlurmJobCorrelationModel { + slurm_job_id: row.get("slurm_job_id"), + job_id: row.get("job_id"), + job_name: row.get("job_name"), + }) + .collect(); + + Ok(GetSlurmJobCorrelationsResponse::SuccessfulResponse( + models::SlurmJobCorrelationsResponse { items }, + )) + } + /// Return true if all jobs in the workflow are uninitialized or disabled. async fn is_workflow_uninitialized( &self, diff --git a/src/server/api_contract.rs b/src/server/api_contract.rs index 53324a9e..b530d2f0 100644 --- a/src/server/api_contract.rs +++ b/src/server/api_contract.rs @@ -767,6 +767,13 @@ pub trait TransportApiCore { context: &C, ) -> Result; + /// Return Slurm-job-to-Torc-job correlations for the workflow. + async fn get_slurm_job_correlations( + &self, + id: i64, + context: &C, + ) -> Result; + /// Return true if all jobs in the workflow are uninitialized or disabled. 
async fn is_workflow_uninitialized( &self, diff --git a/src/server/api_responses.rs b/src/server/api_responses.rs index 821c880f..037bb629 100644 --- a/src/server/api_responses.rs +++ b/src/server/api_responses.rs @@ -1016,6 +1016,19 @@ pub enum GetWorkflowStatusResponse { DefaultErrorResponse(models::ErrorResponse), } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[must_use] +pub enum GetSlurmJobCorrelationsResponse { + /// Successful response + SuccessfulResponse(models::SlurmJobCorrelationsResponse), + /// Forbidden - user does not have access + ForbiddenErrorResponse(models::ErrorResponse), + /// Not found error response + NotFoundErrorResponse(models::ErrorResponse), + /// Default error response + DefaultErrorResponse(models::ErrorResponse), +} + #[derive(Debug, PartialEq, Serialize, Deserialize)] #[must_use] pub enum IsWorkflowUninitializedResponse { diff --git a/src/server/http_server.rs b/src/server/http_server.rs index f65f1ec1..4272283b 100644 --- a/src/server/http_server.rs +++ b/src/server/http_server.rs @@ -2043,6 +2043,16 @@ where self.transport_get_workflow_status(id, context).await } + /// Return Slurm-job-to-Torc-job correlations for the workflow. 
+ #[instrument(level = "debug", skip(self, context), fields(workflow_id = id))] + async fn get_slurm_job_correlations( + &self, + id: i64, + context: &C, + ) -> Result { + self.transport_get_slurm_job_correlations(id, context).await + } + async fn is_workflow_uninitialized( &self, id: i64, diff --git a/src/server/http_server/workflows_transport.rs b/src/server/http_server/workflows_transport.rs index cbfbf349..3fa6f6ef 100644 --- a/src/server/http_server/workflows_transport.rs +++ b/src/server/http_server/workflows_transport.rs @@ -404,6 +404,17 @@ where self.workflows_api.get_workflow_status(id, context).await } + pub(super) async fn transport_get_slurm_job_correlations( + &self, + id: i64, + context: &C, + ) -> Result { + authorize_workflow!(self, id, context, GetSlurmJobCorrelationsResponse); + self.workflows_api + .get_slurm_job_correlations(id, context) + .await + } + pub(super) async fn transport_is_workflow_uninitialized( &self, id: i64, diff --git a/src/server/http_transport/response_mapping.rs b/src/server/http_transport/response_mapping.rs index 6558d81a..f8c51124 100644 --- a/src/server/http_transport/response_mapping.rs +++ b/src/server/http_transport/response_mapping.rs @@ -559,6 +559,11 @@ map_response!( GetWorkflowStatusResponse, SuccessfulResponse ); +map_response!( + get_slurm_job_correlations_response, + GetSlurmJobCorrelationsResponse, + SuccessfulResponse +); map_response!( is_workflow_uninitialized_response, IsWorkflowUninitializedResponse, diff --git a/src/server/live_router.rs b/src/server/live_router.rs index a0ab4c98..a68840f2 100644 --- a/src/server/live_router.rs +++ b/src/server/live_router.rs @@ -333,6 +333,10 @@ pub fn app_router(state: LiveRouterState) -> Router { "/torc-service/v1/workflows/{id}/status", get(get_workflow_status), ) + .route( + "/torc-service/v1/workflows/{id}/slurm_job_correlations", + get(get_slurm_job_correlations), + ) .route( "/torc-service/v1/workflows/{id}/is_uninitialized", get(is_workflow_uninitialized), @@ 
-3762,6 +3766,30 @@ pub async fn get_workflow_status( } } +#[utoipa::path( + get, + tag = "workflows", + path = "/workflows/{id}/slurm_job_correlations", + operation_id = "get_slurm_job_correlations", + params(("id" = i64, Path, description = "Workflow ID")), + responses( + (status = 200, body = models::SlurmJobCorrelationsResponse), + (status = 403, body = models::ErrorResponse, description = "User does not have access"), + (status = 404, body = models::ErrorResponse, description = "Workflow not found"), + (status = 500, body = models::ErrorResponse) + ) +)] +pub async fn get_slurm_job_correlations( + State(state): State, + Path(id): Path, + Extension(context): Extension, +) -> Response { + match state.server.get_slurm_job_correlations(id, &context).await { + Ok(response) => get_slurm_job_correlations_response(response), + Err(err) => error_response(StatusCode::INTERNAL_SERVER_ERROR, err.0), + } +} + #[utoipa::path( get, tag = "workflows", diff --git a/src/server/response_types.rs b/src/server/response_types.rs index 0f851fcb..4b7b323f 100644 --- a/src/server/response_types.rs +++ b/src/server/response_types.rs @@ -77,9 +77,9 @@ pub mod workflows { pub use crate::server::api_responses::{ ArchiveWorkflowResponse, CancelWorkflowResponse, ClaimActionResponse, CreateWorkflowActionResponse, CreateWorkflowResponse, DeleteWorkflowResponse, - GetActiveTaskResponse, GetPendingActionsResponse, GetWorkflowActionsResponse, - GetWorkflowResponse, GetWorkflowStatusResponse, IsWorkflowCompleteResponse, - IsWorkflowUninitializedResponse, ListWorkflowsResponse, ResetWorkflowStatusResponse, - UpdateWorkflowResponse, + GetActiveTaskResponse, GetPendingActionsResponse, GetSlurmJobCorrelationsResponse, + GetWorkflowActionsResponse, GetWorkflowResponse, GetWorkflowStatusResponse, + IsWorkflowCompleteResponse, IsWorkflowUninitializedResponse, ListWorkflowsResponse, + ResetWorkflowStatusResponse, UpdateWorkflowResponse, }; } diff --git a/tests/test_workflows.rs b/tests/test_workflows.rs 
index e482e8c6..a1d9eb69 100644 --- a/tests/test_workflows.rs +++ b/tests/test_workflows.rs @@ -1593,3 +1593,79 @@ fn test_get_workflow_status_not_found(start_server: &ServerProcess) { "expected error for nonexistent workflow status" ); } + +#[rstest] +fn test_get_slurm_job_correlations(start_server: &ServerProcess) { + let config = &start_server.config; + let workflow = create_test_workflow(config, "slurm_correlation_workflow"); + let workflow_id = workflow.id.unwrap(); + let job = create_test_job(config, workflow_id, "slurm_job"); + let job_id = job.id.unwrap(); + + // Slurm allocation: scheduler_id is the Slurm job ID. + let scn = apis::scheduled_compute_nodes_api::create_scheduled_compute_node( + config, + models::ScheduledComputeNodesModel::new( + workflow_id, + 987654, + 1, + "slurm".to_string(), + "active".to_string(), + ), + ) + .expect("create scheduled compute node"); + let scn_id = scn.id.unwrap(); + + // Compute node belonging to that allocation; the scheduler JSON links it + // back to the scheduled compute node via scheduler_id. + let compute_node = apis::compute_nodes_api::create_compute_node( + config, + models::ComputeNodeModel::new( + workflow_id, + "slurm-host".to_string(), + std::process::id() as i64, + chrono::Utc::now().to_rfc3339(), + 8, + 16.0, + 0, + 1, + "slurm".to_string(), + Some(serde_json::json!({ "scheduler_id": scn_id })), + ), + ) + .expect("create compute node"); + let compute_node_id = compute_node.id.unwrap(); + + // Result links the job to that compute node, completing the chain. 
+ let result = models::ResultModel::new( + job_id, + workflow_id, + 1, + 1, + compute_node_id, + 0, + 1.0, + "2024-01-01T12:00:00.000Z".to_string(), + models::JobStatus::Completed, + ); + apis::results_api::create_result(config, result).expect("create result"); + + let response = apis::workflows_api::get_slurm_job_correlations(config, workflow_id) + .expect("get slurm job correlations"); + + assert_eq!(response.items.len(), 1, "expected one correlation row"); + let item = &response.items[0]; + assert_eq!(item.slurm_job_id, "987654"); + assert_eq!(item.job_id, job_id); + assert_eq!(item.job_name, "slurm_job"); +} + +#[rstest] +fn test_get_slurm_job_correlations_not_found(start_server: &ServerProcess) { + let config = &start_server.config; + let result = apis::workflows_api::get_slurm_job_correlations(config, 999_999); + assert!( + result.is_err(), + "expected error for nonexistent workflow correlations" + ); +}