diff --git a/api/openapi.codegen.yaml b/api/openapi.codegen.yaml index a24a9420..72058531 100644 --- a/api/openapi.codegen.yaml +++ b/api/openapi.codegen.yaml @@ -5313,6 +5313,114 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' + /workflows/{id}/running_jobs: + get: + tags: + - workflows + operationId: get_running_jobs + parameters: + - name: id + in: path + description: Workflow ID + required: true + schema: + type: integer + format: int64 + - name: offset + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + - name: limit + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/RunningJobsResponse' + '403': + description: User does not have access + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Workflow not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + /workflows/{id}/slurm_job_correlations: + get: + tags: + - workflows + operationId: get_slurm_job_correlations + parameters: + - name: id + in: path + description: Workflow ID + required: true + schema: + type: integer + format: int64 + - name: offset + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + - name: limit + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/SlurmJobCorrelationsResponse' + '403': + description: User does not have access + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Workflow not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' /workflows/{id}/status: get: tags: @@ -7039,6 +7147,14 @@ components: job_id: type: integer format: int64 + job_name: + type: + - string + - 'null' + description: |- + Name of the job this result belongs to. Populated by the server on read + paths (list/get) as a convenience so clients need not re-fetch jobs; it + is ignored on create/update input. peak_cpu_percent: type: - number @@ -7090,6 +7206,66 @@ components: workflow_id: type: integer format: int64 + RunningJobModel: + type: object + description: |- + A currently-running job together with the compute node it occupies and, + when the node was provisioned by a scheduler, that scheduler's job ID. + `scheduler_type` is generic (e.g. "slurm", "local"); `scheduler_job_id` + is populated only for scheduler-managed nodes (the Slurm job ID today). + required: + - job_id + - job_name + - compute_node_name + - scheduler_type + properties: + compute_node_name: + type: string + job_id: + type: integer + format: int64 + job_name: + type: string + scheduler_job_id: + type: + - string + - 'null' + scheduler_type: + type: string + start_time: + type: + - string + - 'null' + description: RFC3339 time the job started running, for computing elapsed time. + RunningJobsResponse: + type: object + description: A page of currently-running jobs for a workflow, computed server-side. + required: + - items + - offset + - max_limit + - count + - total_count + - has_more + properties: + count: + type: integer + format: int64 + has_more: + type: boolean + items: + type: array + items: + $ref: '#/components/schemas/RunningJobModel' + max_limit: + type: integer + format: int64 + offset: + type: integer + format: int64 + total_count: + type: integer + format: int64 ScheduledComputeNodesModel: type: object required: @@ -7117,6 +7293,54 @@ components: workflow_id: type: integer format: int64 + SlurmJobCorrelationModel: + type: object + description: |- + One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given + Torc job, derived from scheduled_compute_node -> compute_node -> result. + required: + - slurm_job_id + - job_id + - job_name + properties: + job_id: + type: integer + format: int64 + job_name: + type: string + slurm_job_id: + type: string + SlurmJobCorrelationsResponse: + type: object + description: |- + A page of Slurm-job-to-Torc-job correlations for a workflow, computed + server-side. + required: + - items + - offset + - max_limit + - count + - total_count + - has_more + properties: + count: + type: integer + format: int64 + has_more: + type: boolean + items: + type: array + items: + $ref: '#/components/schemas/SlurmJobCorrelationModel' + max_limit: + type: integer + format: int64 + offset: + type: integer + format: int64 + total_count: + type: integer + format: int64 SlurmSchedulerModel: type: object required: diff --git a/api/openapi.yaml b/api/openapi.yaml index a24a9420..72058531 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -5313,6 +5313,114 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' + /workflows/{id}/running_jobs: + get: + tags: + - workflows + operationId: get_running_jobs + parameters: + - name: id + in: path + description: Workflow ID + required: true + schema: + type: integer + format: int64 + - name: offset + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + - name: limit + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/RunningJobsResponse' + '403': + description: User does not have access + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Workflow not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + /workflows/{id}/slurm_job_correlations: + get: + tags: + - workflows + operationId: get_slurm_job_correlations + parameters: + - name: id + in: path + description: Workflow ID + required: true + schema: + type: integer + format: int64 + - name: offset + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + - name: limit + in: query + required: false + schema: + type: + - integer + - 'null' + format: int64 + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/SlurmJobCorrelationsResponse' + '403': + description: User does not have access + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Workflow not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' /workflows/{id}/status: get: tags: @@ -7039,6 +7147,14 @@ components: job_id: type: integer format: int64 + job_name: + type: + - string + - 'null' + description: |- + Name of the job this result belongs to. Populated by the server on read + paths (list/get) as a convenience so clients need not re-fetch jobs; it + is ignored on create/update input. peak_cpu_percent: type: - number @@ -7090,6 +7206,66 @@ components: workflow_id: type: integer format: int64 + RunningJobModel: + type: object + description: |- + A currently-running job together with the compute node it occupies and, + when the node was provisioned by a scheduler, that scheduler's job ID. + `scheduler_type` is generic (e.g. "slurm", "local"); `scheduler_job_id` + is populated only for scheduler-managed nodes (the Slurm job ID today). + required: + - job_id + - job_name + - compute_node_name + - scheduler_type + properties: + compute_node_name: + type: string + job_id: + type: integer + format: int64 + job_name: + type: string + scheduler_job_id: + type: + - string + - 'null' + scheduler_type: + type: string + start_time: + type: + - string + - 'null' + description: RFC3339 time the job started running, for computing elapsed time. + RunningJobsResponse: + type: object + description: A page of currently-running jobs for a workflow, computed server-side. + required: + - items + - offset + - max_limit + - count + - total_count + - has_more + properties: + count: + type: integer + format: int64 + has_more: + type: boolean + items: + type: array + items: + $ref: '#/components/schemas/RunningJobModel' + max_limit: + type: integer + format: int64 + offset: + type: integer + format: int64 + total_count: + type: integer + format: int64 ScheduledComputeNodesModel: type: object required: @@ -7117,6 +7293,54 @@ components: workflow_id: type: integer format: int64 + SlurmJobCorrelationModel: + type: object + description: |- + One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given + Torc job, derived from scheduled_compute_node -> compute_node -> result. + required: + - slurm_job_id + - job_id + - job_name + properties: + job_id: + type: integer + format: int64 + job_name: + type: string + slurm_job_id: + type: string + SlurmJobCorrelationsResponse: + type: object + description: |- + A page of Slurm-job-to-Torc-job correlations for a workflow, computed + server-side. + required: + - items + - offset + - max_limit + - count + - total_count + - has_more + properties: + count: + type: integer + format: int64 + has_more: + type: boolean + items: + type: array + items: + $ref: '#/components/schemas/SlurmJobCorrelationModel' + max_limit: + type: integer + format: int64 + offset: + type: integer + format: int64 + total_count: + type: integer + format: int64 SlurmSchedulerModel: type: object required: diff --git a/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl b/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl index 4ef33a86..33fb7741 100644 --- a/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl +++ b/julia_client/Torc/src/api/apis/api_WorkflowsApi.jl @@ -273,6 +273,74 @@ function get_ready_job_requirements(_api::WorkflowsApi, response_stream::Channel return OpenAPI.Clients.exec(_ctx, response_stream) end +const _returntypes_get_running_jobs_WorkflowsApi = Dict{Regex,Type}( + Regex("^" * replace("200", "x"=>".") * "\$") => RunningJobsResponse, + Regex("^" * replace("403", "x"=>".") * "\$") => ErrorResponse, + Regex("^" * replace("404", "x"=>".") * "\$") => ErrorResponse, + Regex("^" * replace("500", "x"=>".") * "\$") => ErrorResponse, +) + +function _oacinternal_get_running_jobs(_api::WorkflowsApi, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) + _ctx = OpenAPI.Clients.Ctx(_api.client, "GET", _returntypes_get_running_jobs_WorkflowsApi, "/workflows/{id}/running_jobs", []) + OpenAPI.Clients.set_param(_ctx.path, "id", id) # type Int64 + OpenAPI.Clients.set_param(_ctx.query, "offset", offset; style="form", is_explode=true) # type Int64 + OpenAPI.Clients.set_param(_ctx.query, "limit", limit; style="form", is_explode=true) # type Int64 + OpenAPI.Clients.set_header_accept(_ctx, ["application/json", ]) + OpenAPI.Clients.set_header_content_type(_ctx, (_mediaType === nothing) ? [] : [_mediaType]) + return _ctx +end + +@doc raw"""Params: +- id::Int64 (required) +- offset::Int64 +- limit::Int64 + +Return: RunningJobsResponse, OpenAPI.Clients.ApiResponse +""" +function get_running_jobs(_api::WorkflowsApi, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) + _ctx = _oacinternal_get_running_jobs(_api, id; offset=offset, limit=limit, _mediaType=_mediaType) + return OpenAPI.Clients.exec(_ctx) +end + +function get_running_jobs(_api::WorkflowsApi, response_stream::Channel, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) + _ctx = _oacinternal_get_running_jobs(_api, id; offset=offset, limit=limit, _mediaType=_mediaType) + return OpenAPI.Clients.exec(_ctx, response_stream) +end + +const _returntypes_get_slurm_job_correlations_WorkflowsApi = Dict{Regex,Type}( + Regex("^" * replace("200", "x"=>".") * "\$") => SlurmJobCorrelationsResponse, + Regex("^" * replace("403", "x"=>".") * "\$") => ErrorResponse, + Regex("^" * replace("404", "x"=>".") * "\$") => ErrorResponse, + Regex("^" * replace("500", "x"=>".") * "\$") => ErrorResponse, +) + +function _oacinternal_get_slurm_job_correlations(_api::WorkflowsApi, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) + _ctx = OpenAPI.Clients.Ctx(_api.client, "GET", _returntypes_get_slurm_job_correlations_WorkflowsApi, "/workflows/{id}/slurm_job_correlations", []) + OpenAPI.Clients.set_param(_ctx.path, "id", id) # type Int64 + OpenAPI.Clients.set_param(_ctx.query, "offset", offset; style="form", is_explode=true) # type Int64 + OpenAPI.Clients.set_param(_ctx.query, "limit", limit; style="form", is_explode=true) # type Int64 + OpenAPI.Clients.set_header_accept(_ctx, ["application/json", ]) + OpenAPI.Clients.set_header_content_type(_ctx, (_mediaType === nothing) ? [] : [_mediaType]) + return _ctx +end + +@doc raw"""Params: +- id::Int64 (required) +- offset::Int64 +- limit::Int64 + +Return: SlurmJobCorrelationsResponse, OpenAPI.Clients.ApiResponse +""" +function get_slurm_job_correlations(_api::WorkflowsApi, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) + _ctx = _oacinternal_get_slurm_job_correlations(_api, id; offset=offset, limit=limit, _mediaType=_mediaType) + return OpenAPI.Clients.exec(_ctx) +end + +function get_slurm_job_correlations(_api::WorkflowsApi, response_stream::Channel, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) + _ctx = _oacinternal_get_slurm_job_correlations(_api, id; offset=offset, limit=limit, _mediaType=_mediaType) + return OpenAPI.Clients.exec(_ctx, response_stream) +end + const _returntypes_get_workflow_WorkflowsApi = Dict{Regex,Type}( Regex("^" * replace("200", "x"=>".") * "\$") => WorkflowModel, ) @@ -777,6 +845,8 @@ export create_workflow export delete_workflow export get_active_task_for_workflow export get_ready_job_requirements +export get_running_jobs +export get_slurm_job_correlations export get_workflow export get_workflow_status export initialize_jobs diff --git a/julia_client/Torc/src/api/modelincludes.jl b/julia_client/Torc/src/api/modelincludes.jl index d61e112d..6c7382ac 100644 --- a/julia_client/Torc/src/api/modelincludes.jl +++ b/julia_client/Torc/src/api/modelincludes.jl @@ -75,7 +75,11 @@ include("models/model_ResourceMonitorConfig.jl") include("models/model_ResourceRequirementsModel.jl") include("models/model_ResultModel.jl") include("models/model_RoCrateEntityModel.jl") +include("models/model_RunningJobModel.jl") +include("models/model_RunningJobsResponse.jl") include("models/model_ScheduledComputeNodesModel.jl") +include("models/model_SlurmJobCorrelationModel.jl") +include("models/model_SlurmJobCorrelationsResponse.jl") include("models/model_SlurmSchedulerModel.jl") include("models/model_SlurmStatsModel.jl") include("models/model_SpawnJobModel.jl") diff --git a/julia_client/Torc/src/api/models/model_ResultModel.jl b/julia_client/Torc/src/api/models/model_ResultModel.jl index bda96330..15287736 100644 --- a/julia_client/Torc/src/api/models/model_ResultModel.jl +++ b/julia_client/Torc/src/api/models/model_ResultModel.jl @@ -13,6 +13,7 @@ exec_time_minutes=nothing, id=nothing, job_id=nothing, + job_name=nothing, peak_cpu_percent=nothing, peak_memory_bytes=nothing, return_code=nothing, @@ -29,6 +30,7 @@ - exec_time_minutes::Float64 - id::Int64 - job_id::Int64 + - job_name::String : Name of the job this result belongs to. Populated by the server on read paths (list/get) as a convenience so clients need not re-fetch jobs; it is ignored on create/update input. - peak_cpu_percent::Float64 - peak_memory_bytes::Int64 - return_code::Int64 @@ -45,6 +47,7 @@ Base.@kwdef mutable struct ResultModel <: OpenAPI.APIModel exec_time_minutes::Union{Nothing, Float64} = nothing id::Union{Nothing, Int64} = nothing job_id::Union{Nothing, Int64} = nothing + job_name::Union{Nothing, String} = nothing peak_cpu_percent::Union{Nothing, Float64} = nothing peak_memory_bytes::Union{Nothing, Int64} = nothing return_code::Union{Nothing, Int64} = nothing @@ -52,14 +55,14 @@ Base.@kwdef mutable struct ResultModel <: OpenAPI.APIModel status = nothing # spec type: Union{ Nothing, JobStatus } workflow_id::Union{Nothing, Int64} = nothing - function ResultModel(attempt_id, avg_cpu_percent, avg_memory_bytes, completion_time, compute_node_id, exec_time_minutes, id, job_id, peak_cpu_percent, peak_memory_bytes, return_code, run_id, status, workflow_id, ) - o = new(attempt_id, avg_cpu_percent, avg_memory_bytes, completion_time, compute_node_id, exec_time_minutes, id, job_id, peak_cpu_percent, peak_memory_bytes, return_code, run_id, status, workflow_id, ) + function ResultModel(attempt_id, avg_cpu_percent, avg_memory_bytes, completion_time, compute_node_id, exec_time_minutes, id, job_id, job_name, peak_cpu_percent, peak_memory_bytes, return_code, run_id, status, workflow_id, ) + o = new(attempt_id, avg_cpu_percent, avg_memory_bytes, completion_time, compute_node_id, exec_time_minutes, id, job_id, job_name, peak_cpu_percent, peak_memory_bytes, return_code, run_id, status, workflow_id, ) OpenAPI.validate_properties(o) return o end end # type ResultModel -const _property_types_ResultModel = Dict{Symbol,String}(Symbol("attempt_id")=>"Int64", Symbol("avg_cpu_percent")=>"Float64", Symbol("avg_memory_bytes")=>"Int64", Symbol("completion_time")=>"String", Symbol("compute_node_id")=>"Int64", Symbol("exec_time_minutes")=>"Float64", Symbol("id")=>"Int64", Symbol("job_id")=>"Int64", Symbol("peak_cpu_percent")=>"Float64", Symbol("peak_memory_bytes")=>"Int64", Symbol("return_code")=>"Int64", Symbol("run_id")=>"Int64", Symbol("status")=>"JobStatus", Symbol("workflow_id")=>"Int64", ) +const _property_types_ResultModel = Dict{Symbol,String}(Symbol("attempt_id")=>"Int64", Symbol("avg_cpu_percent")=>"Float64", Symbol("avg_memory_bytes")=>"Int64", Symbol("completion_time")=>"String", Symbol("compute_node_id")=>"Int64", Symbol("exec_time_minutes")=>"Float64", Symbol("id")=>"Int64", Symbol("job_id")=>"Int64", Symbol("job_name")=>"String", Symbol("peak_cpu_percent")=>"Float64", Symbol("peak_memory_bytes")=>"Int64", Symbol("return_code")=>"Int64", Symbol("run_id")=>"Int64", Symbol("status")=>"JobStatus", Symbol("workflow_id")=>"Int64", ) OpenAPI.property_type(::Type{ ResultModel }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_ResultModel[name]))} function OpenAPI.check_required(o::ResultModel) @@ -83,6 +86,7 @@ function OpenAPI.validate_properties(o::ResultModel) OpenAPI.validate_property(ResultModel, Symbol("exec_time_minutes"), o.exec_time_minutes) OpenAPI.validate_property(ResultModel, Symbol("id"), o.id) OpenAPI.validate_property(ResultModel, Symbol("job_id"), o.job_id) + OpenAPI.validate_property(ResultModel, Symbol("job_name"), o.job_name) OpenAPI.validate_property(ResultModel, Symbol("peak_cpu_percent"), o.peak_cpu_percent) OpenAPI.validate_property(ResultModel, Symbol("peak_memory_bytes"), o.peak_memory_bytes) OpenAPI.validate_property(ResultModel, Symbol("return_code"), o.return_code) @@ -122,6 +126,7 @@ function OpenAPI.validate_property(::Type{ ResultModel }, name::Symbol, val) OpenAPI.validate_param(name, "ResultModel", :format, val, "int64") end + if name === Symbol("peak_cpu_percent") OpenAPI.validate_param(name, "ResultModel", :format, val, "double") end diff --git a/julia_client/Torc/src/api/models/model_RunningJobModel.jl b/julia_client/Torc/src/api/models/model_RunningJobModel.jl new file mode 100644 index 00000000..1983bf84 --- /dev/null +++ b/julia_client/Torc/src/api/models/model_RunningJobModel.jl @@ -0,0 +1,69 @@ +# This file was generated by the Julia OpenAPI Code Generator +# Do not modify this file directly. Modify the OpenAPI specification instead. + + +@doc raw"""RunningJobModel +A currently-running job together with the compute node it occupies and, when the node was provisioned by a scheduler, that scheduler's job ID. `scheduler_type` is generic (e.g. \"slurm\", \"local\"); `scheduler_job_id` is populated only for scheduler-managed nodes (the Slurm job ID today). + + RunningJobModel(; + compute_node_name=nothing, + job_id=nothing, + job_name=nothing, + scheduler_job_id=nothing, + scheduler_type=nothing, + start_time=nothing, + ) + + - compute_node_name::String + - job_id::Int64 + - job_name::String + - scheduler_job_id::String + - scheduler_type::String + - start_time::String : RFC3339 time the job started running, for computing elapsed time. +""" +Base.@kwdef mutable struct RunningJobModel <: OpenAPI.APIModel + compute_node_name::Union{Nothing, String} = nothing + job_id::Union{Nothing, Int64} = nothing + job_name::Union{Nothing, String} = nothing + scheduler_job_id::Union{Nothing, String} = nothing + scheduler_type::Union{Nothing, String} = nothing + start_time::Union{Nothing, String} = nothing + + function RunningJobModel(compute_node_name, job_id, job_name, scheduler_job_id, scheduler_type, start_time, ) + o = new(compute_node_name, job_id, job_name, scheduler_job_id, scheduler_type, start_time, ) + OpenAPI.validate_properties(o) + return o + end +end # type RunningJobModel + +const _property_types_RunningJobModel = Dict{Symbol,String}(Symbol("compute_node_name")=>"String", Symbol("job_id")=>"Int64", Symbol("job_name")=>"String", Symbol("scheduler_job_id")=>"String", Symbol("scheduler_type")=>"String", Symbol("start_time")=>"String", ) +OpenAPI.property_type(::Type{ RunningJobModel }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_RunningJobModel[name]))} + +function OpenAPI.check_required(o::RunningJobModel) + o.compute_node_name === nothing && (return false) + o.job_id === nothing && (return false) + o.job_name === nothing && (return false) + o.scheduler_type === nothing && (return false) + true +end + +function OpenAPI.validate_properties(o::RunningJobModel) + OpenAPI.validate_property(RunningJobModel, Symbol("compute_node_name"), o.compute_node_name) + OpenAPI.validate_property(RunningJobModel, Symbol("job_id"), o.job_id) + OpenAPI.validate_property(RunningJobModel, Symbol("job_name"), o.job_name) + OpenAPI.validate_property(RunningJobModel, Symbol("scheduler_job_id"), o.scheduler_job_id) + OpenAPI.validate_property(RunningJobModel, Symbol("scheduler_type"), o.scheduler_type) + OpenAPI.validate_property(RunningJobModel, Symbol("start_time"), o.start_time) +end + +function OpenAPI.validate_property(::Type{ RunningJobModel }, name::Symbol, val) + + + if name === Symbol("job_id") + OpenAPI.validate_param(name, "RunningJobModel", :format, val, "int64") + end + + + + +end diff --git a/julia_client/Torc/src/api/models/model_RunningJobsResponse.jl b/julia_client/Torc/src/api/models/model_RunningJobsResponse.jl new file mode 100644 index 00000000..0ff65eff --- /dev/null +++ b/julia_client/Torc/src/api/models/model_RunningJobsResponse.jl @@ -0,0 +1,80 @@ +# This file was generated by the Julia OpenAPI Code Generator +# Do not modify this file directly. Modify the OpenAPI specification instead. + + +@doc raw"""RunningJobsResponse +A page of currently-running jobs for a workflow, computed server-side. + + RunningJobsResponse(; + count=nothing, + has_more=nothing, + items=nothing, + max_limit=nothing, + offset=nothing, + total_count=nothing, + ) + + - count::Int64 + - has_more::Bool + - items::Vector{RunningJobModel} + - max_limit::Int64 + - offset::Int64 + - total_count::Int64 +""" +Base.@kwdef mutable struct RunningJobsResponse <: OpenAPI.APIModel + count::Union{Nothing, Int64} = nothing + has_more::Union{Nothing, Bool} = nothing + items::Union{Nothing, Vector} = nothing # spec type: Union{ Nothing, Vector{RunningJobModel} } + max_limit::Union{Nothing, Int64} = nothing + offset::Union{Nothing, Int64} = nothing + total_count::Union{Nothing, Int64} = nothing + + function RunningJobsResponse(count, has_more, items, max_limit, offset, total_count, ) + o = new(count, has_more, items, max_limit, offset, total_count, ) + OpenAPI.validate_properties(o) + return o + end +end # type RunningJobsResponse + +const _property_types_RunningJobsResponse = Dict{Symbol,String}(Symbol("count")=>"Int64", Symbol("has_more")=>"Bool", Symbol("items")=>"Vector{RunningJobModel}", Symbol("max_limit")=>"Int64", Symbol("offset")=>"Int64", Symbol("total_count")=>"Int64", ) +OpenAPI.property_type(::Type{ RunningJobsResponse }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_RunningJobsResponse[name]))} + +function OpenAPI.check_required(o::RunningJobsResponse) + o.count === nothing && (return false) + o.has_more === nothing && (return false) + o.items === nothing && (return false) + o.max_limit === nothing && (return false) + o.offset === nothing && (return false) + o.total_count === nothing && (return false) + true +end + +function OpenAPI.validate_properties(o::RunningJobsResponse) + OpenAPI.validate_property(RunningJobsResponse, Symbol("count"), o.count) + OpenAPI.validate_property(RunningJobsResponse, Symbol("has_more"), o.has_more) + OpenAPI.validate_property(RunningJobsResponse, Symbol("items"), o.items) + OpenAPI.validate_property(RunningJobsResponse, Symbol("max_limit"), o.max_limit) + OpenAPI.validate_property(RunningJobsResponse, Symbol("offset"), o.offset) + OpenAPI.validate_property(RunningJobsResponse, Symbol("total_count"), o.total_count) +end + +function OpenAPI.validate_property(::Type{ RunningJobsResponse }, name::Symbol, val) + + if name === Symbol("count") + OpenAPI.validate_param(name, "RunningJobsResponse", :format, val, "int64") + end + + + + if name === Symbol("max_limit") + OpenAPI.validate_param(name, "RunningJobsResponse", :format, val, "int64") + end + + if name === Symbol("offset") + OpenAPI.validate_param(name, "RunningJobsResponse", :format, val, "int64") + end + + if name === Symbol("total_count") + OpenAPI.validate_param(name, "RunningJobsResponse", :format, val, "int64") + end +end diff --git a/julia_client/Torc/src/api/models/model_SlurmJobCorrelationModel.jl b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationModel.jl new file mode 100644 index 00000000..f8423f89 --- /dev/null +++ b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationModel.jl @@ -0,0 +1,53 @@ +# This file was generated by the Julia OpenAPI Code Generator +# Do not modify this file directly. Modify the OpenAPI specification instead. + + +@doc raw"""SlurmJobCorrelationModel +One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given Torc job, derived from scheduled_compute_node -> compute_node -> result. + + SlurmJobCorrelationModel(; + job_id=nothing, + job_name=nothing, + slurm_job_id=nothing, + ) + + - job_id::Int64 + - job_name::String + - slurm_job_id::String +""" +Base.@kwdef mutable struct SlurmJobCorrelationModel <: OpenAPI.APIModel + job_id::Union{Nothing, Int64} = nothing + job_name::Union{Nothing, String} = nothing + slurm_job_id::Union{Nothing, String} = nothing + + function SlurmJobCorrelationModel(job_id, job_name, slurm_job_id, ) + o = new(job_id, job_name, slurm_job_id, ) + OpenAPI.validate_properties(o) + return o + end +end # type SlurmJobCorrelationModel + +const _property_types_SlurmJobCorrelationModel = Dict{Symbol,String}(Symbol("job_id")=>"Int64", Symbol("job_name")=>"String", Symbol("slurm_job_id")=>"String", ) +OpenAPI.property_type(::Type{ SlurmJobCorrelationModel }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_SlurmJobCorrelationModel[name]))} + +function OpenAPI.check_required(o::SlurmJobCorrelationModel) + o.job_id === nothing && (return false) + o.job_name === nothing && (return false) + o.slurm_job_id === nothing && (return false) + true +end + +function OpenAPI.validate_properties(o::SlurmJobCorrelationModel) + OpenAPI.validate_property(SlurmJobCorrelationModel, Symbol("job_id"), o.job_id) + OpenAPI.validate_property(SlurmJobCorrelationModel, Symbol("job_name"), o.job_name) + OpenAPI.validate_property(SlurmJobCorrelationModel, Symbol("slurm_job_id"), o.slurm_job_id) +end + +function OpenAPI.validate_property(::Type{ SlurmJobCorrelationModel }, name::Symbol, val) + + if name === Symbol("job_id") + OpenAPI.validate_param(name, "SlurmJobCorrelationModel", :format, val, "int64") + end + + +end diff --git a/julia_client/Torc/src/api/models/model_SlurmJobCorrelationsResponse.jl b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationsResponse.jl new file mode 100644 index 00000000..0f4fe236 --- /dev/null +++ b/julia_client/Torc/src/api/models/model_SlurmJobCorrelationsResponse.jl @@ -0,0 +1,80 @@ +# This file was generated by the Julia OpenAPI Code Generator +# Do not modify this file directly. Modify the OpenAPI specification instead. + + +@doc raw"""SlurmJobCorrelationsResponse +A page of Slurm-job-to-Torc-job correlations for a workflow, computed server-side. + + SlurmJobCorrelationsResponse(; + count=nothing, + has_more=nothing, + items=nothing, + max_limit=nothing, + offset=nothing, + total_count=nothing, + ) + + - count::Int64 + - has_more::Bool + - items::Vector{SlurmJobCorrelationModel} + - max_limit::Int64 + - offset::Int64 + - total_count::Int64 +""" +Base.@kwdef mutable struct SlurmJobCorrelationsResponse <: OpenAPI.APIModel + count::Union{Nothing, Int64} = nothing + has_more::Union{Nothing, Bool} = nothing + items::Union{Nothing, Vector} = nothing # spec type: Union{ Nothing, Vector{SlurmJobCorrelationModel} } + max_limit::Union{Nothing, Int64} = nothing + offset::Union{Nothing, Int64} = nothing + total_count::Union{Nothing, Int64} = nothing + + function SlurmJobCorrelationsResponse(count, has_more, items, max_limit, offset, total_count, ) + o = new(count, has_more, items, max_limit, offset, total_count, ) + OpenAPI.validate_properties(o) + return o + end +end # type SlurmJobCorrelationsResponse + +const _property_types_SlurmJobCorrelationsResponse = Dict{Symbol,String}(Symbol("count")=>"Int64", Symbol("has_more")=>"Bool", Symbol("items")=>"Vector{SlurmJobCorrelationModel}", Symbol("max_limit")=>"Int64", Symbol("offset")=>"Int64", Symbol("total_count")=>"Int64", ) +OpenAPI.property_type(::Type{ SlurmJobCorrelationsResponse }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_SlurmJobCorrelationsResponse[name]))} + +function OpenAPI.check_required(o::SlurmJobCorrelationsResponse) + o.count === nothing && (return false) + o.has_more === nothing && (return false) + o.items === nothing && (return false) + o.max_limit === nothing && (return false) + o.offset === nothing && (return false) + o.total_count === nothing && (return false) + true +end + +function OpenAPI.validate_properties(o::SlurmJobCorrelationsResponse) + OpenAPI.validate_property(SlurmJobCorrelationsResponse, Symbol("count"), o.count) + OpenAPI.validate_property(SlurmJobCorrelationsResponse, Symbol("has_more"), o.has_more) + OpenAPI.validate_property(SlurmJobCorrelationsResponse, Symbol("items"), o.items) + OpenAPI.validate_property(SlurmJobCorrelationsResponse, Symbol("max_limit"), o.max_limit) + OpenAPI.validate_property(SlurmJobCorrelationsResponse, Symbol("offset"), o.offset) + OpenAPI.validate_property(SlurmJobCorrelationsResponse, Symbol("total_count"), o.total_count) +end + +function OpenAPI.validate_property(::Type{ SlurmJobCorrelationsResponse }, name::Symbol, val) + + if name === Symbol("count") + OpenAPI.validate_param(name, "SlurmJobCorrelationsResponse", :format, val, "int64") + end + + + + if name === Symbol("max_limit") + OpenAPI.validate_param(name, "SlurmJobCorrelationsResponse", :format, val, "int64") + end + + if name === Symbol("offset") + OpenAPI.validate_param(name, "SlurmJobCorrelationsResponse", :format, val, "int64") + end + + if name === Symbol("total_count") + OpenAPI.validate_param(name, "SlurmJobCorrelationsResponse", :format, val, "int64") + end +end diff --git a/julia_client/julia_client/README.md b/julia_client/julia_client/README.md index a4d39d32..83157819 100644 --- a/julia_client/julia_client/README.md +++ b/julia_client/julia_client/README.md @@ -133,6 +133,8 @@ Class | Method *WorkflowsApi* | [**delete_workflow**](docs/WorkflowsApi.md#delete_workflow)
**DELETE** /workflows/{id}
*WorkflowsApi* | [**get_active_task_for_workflow**](docs/WorkflowsApi.md#get_active_task_for_workflow)
**GET** /workflows/{id}/active_task
*WorkflowsApi* | [**get_ready_job_requirements**](docs/WorkflowsApi.md#get_ready_job_requirements)
**GET** /workflows/{id}/ready_job_requirements
+*WorkflowsApi* | [**get_running_jobs**](docs/WorkflowsApi.md#get_running_jobs)
**GET** /workflows/{id}/running_jobs
+*WorkflowsApi* | [**get_slurm_job_correlations**](docs/WorkflowsApi.md#get_slurm_job_correlations)
**GET** /workflows/{id}/slurm_job_correlations
*WorkflowsApi* | [**get_workflow**](docs/WorkflowsApi.md#get_workflow)
**GET** /workflows/{id}
*WorkflowsApi* | [**get_workflow_status**](docs/WorkflowsApi.md#get_workflow_status)
**GET** /workflows/{id}/status
*WorkflowsApi* | [**initialize_jobs**](docs/WorkflowsApi.md#initialize_jobs)
**POST** /workflows/{id}/initialize_jobs
@@ -227,7 +229,11 @@ Class | Method - [ResourceRequirementsModel](docs/ResourceRequirementsModel.md) - [ResultModel](docs/ResultModel.md) - [RoCrateEntityModel](docs/RoCrateEntityModel.md) + - [RunningJobModel](docs/RunningJobModel.md) + - [RunningJobsResponse](docs/RunningJobsResponse.md) - [ScheduledComputeNodesModel](docs/ScheduledComputeNodesModel.md) + - [SlurmJobCorrelationModel](docs/SlurmJobCorrelationModel.md) + - [SlurmJobCorrelationsResponse](docs/SlurmJobCorrelationsResponse.md) - [SlurmSchedulerModel](docs/SlurmSchedulerModel.md) - [SlurmStatsModel](docs/SlurmStatsModel.md) - [SpawnJobModel](docs/SpawnJobModel.md) diff --git a/julia_client/julia_client/docs/ResultModel.md b/julia_client/julia_client/docs/ResultModel.md index 2911dd35..bd19033b 100644 --- a/julia_client/julia_client/docs/ResultModel.md +++ b/julia_client/julia_client/docs/ResultModel.md @@ -12,6 +12,7 @@ Name | Type | Description | Notes **exec_time_minutes** | **Float64** | | [default to nothing] **id** | **Int64** | | [optional] [default to nothing] **job_id** | **Int64** | | [default to nothing] +**job_name** | **String** | Name of the job this result belongs to. Populated by the server on read paths (list/get) as a convenience so clients need not re-fetch jobs; it is ignored on create/update input. | [optional] [default to nothing] **peak_cpu_percent** | **Float64** | | [optional] [default to nothing] **peak_memory_bytes** | **Int64** | | [optional] [default to nothing] **return_code** | **Int64** | | [default to nothing] diff --git a/julia_client/julia_client/docs/RunningJobModel.md b/julia_client/julia_client/docs/RunningJobModel.md new file mode 100644 index 00000000..c5a8aacf --- /dev/null +++ b/julia_client/julia_client/docs/RunningJobModel.md @@ -0,0 +1,17 @@ +# RunningJobModel + + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**compute_node_name** | **String** | | [default to nothing] +**job_id** | **Int64** | | [default to nothing] +**job_name** | **String** | | [default to nothing] +**scheduler_job_id** | **String** | | [optional] [default to nothing] +**scheduler_type** | **String** | | [default to nothing] +**start_time** | **String** | RFC3339 time the job started running, for computing elapsed time. | [optional] [default to nothing] + + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + + diff --git a/julia_client/julia_client/docs/RunningJobsResponse.md b/julia_client/julia_client/docs/RunningJobsResponse.md new file mode 100644 index 00000000..887d405b --- /dev/null +++ b/julia_client/julia_client/docs/RunningJobsResponse.md @@ -0,0 +1,17 @@ +# RunningJobsResponse + + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**count** | **Int64** | | [default to nothing] +**has_more** | **Bool** | | [default to nothing] +**items** | [**Vector{RunningJobModel}**](RunningJobModel.md) | | [default to nothing] +**max_limit** | **Int64** | | [default to nothing] +**offset** | **Int64** | | [default to nothing] +**total_count** | **Int64** | | [default to nothing] + + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + + diff --git a/julia_client/julia_client/docs/SlurmJobCorrelationModel.md b/julia_client/julia_client/docs/SlurmJobCorrelationModel.md new file mode 100644 index 00000000..e17c92d9 --- /dev/null +++ b/julia_client/julia_client/docs/SlurmJobCorrelationModel.md @@ -0,0 +1,14 @@ +# SlurmJobCorrelationModel + + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**job_id** | **Int64** | | [default to nothing] +**job_name** | **String** | | [default to nothing] +**slurm_job_id** | **String** | | [default to nothing] + + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + + diff --git a/julia_client/julia_client/docs/SlurmJobCorrelationsResponse.md b/julia_client/julia_client/docs/SlurmJobCorrelationsResponse.md new file mode 100644 index 00000000..fe2cbfb4 --- /dev/null +++ b/julia_client/julia_client/docs/SlurmJobCorrelationsResponse.md @@ -0,0 +1,17 @@ +# SlurmJobCorrelationsResponse + + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**count** | **Int64** | | [default to nothing] +**has_more** | **Bool** | | [default to nothing] +**items** | [**Vector{SlurmJobCorrelationModel}**](SlurmJobCorrelationModel.md) | | [default to nothing] +**max_limit** | **Int64** | | [default to nothing] +**offset** | **Int64** | | [default to nothing] +**total_count** | **Int64** | | [default to nothing] + + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + + diff --git a/julia_client/julia_client/docs/WorkflowsApi.md b/julia_client/julia_client/docs/WorkflowsApi.md index 41d2417e..a7793b84 100644 --- a/julia_client/julia_client/docs/WorkflowsApi.md +++ b/julia_client/julia_client/docs/WorkflowsApi.md @@ -13,6 +13,8 @@ Method | HTTP request | Description [**delete_workflow**](WorkflowsApi.md#delete_workflow) | **DELETE** /workflows/{id} | [**get_active_task_for_workflow**](WorkflowsApi.md#get_active_task_for_workflow) | **GET** /workflows/{id}/active_task | [**get_ready_job_requirements**](WorkflowsApi.md#get_ready_job_requirements) | **GET** /workflows/{id}/ready_job_requirements | +[**get_running_jobs**](WorkflowsApi.md#get_running_jobs) | **GET** /workflows/{id}/running_jobs | +[**get_slurm_job_correlations**](WorkflowsApi.md#get_slurm_job_correlations) | **GET** /workflows/{id}/slurm_job_correlations | [**get_workflow**](WorkflowsApi.md#get_workflow) | **GET** /workflows/{id} | [**get_workflow_status**](WorkflowsApi.md#get_workflow_status) | **GET** /workflows/{id}/status | [**initialize_jobs**](WorkflowsApi.md#initialize_jobs) | **POST** /workflows/{id}/initialize_jobs | @@ -305,6 +307,76 @@ No authorization required [[Back to top]](#) [[Back to API list]](../README.md#api-endpoints) [[Back to Model list]](../README.md#models) [[Back to README]](../README.md) +# **get_running_jobs** +> get_running_jobs(_api::WorkflowsApi, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) -> RunningJobsResponse, OpenAPI.Clients.ApiResponse
+> get_running_jobs(_api::WorkflowsApi, response_stream::Channel, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) -> Channel{ RunningJobsResponse }, OpenAPI.Clients.ApiResponse + + + +### Required Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **_api** | **WorkflowsApi** | API context | +**id** | **Int64** | Workflow ID | + +### Optional Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **offset** | **Int64** | | [default to nothing] + **limit** | **Int64** | | [default to nothing] + +### Return type + +[**RunningJobsResponse**](RunningJobsResponse.md) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: Not defined + - **Accept**: application/json + +[[Back to top]](#) [[Back to API list]](../README.md#api-endpoints) [[Back to Model list]](../README.md#models) [[Back to README]](../README.md) + +# **get_slurm_job_correlations** +> get_slurm_job_correlations(_api::WorkflowsApi, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) -> SlurmJobCorrelationsResponse, OpenAPI.Clients.ApiResponse
+> get_slurm_job_correlations(_api::WorkflowsApi, response_stream::Channel, id::Int64; offset=nothing, limit=nothing, _mediaType=nothing) -> Channel{ SlurmJobCorrelationsResponse }, OpenAPI.Clients.ApiResponse + + + +### Required Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **_api** | **WorkflowsApi** | API context | +**id** | **Int64** | Workflow ID | + +### Optional Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **offset** | **Int64** | | [default to nothing] + **limit** | **Int64** | | [default to nothing] + +### Return type + +[**SlurmJobCorrelationsResponse**](SlurmJobCorrelationsResponse.md) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: Not defined + - **Accept**: application/json + +[[Back to top]](#) [[Back to API list]](../README.md#api-endpoints) [[Back to Model list]](../README.md#models) [[Back to README]](../README.md) + # **get_workflow** > get_workflow(_api::WorkflowsApi, id::Int64; _mediaType=nothing) -> WorkflowModel, OpenAPI.Clients.ApiResponse
> get_workflow(_api::WorkflowsApi, response_stream::Channel, id::Int64; _mediaType=nothing) -> Channel{ WorkflowModel }, OpenAPI.Clients.ApiResponse diff --git a/python_client/src/torc/openapi_client/__init__.py b/python_client/src/torc/openapi_client/__init__.py index c9386eba..407643f5 100644 --- a/python_client/src/torc/openapi_client/__init__.py +++ b/python_client/src/torc/openapi_client/__init__.py @@ -120,7 +120,11 @@ "ResourceRequirementsModel", "ResultModel", "RoCrateEntityModel", + "RunningJobModel", + "RunningJobsResponse", "ScheduledComputeNodesModel", + "SlurmJobCorrelationModel", + "SlurmJobCorrelationsResponse", "SlurmSchedulerModel", "SlurmStatsModel", "SpawnJobModel", @@ -247,7 +251,11 @@ from torc.openapi_client.models.resource_requirements_model import ResourceRequirementsModel as ResourceRequirementsModel from torc.openapi_client.models.result_model import ResultModel as ResultModel from torc.openapi_client.models.ro_crate_entity_model import RoCrateEntityModel as RoCrateEntityModel +from torc.openapi_client.models.running_job_model import RunningJobModel as RunningJobModel +from torc.openapi_client.models.running_jobs_response import RunningJobsResponse as RunningJobsResponse from torc.openapi_client.models.scheduled_compute_nodes_model import ScheduledComputeNodesModel as ScheduledComputeNodesModel +from torc.openapi_client.models.slurm_job_correlation_model import SlurmJobCorrelationModel as SlurmJobCorrelationModel +from torc.openapi_client.models.slurm_job_correlations_response import SlurmJobCorrelationsResponse as SlurmJobCorrelationsResponse from torc.openapi_client.models.slurm_scheduler_model import SlurmSchedulerModel as SlurmSchedulerModel from torc.openapi_client.models.slurm_stats_model import SlurmStatsModel as SlurmStatsModel from torc.openapi_client.models.spawn_job_model import SpawnJobModel as SpawnJobModel diff --git a/python_client/src/torc/openapi_client/api/workflows_api.py b/python_client/src/torc/openapi_client/api/workflows_api.py index 46f12751..7306a764 100644 --- a/python_client/src/torc/openapi_client/api/workflows_api.py +++ b/python_client/src/torc/openapi_client/api/workflows_api.py @@ -38,6 +38,8 @@ from torc.openapi_client.models.list_workflows_response import ListWorkflowsResponse from torc.openapi_client.models.process_changed_job_inputs_response import ProcessChangedJobInputsResponse from torc.openapi_client.models.reset_job_status_response import ResetJobStatusResponse +from torc.openapi_client.models.running_jobs_response import RunningJobsResponse +from torc.openapi_client.models.slurm_job_correlations_response import SlurmJobCorrelationsResponse from torc.openapi_client.models.workflow_model import WorkflowModel from torc.openapi_client.models.workflow_status_response import WorkflowStatusResponse @@ -2562,6 +2564,606 @@ def _get_ready_job_requirements_serialize( + @validate_call + def get_running_jobs( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + offset: Optional[StrictInt] = None, + limit: Optional[StrictInt] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RunningJobsResponse: + """get_running_jobs + + + :param id: Workflow ID (required) + :type id: int + :param offset: + :type offset: int + :param limit: + :type limit: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_running_jobs_serialize( + id=id, + offset=offset, + limit=limit, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "RunningJobsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + + @validate_call + def get_running_jobs_with_http_info( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + offset: Optional[StrictInt] = None, + limit: Optional[StrictInt] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[RunningJobsResponse]: + """get_running_jobs + + + :param id: Workflow ID (required) + :type id: int + :param offset: + :type offset: int + :param limit: + :type limit: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_running_jobs_serialize( + id=id, + offset=offset, + limit=limit, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "RunningJobsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + + @validate_call + def get_running_jobs_without_preload_content( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + offset: Optional[StrictInt] = None, + limit: Optional[StrictInt] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """get_running_jobs + + + :param id: Workflow ID (required) + :type id: int + :param offset: + :type offset: int + :param limit: + :type limit: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_running_jobs_serialize( + id=id, + offset=offset, + limit=limit, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "RunningJobsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + return response_data.response + + + def _get_running_jobs_serialize( + self, + id, + offset, + limit, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = { + } + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[ + str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] + ] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + if id is not None: + _path_params['id'] = id + # process the query parameters + if offset is not None: + + _query_params.append(('offset', offset)) + + if limit is not None: + + _query_params.append(('limit', limit)) + + # process the header parameters + # process the form parameters + # process the body parameter + + + # set the HTTP header `Accept` + if 'Accept' not in _header_params: + _header_params['Accept'] = self.api_client.select_header_accept( + [ + 'application/json' + ] + ) + + + # authentication setting + _auth_settings: List[str] = [ + ] + + return self.api_client.param_serialize( + method='GET', + resource_path='/workflows/{id}/running_jobs', + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth + ) + + + + + @validate_call + def get_slurm_job_correlations( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + offset: Optional[StrictInt] = None, + limit: Optional[StrictInt] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> SlurmJobCorrelationsResponse: + """get_slurm_job_correlations + + + :param id: Workflow ID (required) + :type id: int + :param offset: + :type offset: int + :param limit: + :type limit: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_slurm_job_correlations_serialize( + id=id, + offset=offset, + limit=limit, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "SlurmJobCorrelationsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + + @validate_call + def get_slurm_job_correlations_with_http_info( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + offset: Optional[StrictInt] = None, + limit: Optional[StrictInt] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[SlurmJobCorrelationsResponse]: + """get_slurm_job_correlations + + + :param id: Workflow ID (required) + :type id: int + :param offset: + :type offset: int + :param limit: + :type limit: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_slurm_job_correlations_serialize( + id=id, + offset=offset, + limit=limit, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "SlurmJobCorrelationsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + + @validate_call + def get_slurm_job_correlations_without_preload_content( + self, + id: Annotated[StrictInt, Field(description="Workflow ID")], + offset: Optional[StrictInt] = None, + limit: Optional[StrictInt] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """get_slurm_job_correlations + + + :param id: Workflow ID (required) + :type id: int + :param offset: + :type offset: int + :param limit: + :type limit: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._get_slurm_job_correlations_serialize( + id=id, + offset=offset, + limit=limit, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "SlurmJobCorrelationsResponse", + '403': "ErrorResponse", + '404': "ErrorResponse", + '500': "ErrorResponse", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + return response_data.response + + + def _get_slurm_job_correlations_serialize( + self, + id, + offset, + limit, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = { + } + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[ + str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] + ] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + if id is not None: + _path_params['id'] = id + # process the query parameters + if offset is not None: + + _query_params.append(('offset', offset)) + + if limit is not None: + + _query_params.append(('limit', limit)) + + # process the header parameters + # process the form parameters + # process the body parameter + + + # set the HTTP header `Accept` + if 'Accept' not in _header_params: + _header_params['Accept'] = self.api_client.select_header_accept( + [ + 'application/json' + ] + ) + + + # authentication setting + _auth_settings: List[str] = [ + ] + + return self.api_client.param_serialize( + method='GET', + resource_path='/workflows/{id}/slurm_job_correlations', + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth + ) + + + + @validate_call def get_workflow( self, diff --git a/python_client/src/torc/openapi_client/models/__init__.py b/python_client/src/torc/openapi_client/models/__init__.py index 18e6d41d..38eb181b 100644 --- a/python_client/src/torc/openapi_client/models/__init__.py +++ b/python_client/src/torc/openapi_client/models/__init__.py @@ -87,7 +87,11 @@ from torc.openapi_client.models.resource_requirements_model import ResourceRequirementsModel from torc.openapi_client.models.result_model import ResultModel from torc.openapi_client.models.ro_crate_entity_model import RoCrateEntityModel +from torc.openapi_client.models.running_job_model import RunningJobModel +from torc.openapi_client.models.running_jobs_response import RunningJobsResponse from torc.openapi_client.models.scheduled_compute_nodes_model import ScheduledComputeNodesModel +from torc.openapi_client.models.slurm_job_correlation_model import SlurmJobCorrelationModel +from torc.openapi_client.models.slurm_job_correlations_response import SlurmJobCorrelationsResponse from torc.openapi_client.models.slurm_scheduler_model import SlurmSchedulerModel from torc.openapi_client.models.slurm_stats_model import SlurmStatsModel from torc.openapi_client.models.spawn_job_model import SpawnJobModel diff --git a/python_client/src/torc/openapi_client/models/result_model.py b/python_client/src/torc/openapi_client/models/result_model.py index d195d060..55c46e56 100644 --- a/python_client/src/torc/openapi_client/models/result_model.py +++ b/python_client/src/torc/openapi_client/models/result_model.py @@ -17,7 +17,7 @@ import re # noqa: F401 import json -from pydantic import BaseModel, ConfigDict, StrictFloat, StrictInt, StrictStr +from pydantic import BaseModel, ConfigDict, Field, StrictFloat, StrictInt, StrictStr from typing import Any, ClassVar, Dict, List, Optional, Union from torc.openapi_client.models.job_status import JobStatus from typing import Optional, Set @@ -35,13 +35,14 @@ class ResultModel(BaseModel): exec_time_minutes: Union[StrictFloat, StrictInt] id: Optional[StrictInt] = None job_id: StrictInt + job_name: Optional[StrictStr] = Field(default=None, description="Name of the job this result belongs to. Populated by the server on read paths (list/get) as a convenience so clients need not re-fetch jobs; it is ignored on create/update input.") peak_cpu_percent: Optional[Union[StrictFloat, StrictInt]] = None peak_memory_bytes: Optional[StrictInt] = None return_code: StrictInt run_id: StrictInt status: JobStatus workflow_id: StrictInt - __properties: ClassVar[List[str]] = ["attempt_id", "avg_cpu_percent", "avg_memory_bytes", "completion_time", "compute_node_id", "exec_time_minutes", "id", "job_id", "peak_cpu_percent", "peak_memory_bytes", "return_code", "run_id", "status", "workflow_id"] + __properties: ClassVar[List[str]] = ["attempt_id", "avg_cpu_percent", "avg_memory_bytes", "completion_time", "compute_node_id", "exec_time_minutes", "id", "job_id", "job_name", "peak_cpu_percent", "peak_memory_bytes", "return_code", "run_id", "status", "workflow_id"] model_config = ConfigDict( populate_by_name=True, @@ -102,6 +103,11 @@ def to_dict(self) -> Dict[str, Any]: if self.id is None and "id" in self.model_fields_set: _dict['id'] = None + # set to None if job_name (nullable) is None + # and model_fields_set contains the field + if self.job_name is None and "job_name" in self.model_fields_set: + _dict['job_name'] = None + # set to None if peak_cpu_percent (nullable) is None # and model_fields_set contains the field if self.peak_cpu_percent is None and "peak_cpu_percent" in self.model_fields_set: @@ -132,6 +138,7 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: "exec_time_minutes": obj.get("exec_time_minutes"), "id": obj.get("id"), "job_id": obj.get("job_id"), + "job_name": obj.get("job_name"), "peak_cpu_percent": obj.get("peak_cpu_percent"), "peak_memory_bytes": obj.get("peak_memory_bytes"), "return_code": obj.get("return_code"), diff --git a/python_client/src/torc/openapi_client/models/running_job_model.py b/python_client/src/torc/openapi_client/models/running_job_model.py new file mode 100644 index 00000000..abcae316 --- /dev/null +++ b/python_client/src/torc/openapi_client/models/running_job_model.py @@ -0,0 +1,107 @@ +# coding: utf-8 + +""" + torc + + Rust-owned OpenAPI surface for Torc. + + The version of the OpenAPI document: 0.20.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + +from pydantic import BaseModel, ConfigDict, Field, StrictInt, StrictStr +from typing import Any, ClassVar, Dict, List, Optional +from typing import Optional, Set +from typing_extensions import Self + +class RunningJobModel(BaseModel): + """ + A currently-running job together with the compute node it occupies and, when the node was provisioned by a scheduler, that scheduler's job ID. `scheduler_type` is generic (e.g. \"slurm\", \"local\"); `scheduler_job_id` is populated only for scheduler-managed nodes (the Slurm job ID today). + """ # noqa: E501 + compute_node_name: StrictStr + job_id: StrictInt + job_name: StrictStr + scheduler_job_id: Optional[StrictStr] = None + scheduler_type: StrictStr + start_time: Optional[StrictStr] = Field(default=None, description="RFC3339 time the job started running, for computing elapsed time.") + __properties: ClassVar[List[str]] = ["compute_node_name", "job_id", "job_name", "scheduler_job_id", "scheduler_type", "start_time"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of RunningJobModel from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + excluded_fields: Set[str] = set([ + ]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + # set to None if scheduler_job_id (nullable) is None + # and model_fields_set contains the field + if self.scheduler_job_id is None and "scheduler_job_id" in self.model_fields_set: + _dict['scheduler_job_id'] = None + + # set to None if start_time (nullable) is None + # and model_fields_set contains the field + if self.start_time is None and "start_time" in self.model_fields_set: + _dict['start_time'] = None + + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of RunningJobModel from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "compute_node_name": obj.get("compute_node_name"), + "job_id": obj.get("job_id"), + "job_name": obj.get("job_name"), + "scheduler_job_id": obj.get("scheduler_job_id"), + "scheduler_type": obj.get("scheduler_type"), + "start_time": obj.get("start_time") + }) + return _obj + + diff --git a/python_client/src/torc/openapi_client/models/running_jobs_response.py b/python_client/src/torc/openapi_client/models/running_jobs_response.py new file mode 100644 index 00000000..7fb5f7a5 --- /dev/null +++ b/python_client/src/torc/openapi_client/models/running_jobs_response.py @@ -0,0 +1,105 @@ +# coding: utf-8 + +""" + torc + + Rust-owned OpenAPI surface for Torc. + + The version of the OpenAPI document: 0.20.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + +from pydantic import BaseModel, ConfigDict, StrictBool, StrictInt +from typing import Any, ClassVar, Dict, List +from torc.openapi_client.models.running_job_model import RunningJobModel +from typing import Optional, Set +from typing_extensions import Self + +class RunningJobsResponse(BaseModel): + """ + A page of currently-running jobs for a workflow, computed server-side. + """ # noqa: E501 + count: StrictInt + has_more: StrictBool + items: List[RunningJobModel] + max_limit: StrictInt + offset: StrictInt + total_count: StrictInt + __properties: ClassVar[List[str]] = ["count", "has_more", "items", "max_limit", "offset", "total_count"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of RunningJobsResponse from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + excluded_fields: Set[str] = set([ + ]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + # override the default output from pydantic by calling `to_dict()` of each item in items (list) + _items = [] + if self.items: + for _item_items in self.items: + if _item_items: + _items.append(_item_items.to_dict()) + _dict['items'] = _items + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of RunningJobsResponse from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "count": obj.get("count"), + "has_more": obj.get("has_more"), + "items": [RunningJobModel.from_dict(_item) for _item in obj["items"]] if obj.get("items") is not None else None, + "max_limit": obj.get("max_limit"), + "offset": obj.get("offset"), + "total_count": obj.get("total_count") + }) + return _obj + + diff --git a/python_client/src/torc/openapi_client/models/slurm_job_correlation_model.py b/python_client/src/torc/openapi_client/models/slurm_job_correlation_model.py new file mode 100644 index 00000000..872ed69a --- /dev/null +++ b/python_client/src/torc/openapi_client/models/slurm_job_correlation_model.py @@ -0,0 +1,91 @@ +# coding: utf-8 + +""" + torc + + Rust-owned OpenAPI surface for Torc. + + The version of the OpenAPI document: 0.20.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + +from pydantic import BaseModel, ConfigDict, StrictInt, StrictStr +from typing import Any, ClassVar, Dict, List +from typing import Optional, Set +from typing_extensions import Self + +class SlurmJobCorrelationModel(BaseModel): + """ + One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given Torc job, derived from scheduled_compute_node -> compute_node -> result. + """ # noqa: E501 + job_id: StrictInt + job_name: StrictStr + slurm_job_id: StrictStr + __properties: ClassVar[List[str]] = ["job_id", "job_name", "slurm_job_id"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationModel from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + excluded_fields: Set[str] = set([ + ]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationModel from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "job_id": obj.get("job_id"), + "job_name": obj.get("job_name"), + "slurm_job_id": obj.get("slurm_job_id") + }) + return _obj + + diff --git a/python_client/src/torc/openapi_client/models/slurm_job_correlations_response.py b/python_client/src/torc/openapi_client/models/slurm_job_correlations_response.py new file mode 100644 index 00000000..9a8d9239 --- /dev/null +++ b/python_client/src/torc/openapi_client/models/slurm_job_correlations_response.py @@ -0,0 +1,105 @@ +# coding: utf-8 + +""" + torc + + Rust-owned OpenAPI surface for Torc. + + The version of the OpenAPI document: 0.20.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + +from pydantic import BaseModel, ConfigDict, StrictBool, StrictInt +from typing import Any, ClassVar, Dict, List +from torc.openapi_client.models.slurm_job_correlation_model import SlurmJobCorrelationModel +from typing import Optional, Set +from typing_extensions import Self + +class SlurmJobCorrelationsResponse(BaseModel): + """ + A page of Slurm-job-to-Torc-job correlations for a workflow, computed server-side. + """ # noqa: E501 + count: StrictInt + has_more: StrictBool + items: List[SlurmJobCorrelationModel] + max_limit: StrictInt + offset: StrictInt + total_count: StrictInt + __properties: ClassVar[List[str]] = ["count", "has_more", "items", "max_limit", "offset", "total_count"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationsResponse from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + excluded_fields: Set[str] = set([ + ]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + # override the default output from pydantic by calling `to_dict()` of each item in items (list) + _items = [] + if self.items: + for _item_items in self.items: + if _item_items: + _items.append(_item_items.to_dict()) + _dict['items'] = _items + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of SlurmJobCorrelationsResponse from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "count": obj.get("count"), + "has_more": obj.get("has_more"), + "items": [SlurmJobCorrelationModel.from_dict(_item) for _item in obj["items"]] if obj.get("items") is not None else None, + "max_limit": obj.get("max_limit"), + "offset": obj.get("offset"), + "total_count": obj.get("total_count") + }) + return _obj + + diff --git a/src/client/apis/workflows_api.rs b/src/client/apis/workflows_api.rs index f2094778..c1ddaeb9 100644 --- a/src/client/apis/workflows_api.rs +++ b/src/client/apis/workflows_api.rs @@ -86,6 +86,26 @@ pub enum GetReadyJobRequirementsError { UnknownValue(serde_json::Value), } +/// struct for typed errors of method [`get_running_jobs`] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum GetRunningJobsError { + Status403(models::ErrorResponse), + Status404(models::ErrorResponse), + Status500(models::ErrorResponse), + UnknownValue(serde_json::Value), +} + +/// struct for typed errors of method [`get_slurm_job_correlations`] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum GetSlurmJobCorrelationsError { + Status403(models::ErrorResponse), + Status404(models::ErrorResponse), + Status500(models::ErrorResponse), + UnknownValue(serde_json::Value), +} + /// struct for typed errors of method [`get_workflow`] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] @@ -745,6 +765,138 @@ pub fn get_ready_job_requirements( } } +pub fn get_running_jobs( + configuration: &configuration::Configuration, + id: i64, + offset: Option, + limit: Option, +) -> Result> { + // add a prefix to parameters to efficiently prevent name collisions + let p_path_id = id; + let p_query_offset = offset; + let p_query_limit = limit; + + let uri_str = format!( + "{}/workflows/{id}/running_jobs", + configuration.base_path, + id = p_path_id + ); + let mut req_builder = configuration.client.request(reqwest::Method::GET, &uri_str); + + if let Some(ref param_value) = p_query_offset { + req_builder = req_builder.query(&[("offset", ¶m_value.to_string())]); + } + if let Some(ref param_value) = p_query_limit { + req_builder = req_builder.query(&[("limit", ¶m_value.to_string())]); + } + if let Some(ref user_agent) = configuration.user_agent { + req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone()); + } + req_builder = configuration.apply_auth(req_builder); + + let req = req_builder.build()?; + let resp = configuration.client.execute(req)?; + + let status = resp.status(); + let content_type = resp + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("application/octet-stream"); + let content_type = super::ContentType::from(content_type); + + if !status.is_client_error() && !status.is_server_error() { + let content = resp.text()?; + match content_type { + ContentType::Json => serde_json::from_str(&content).map_err(Error::from), + ContentType::Text => { + return Err(Error::from(serde_json::Error::custom( + "Received `text/plain` content type response that cannot be converted to `models::RunningJobsResponse`", + ))); + } + ContentType::Unsupported(unknown_type) => { + return Err(Error::from(serde_json::Error::custom(format!( + "Received `{unknown_type}` content type response that cannot be converted to `models::RunningJobsResponse`" + )))); + } + } + } else { + let content = resp.text()?; + let entity: Option = serde_json::from_str(&content).ok(); + Err(Error::ResponseError(ResponseContent { + status, + content, + entity, + })) + } +} + +pub fn get_slurm_job_correlations( + configuration: &configuration::Configuration, + id: i64, + offset: Option, + limit: Option, +) -> Result> { + // add a prefix to parameters to efficiently prevent name collisions + let p_path_id = id; + let p_query_offset = offset; + let p_query_limit = limit; + + let uri_str = format!( + "{}/workflows/{id}/slurm_job_correlations", + configuration.base_path, + id = p_path_id + ); + let mut req_builder = configuration.client.request(reqwest::Method::GET, &uri_str); + + if let Some(ref param_value) = p_query_offset { + req_builder = req_builder.query(&[("offset", ¶m_value.to_string())]); + } + if let Some(ref param_value) = p_query_limit { + req_builder = req_builder.query(&[("limit", ¶m_value.to_string())]); + } + if let Some(ref user_agent) = configuration.user_agent { + req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone()); + } + req_builder = configuration.apply_auth(req_builder); + + let req = req_builder.build()?; + let resp = configuration.client.execute(req)?; + + let status = resp.status(); + let content_type = resp + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("application/octet-stream"); + let content_type = super::ContentType::from(content_type); + + if !status.is_client_error() && !status.is_server_error() { + let content = resp.text()?; + match content_type { + ContentType::Json => serde_json::from_str(&content).map_err(Error::from), + ContentType::Text => { + return Err(Error::from(serde_json::Error::custom( + "Received `text/plain` content type response that cannot be converted to `models::SlurmJobCorrelationsResponse`", + ))); + } + ContentType::Unsupported(unknown_type) => { + return Err(Error::from(serde_json::Error::custom(format!( + "Received `{unknown_type}` content type response that cannot be converted to `models::SlurmJobCorrelationsResponse`" + )))); + } + } + } else { + let content = resp.text()?; + let entity: Option = serde_json::from_str(&content).ok(); + Err(Error::ResponseError(ResponseContent { + status, + content, + entity, + })) + } +} + pub fn get_workflow( configuration: &configuration::Configuration, id: i64, diff --git a/src/client/commands/compute_nodes.rs b/src/client/commands/compute_nodes.rs index fd8e5645..d746783a 100644 --- a/src/client/commands/compute_nodes.rs +++ b/src/client/commands/compute_nodes.rs @@ -17,6 +17,10 @@ struct ComputeNodeTableRow { id: i64, #[tabled(rename = "Hostname")] hostname: String, + #[tabled(rename = "Scheduler")] + scheduler_type: String, + #[tabled(rename = "Scheduler Job ID")] + scheduler_job_id: String, #[tabled(rename = "PID")] pid: i64, #[tabled(rename = "CPUs")] @@ -53,6 +57,8 @@ impl From<&models::ComputeNodeModel> for ComputeNodeTableRow { ComputeNodeTableRow { id: node.id.unwrap_or(-1), hostname: node.hostname.clone(), + scheduler_type: node.compute_node_type.clone(), + scheduler_job_id: format_scheduler_job_id(node), pid: node.pid, num_cpus: node.num_cpus, memory_gb: format!("{:.2}", node.memory_gb), @@ -66,6 +72,21 @@ impl From<&models::ComputeNodeModel> for ComputeNodeTableRow { } } +/// Extract the external scheduler job ID from the compute node's scheduler +/// blob for table display. Today only Slurm nodes set one (`slurm_job_id`); +/// other/local nodes show "-". Accepts the value as a number or string. +fn format_scheduler_job_id(node: &models::ComputeNodeModel) -> String { + node.scheduler + .as_ref() + .and_then(|s| s.get("slurm_job_id")) + .and_then(|v| { + v.as_i64() + .map(|n| n.to_string()) + .or_else(|| v.as_str().map(|s| s.to_string())) + }) + .unwrap_or_else(|| "-".to_string()) +} + fn format_system_cpu(node: &models::ComputeNodeModel) -> String { match (node.peak_cpu_percent, node.avg_cpu_percent) { (Some(peak), Some(avg)) => format!("{:.1}%/{:.1}%", peak, avg), @@ -258,6 +279,27 @@ mod tests { ) } + #[test] + fn compute_node_list_row_shows_scheduler_columns() { + // Local node: type shown, no scheduler job ID. + let local = make_compute_node(); + let local_row = ComputeNodeTableRow::from(&local); + assert_eq!(local_row.scheduler_type, "local"); + assert_eq!(local_row.scheduler_job_id, "-"); + + // Slurm node: type shown and the Slurm job ID surfaced from the blob. + let mut slurm = make_compute_node(); + slurm.compute_node_type = "slurm".to_string(); + slurm.scheduler = Some(serde_json::json!({ + "type": "slurm", + "scheduler_id": 3, + "slurm_job_id": 987654, + })); + let slurm_row = ComputeNodeTableRow::from(&slurm); + assert_eq!(slurm_row.scheduler_type, "slurm"); + assert_eq!(slurm_row.scheduler_job_id, "987654"); + } + #[test] fn compute_node_list_row_shows_resource_summary_when_present() { let mut node = make_compute_node(); diff --git a/src/client/commands/jobs.rs b/src/client/commands/jobs.rs index 504a915b..5e3cfb1a 100644 --- a/src/client/commands/jobs.rs +++ b/src/client/commands/jobs.rs @@ -98,6 +98,22 @@ struct JobFailureHandlerTableRow { rules_summary: String, } +#[derive(Tabled)] +struct RunningJobTableRow { + #[tabled(rename = "Job ID")] + job_id: i64, + #[tabled(rename = "Job Name")] + job_name: String, + #[tabled(rename = "Compute Node")] + compute_node: String, + #[tabled(rename = "Elapsed")] + elapsed: String, + #[tabled(rename = "Scheduler")] + scheduler_type: String, + #[tabled(rename = "Scheduler Job ID")] + scheduler_job_id: String, +} + #[derive(clap::Subcommand)] #[command(after_long_help = "\ EXAMPLES: @@ -398,6 +414,23 @@ EXAMPLES: #[arg(short, long)] job_id: Option, }, + /// List currently-running jobs with their compute node and scheduler info + #[command( + name = "running", + after_long_help = "\ +EXAMPLES: + # List running jobs with compute node names (and Slurm job IDs, if any) + torc jobs running 123 + + # JSON output + torc -f json jobs running 123 +" + )] + Running { + /// Workflow ID to list running jobs for (optional - will prompt if not provided) + #[arg()] + workflow_id: Option, + }, } pub fn handle_job_commands(config: &Configuration, command: &JobCommands, format: &str) { @@ -1190,6 +1223,69 @@ pub fn handle_job_commands(config: &Configuration, command: &JobCommands, format } } } + JobCommands::Running { workflow_id } => { + let user_name = get_env_user_name(); + let selected_workflow_id = match workflow_id { + Some(id) => *id, + None => select_workflow_interactively(config, &user_name).unwrap_or_else(|e| { + eprintln!("Error selecting workflow: {}", e); + std::process::exit(1); + }), + }; + + // Page through the server-side endpoint, which joins running jobs to + // their compute node and (when scheduler-managed) the scheduler job ID. + let mut running: Vec = Vec::new(); + let mut offset = 0; + loop { + let response = match apis::workflows_api::get_running_jobs( + config, + selected_workflow_id, + Some(offset), + None, + ) { + Ok(response) => response, + Err(e) => { + print_error("listing running jobs", &e); + std::process::exit(1); + } + }; + running.extend(response.items); + if !response.has_more { + break; + } + offset += response.count; + } + + if format == "json" { + // Wrap in {"items": [...]} for consistency with other list commands. + print_json_wrapped(&running, "running jobs"); + } else { + let rows: Vec = running + .iter() + .map(|j| RunningJobTableRow { + job_id: j.job_id, + job_name: j.job_name.clone(), + compute_node: j.compute_node_name.clone(), + elapsed: format_elapsed(j.start_time.as_deref()), + scheduler_type: j.scheduler_type.clone(), + // Match the compute-nodes listing: render absent IDs as "-". + scheduler_job_id: j + .scheduler_job_id + .clone() + .unwrap_or_else(|| "-".to_string()), + }) + .collect(); + + if format == "csv" { + display_csv(&rows); + } else if rows.is_empty() { + println!("No running jobs found"); + } else { + display_table_with_count(&rows, "running jobs"); + } + } + } } } diff --git a/src/client/commands/recover.rs b/src/client/commands/recover.rs index 81f50c22..53d781fc 100644 --- a/src/client/commands/recover.rs +++ b/src/client/commands/recover.rs @@ -684,7 +684,7 @@ fn get_slurm_log_info( workflow_id: i64, output_dir: &Path, ) -> Result { - build_results_report(config, Some(workflow_id), output_dir, false, &[]) + build_results_report(config, Some(workflow_id), output_dir, false, false, &[]) } /// Correlate failed jobs with their Slurm allocation logs diff --git a/src/client/commands/reports.rs b/src/client/commands/reports.rs index 9fb5a506..0c49fd73 100644 --- a/src/client/commands/reports.rs +++ b/src/client/commands/reports.rs @@ -590,15 +590,17 @@ pub fn generate_results_report( workflow_id: Option, output_dir: &Path, all_runs: bool, + failed: bool, job_ids: &[i64], ) { - let report = match build_results_report(config, workflow_id, output_dir, all_runs, job_ids) { - Ok(report) => report, - Err(e) => { - eprintln!("{}", e); - std::process::exit(1); - } - }; + let report = + match build_results_report(config, workflow_id, output_dir, all_runs, failed, job_ids) { + Ok(report) => report, + Err(e) => { + eprintln!("{}", e); + std::process::exit(1); + } + }; if report.total_results == 0 { if job_ids.is_empty() { @@ -620,6 +622,7 @@ pub fn build_results_report( workflow_id: Option, output_dir: &Path, all_runs: bool, + failed: bool, job_ids: &[i64], ) -> Result { if !output_dir.exists() { @@ -665,6 +668,14 @@ pub fn build_results_report( .collect() }; + // Respect --failed: keep only non-zero return codes (mirrors the non + // --include-logs path, which retains results where return_code != 0). + let results: Vec<_> = if failed { + results.into_iter().filter(|r| r.return_code != 0).collect() + } else { + results + }; + let mut result_records: Vec = Vec::new(); for result in &results { diff --git a/src/client/commands/results.rs b/src/client/commands/results.rs index e8a8334d..1f75e61b 100644 --- a/src/client/commands/results.rs +++ b/src/client/commands/results.rs @@ -35,29 +35,6 @@ fn format_cpu(percent: Option) -> String { } } -/// Helper function to create a map of job IDs to job names for a workflow -fn get_job_name_map( - config: &Configuration, - workflow_id: i64, -) -> std::collections::HashMap { - let mut job_names = std::collections::HashMap::new(); - - match pagination::paginate_jobs(config, workflow_id, pagination::JobListParams::new()) { - Ok(jobs) => { - for job in jobs { - if let Some(id) = job.id { - job_names.insert(id, job.name); - } - } - } - Err(_) => { - // If we can't fetch jobs, just continue without names - } - } - - job_names -} - #[derive(Tabled)] struct ResultTableRow { #[tabled(rename = "ID")] @@ -180,6 +157,7 @@ pub fn handle_result_commands(config: &Configuration, command: &ResultCommands, Some(selected_workflow_id), output_dir, *all_runs, + *failed, &job_ids, ); return; @@ -258,17 +236,15 @@ pub fn handle_result_commands(config: &Configuration, command: &ResultCommands, println!("No results found for workflow ID: {}", selected_workflow_id); } } else { - // Fetch job names for the workflow - let job_names = get_job_name_map(config, selected_workflow_id); - let rows: Vec = results .iter() .map(|result| ResultTableRow { id: result.id.unwrap_or(-1), job_id: result.job_id, - job_name: job_names - .get(&result.job_id) - .cloned() + // job_name is populated server-side on the result. + job_name: result + .job_name + .clone() .unwrap_or_else(|| "-".to_string()), workflow_id: result.workflow_id, run_id: result.run_id, @@ -292,11 +268,13 @@ pub fn handle_result_commands(config: &Configuration, command: &ResultCommands, } else { println!("Results for workflow ID {}:", selected_workflow_id); } - let exclude = vec![ - "WF ID".to_string(), - "Run ID".to_string(), - "Attempt".to_string(), - ]; + // Run ID is only meaningful when results span multiple + // runs; keep it hidden for the default single-run view + // but show it under --all-runs so runs are distinguishable. + let mut exclude = vec!["WF ID".to_string(), "Attempt".to_string()]; + if !*all_runs { + exclude.push("Run ID".to_string()); + } display_table_excluding(&rows, &exclude, "results"); } } diff --git a/src/client/commands/slurm.rs b/src/client/commands/slurm.rs index 59af0516..4166b09b 100644 --- a/src/client/commands/slurm.rs +++ b/src/client/commands/slurm.rs @@ -47,10 +47,10 @@ use crate::client::apis::configuration::Configuration; use crate::client::commands::get_env_user_name; use crate::client::commands::hpc::create_registry_with_config_public; use crate::client::commands::pagination::{ - ComputeNodeListParams, JobListParams, ResourceRequirementsListParams, ResultListParams, - ScheduledComputeNodeListParams, SlurmSchedulersListParams, paginate_compute_nodes, - paginate_jobs, paginate_resource_requirements, paginate_results, - paginate_scheduled_compute_nodes, paginate_slurm_schedulers, + JobListParams, ResourceRequirementsListParams, ResultListParams, + ScheduledComputeNodeListParams, SlurmSchedulersListParams, paginate_jobs, + paginate_resource_requirements, paginate_results, paginate_scheduled_compute_nodes, + paginate_slurm_schedulers, }; use crate::client::commands::{ print_error, select_workflow_interactively, @@ -2240,131 +2240,40 @@ fn build_slurm_to_jobs_map( ) -> HashMap> { let mut slurm_to_jobs: HashMap> = HashMap::new(); - // Step 1: Get all scheduled compute nodes (they have scheduler_id = Slurm job ID) - let scheduled_nodes = match paginate_scheduled_compute_nodes( - config, - workflow_id, - ScheduledComputeNodeListParams::new(), - ) { - Ok(nodes) => nodes, - Err(e) => { - warn!( - "Could not fetch scheduled compute nodes for job correlation: {}", - e - ); - return slurm_to_jobs; - } - }; - - // Build scn_id -> slurm_job_id map - let scn_to_slurm: HashMap = scheduled_nodes - .iter() - .filter(|scn| scn.scheduler_type == "slurm") - .filter_map(|scn| scn.id.map(|id| (id, scn.scheduler_id.to_string()))) - .collect(); - - if scn_to_slurm.is_empty() { - return slurm_to_jobs; - } - - // Step 2: Get all compute nodes and build slurm_job_id -> compute_node_ids map - let compute_nodes = - match paginate_compute_nodes(config, workflow_id, ComputeNodeListParams::new()) { - Ok(nodes) => nodes, + // The server correlates scheduled_compute_node -> compute_node -> result -> + // job in a single query (covering all runs), replacing what used to be four + // full-list fetches joined in memory here. Rows arrive grouped/sorted by + // (slurm_job_id, job_id); we page through them (server default page size) + // and accumulate, so each per-Slurm-job Vec stays deduplicated and ordered. + let mut offset = 0; + loop { + let response = match apis::workflows_api::get_slurm_job_correlations( + config, + workflow_id, + Some(offset), + None, + ) { + Ok(response) => response, Err(e) => { - warn!("Could not fetch compute nodes for job correlation: {}", e); + warn!("Could not fetch Slurm job correlations: {}", e); return slurm_to_jobs; } }; - // Build slurm_job_id -> Vec map using SCN relationship - let mut slurm_to_compute_nodes: HashMap> = HashMap::new(); - for node in &compute_nodes { - if node.compute_node_type != "slurm" { - continue; - } - if let Some(scheduler) = &node.scheduler { - // Get the SCN ID from the scheduler JSON - if let Some(scn_id) = scheduler.get("scheduler_id").and_then(|v| v.as_i64()) { - // Look up the Slurm job ID from our SCN map - if let Some(slurm_job_id) = scn_to_slurm.get(&scn_id) - && let Some(node_id) = node.id - { - slurm_to_compute_nodes - .entry(slurm_job_id.clone()) - .or_default() - .push(node_id); - } - } - } - } - - if slurm_to_compute_nodes.is_empty() { - return slurm_to_jobs; - } - - // Step 3: Get all results and build compute_node_id -> Vec map - let results = match paginate_results( - config, - workflow_id, - ResultListParams::new().with_all_runs(true), - ) { - Ok(results) => results, - Err(e) => { - warn!("Could not fetch results for job correlation: {}", e); - return slurm_to_jobs; - } - }; - - let mut compute_node_to_jobs: HashMap> = HashMap::new(); - for result in &results { - compute_node_to_jobs - .entry(result.compute_node_id) - .or_default() - .push(result.job_id); - } - - // Step 4: Get all jobs and build job_id -> job_name map - let jobs = match paginate_jobs(config, workflow_id, JobListParams::new()) { - Ok(jobs) => jobs, - Err(e) => { - warn!("Could not fetch jobs for job correlation: {}", e); - return slurm_to_jobs; - } - }; - - let job_id_to_name: HashMap = jobs - .iter() - .filter_map(|j| j.id.map(|id| (id, j.name.clone()))) - .collect(); - - // Step 5: Build the final slurm_job_id -> Vec map - for (slurm_id, compute_node_ids) in &slurm_to_compute_nodes { - let mut affected_jobs: Vec = Vec::new(); - let mut seen_job_ids: std::collections::HashSet = std::collections::HashSet::new(); - - for compute_node_id in compute_node_ids { - if let Some(job_ids) = compute_node_to_jobs.get(compute_node_id) { - for job_id in job_ids { - if seen_job_ids.insert(*job_id) { - let job_name = job_id_to_name - .get(job_id) - .cloned() - .unwrap_or_else(|| format!("job_{}", job_id)); - affected_jobs.push(AffectedJob { - job_id: *job_id, - job_name, - }); - } - } - } + for item in response.items { + slurm_to_jobs + .entry(item.slurm_job_id) + .or_default() + .push(AffectedJob { + job_id: item.job_id, + job_name: item.job_name, + }); } - if !affected_jobs.is_empty() { - // Sort by job_id for consistent output - affected_jobs.sort_by_key(|j| j.job_id); - slurm_to_jobs.insert(slurm_id.clone(), affected_jobs); + if !response.has_more { + break; } + offset += response.count; } slurm_to_jobs diff --git a/src/client/offline_journal.rs b/src/client/offline_journal.rs index 343ebf41..09826db4 100644 --- a/src/client/offline_journal.rs +++ b/src/client/offline_journal.rs @@ -248,6 +248,7 @@ mod tests { peak_cpu_percent: None, avg_cpu_percent: None, status: JobStatus::Completed, + job_name: None, }, } } diff --git a/src/client/report_models.rs b/src/client/report_models.rs index 6ac16aed..c34478e3 100644 --- a/src/client/report_models.rs +++ b/src/client/report_models.rs @@ -113,9 +113,11 @@ pub struct ResultsReport { pub workflow_user: String, pub all_runs: bool, pub total_results: usize, - /// Job result records + /// Job result records. /// - /// Note: This field is named `results` in JSON output (not `jobs`). + /// Serialized as `items` for consistency with the other list commands and + /// the REST API's paginated responses (which all wrap records in `items`). + #[serde(rename = "items")] pub results: Vec, } @@ -239,6 +241,15 @@ mod tests { }; let json = serde_json::to_string(&report).unwrap(); + // Records serialize under `items` (not `results`), matching other lists. + assert!( + json.contains("\"items\""), + "records should serialize under `items`: {json}" + ); + assert!( + !json.contains("\"results\""), + "records should not use the old `results` key: {json}" + ); let parsed: ResultsReport = serde_json::from_str(&json).unwrap(); assert_eq!(parsed.workflow_id, 1); diff --git a/src/lib.rs b/src/lib.rs index b9829e6f..d0de7d60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,8 +78,9 @@ pub use models::{ ListLocalSchedulersResponse, ListMissingUserDataResponse, ListRequiredExistingFilesResponse, ListResourceRequirementsResponse, ListResultsResponse, ListScheduledComputeNodesResponse, ListSlurmSchedulersResponse, ListUserDataResponse, ListWorkflowsResponse, LocalSchedulerModel, - ProcessChangedJobInputsResponse, ResourceRequirementsModel, ResultModel, - ScheduledComputeNodesModel, SlurmSchedulerModel, UserDataModel, WorkflowActionModel, + ProcessChangedJobInputsResponse, ResourceRequirementsModel, ResultModel, RunningJobModel, + RunningJobsResponse, ScheduledComputeNodesModel, SlurmJobCorrelationModel, + SlurmJobCorrelationsResponse, SlurmSchedulerModel, UserDataModel, WorkflowActionModel, WorkflowModel, WorkflowStatusResponse, }; diff --git a/src/models.rs b/src/models.rs index 11d9cd20..b2af6114 100644 --- a/src/models.rs +++ b/src/models.rs @@ -339,6 +339,11 @@ pub struct ResultModel { #[serde(skip_serializing_if = "Option::is_none")] pub avg_cpu_percent: Option, pub status: JobStatus, + /// Name of the job this result belongs to. Populated by the server on read + /// paths (list/get) as a convenience so clients need not re-fetch jobs; it + /// is ignored on create/update input. + #[serde(skip_serializing_if = "Option::is_none")] + pub job_name: Option, } #[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] @@ -1890,6 +1895,7 @@ impl ResultModel { peak_cpu_percent: None, avg_cpu_percent: None, status, + job_name: None, } } } @@ -2388,6 +2394,59 @@ pub struct WorkflowStatusResponse { pub is_canceled: bool, } +/// One Slurm-job-to-Torc-job correlation row: the Slurm job that ran a given +/// Torc job, derived from scheduled_compute_node -> compute_node -> result. +#[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SlurmJobCorrelationModel { + pub slurm_job_id: String, + pub job_id: i64, + pub job_name: String, +} + +/// A page of Slurm-job-to-Torc-job correlations for a workflow, computed +/// server-side. +#[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SlurmJobCorrelationsResponse { + pub items: Vec, + pub offset: i64, + pub max_limit: i64, + pub count: i64, + pub total_count: i64, + pub has_more: bool, +} + +/// A currently-running job together with the compute node it occupies and, +/// when the node was provisioned by a scheduler, that scheduler's job ID. +/// `scheduler_type` is generic (e.g. "slurm", "local"); `scheduler_job_id` +/// is populated only for scheduler-managed nodes (the Slurm job ID today). +#[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RunningJobModel { + pub job_id: i64, + pub job_name: String, + pub compute_node_name: String, + pub scheduler_type: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub scheduler_job_id: Option, + /// RFC3339 time the job started running, for computing elapsed time. + #[serde(skip_serializing_if = "Option::is_none")] + pub start_time: Option, +} + +/// A page of currently-running jobs for a workflow, computed server-side. +#[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RunningJobsResponse { + pub items: Vec, + pub offset: i64, + pub max_limit: i64, + pub count: i64, + pub total_count: i64, + pub has_more: bool, +} + #[cfg_attr(feature = "openapi-codegen", derive(utoipa::ToSchema))] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ResetJobStatusResponse { @@ -2521,6 +2580,7 @@ mod tests { peak_cpu_percent: Some(90.0), avg_cpu_percent: Some(60.0), status: JobStatus::Completed, + job_name: None, }; let file = FileModel { id: Some(1), diff --git a/src/openapi_spec.rs b/src/openapi_spec.rs index 9e2acf98..008e4a67 100644 --- a/src/openapi_spec.rs +++ b/src/openapi_spec.rs @@ -24,7 +24,8 @@ use crate::models::{ ListUserDataResponse, ListUserGroupMembershipsResponse, ListWorkflowsResponse, LocalSchedulerModel, MessageResponse, ProcessChangedJobInputsResponse, ReloadAuthResponse, RemoteWorkerModel, ResetJobStatusResponse, ResourceRequirementsModel, ResultModel, - RoCrateEntityModel, ScheduledComputeNodesModel, SlurmSchedulerModel, SlurmStatsModel, + RoCrateEntityModel, RunningJobModel, RunningJobsResponse, ScheduledComputeNodesModel, + SlurmJobCorrelationModel, SlurmJobCorrelationsResponse, SlurmSchedulerModel, SlurmStatsModel, SpawnJobModel, SpawnJobsRequest, SpawnJobsResponse, UserDataListModel, UserDataModel, UserGroupMembershipModel, WorkflowAccessGroupModel, WorkflowActionModel, WorkflowModel, WorkflowStatusResponse, @@ -153,7 +154,8 @@ mod openapi_workflow_paths { __path_archive_workflow, __path_batch_complete_jobs, __path_cancel_workflow, __path_claim_jobs_based_on_resources, __path_claim_next_jobs, __path_create_workflow, __path_delete_workflow, __path_get_active_task_for_workflow, - __path_get_ready_job_requirements, __path_get_workflow, __path_get_workflow_status, + __path_get_ready_job_requirements, __path_get_running_jobs, + __path_get_slurm_job_correlations, __path_get_workflow, __path_get_workflow_status, __path_initialize_jobs, __path_is_workflow_complete, __path_is_workflow_uninitialized, __path_list_job_dependencies, __path_list_job_file_relationships, __path_list_job_ids, __path_list_job_user_data_relationships, __path_list_missing_user_data, @@ -161,12 +163,12 @@ mod openapi_workflow_paths { __path_process_changed_job_inputs, __path_reset_job_status, __path_reset_workflow_status, __path_update_workflow, archive_workflow, batch_complete_jobs, cancel_workflow, claim_jobs_based_on_resources, claim_next_jobs, create_workflow, delete_workflow, - get_active_task_for_workflow, get_ready_job_requirements, get_workflow, - get_workflow_status, initialize_jobs, is_workflow_complete, is_workflow_uninitialized, - list_job_dependencies, list_job_file_relationships, list_job_ids, - list_job_user_data_relationships, list_missing_user_data, list_required_existing_files, - list_workflows, process_changed_job_inputs, reset_job_status, reset_workflow_status, - update_workflow, + get_active_task_for_workflow, get_ready_job_requirements, get_running_jobs, + get_slurm_job_correlations, get_workflow, get_workflow_status, initialize_jobs, + is_workflow_complete, is_workflow_uninitialized, list_job_dependencies, + list_job_file_relationships, list_job_ids, list_job_user_data_relationships, + list_missing_user_data, list_required_existing_files, list_workflows, + process_changed_job_inputs, reset_job_status, reset_workflow_status, update_workflow, }; } @@ -537,6 +539,8 @@ fn resolve_schema_properties<'a>( openapi_workflow_paths::delete_workflow, openapi_workflow_paths::get_workflow, openapi_workflow_paths::get_workflow_status, + openapi_workflow_paths::get_slurm_job_correlations, + openapi_workflow_paths::get_running_jobs, openapi_workflow_paths::update_workflow, openapi_workflow_paths::cancel_workflow, openapi_workflow_paths::initialize_jobs, @@ -650,6 +654,10 @@ fn resolve_schema_properties<'a>( IsUninitializedResponse, JobStatusCounts, WorkflowStatusResponse, + SlurmJobCorrelationModel, + SlurmJobCorrelationsResponse, + RunningJobModel, + RunningJobsResponse, ResetJobStatusResponse, UserDataModel, ListUserDataResponse @@ -1627,6 +1635,7 @@ pub fn parity_report(source: &str) -> Result, Box Result, Box Result, Box + Send>> = @@ -367,12 +381,9 @@ where let where_clause = where_conditions.join(" AND "); let sort_by = if let Some(ref col) = sort_by { if RESULT_COLUMNS.contains(&col.as_str()) { - // If we have a join (show_all_results is false), prefix with "r." if it's a result column - if !show_all_results { - Some(format!("r.{}", col)) - } else { - Some(col.clone()) - } + // `result` is always aliased `r`, so qualify the sort column to + // avoid ambiguity with the joined `job` table. + Some(format!("r.{}", col)) } else { debug!("Invalid sort column requested: {}", col); None // Fall back to default @@ -381,10 +392,18 @@ where None }; - // Build the complete query with pagination and sorting + // Build the complete query with pagination and sorting. The default sort + // column is qualified (`r.id`) for the same reason. let query = SqlQueryBuilder::new(base_query) .with_where(where_clause.clone()) - .with_pagination_and_sorting(offset, limit, sort_by, reverse_sort, "id", RESULT_COLUMNS) + .with_pagination_and_sorting( + offset, + limit, + sort_by, + reverse_sort, + "r.id", + RESULT_COLUMNS, + ) .build(); debug!("Executing query: {}", query); @@ -439,12 +458,15 @@ where peak_cpu_percent: record.get("peak_cpu_percent"), avg_cpu_percent: record.get("avg_cpu_percent"), status, + job_name: record.get("job_name"), }); } - // For proper pagination, we should get the total count without LIMIT/OFFSET + // For proper pagination, get the total count without LIMIT/OFFSET. The + // WHERE clause uses the `r.` prefix, so `result` is aliased `r` here too + // (the job LEFT JOIN is unnecessary for a count). let count_base = if show_all_results { - "SELECT COUNT(*) as total FROM result".to_string() + "SELECT COUNT(*) as total FROM result r".to_string() } else { "SELECT COUNT(*) as total FROM result r INNER JOIN workflow_result wr ON r.id = wr.result_id".to_string() }; diff --git a/src/server/api/workflows.rs b/src/server/api/workflows.rs index 0b22214e..bc46cef2 100644 --- a/src/server/api/workflows.rs +++ b/src/server/api/workflows.rs @@ -10,10 +10,11 @@ use sqlx::Row; use crate::server::api_responses::{ ArchiveWorkflowResponse, CancelWorkflowResponse, CreateWorkflowResponse, - DeleteWorkflowResponse, GetWorkflowResponse, GetWorkflowStatusResponse, - IsWorkflowCompleteResponse, IsWorkflowUninitializedResponse, ListJobDependenciesResponse, - ListJobFileRelationshipsResponse, ListJobUserDataRelationshipsResponse, ListWorkflowsResponse, - ResetWorkflowStatusResponse, UpdateWorkflowResponse, + DeleteWorkflowResponse, GetRunningJobsResponse, GetSlurmJobCorrelationsResponse, + GetWorkflowResponse, GetWorkflowStatusResponse, IsWorkflowCompleteResponse, + IsWorkflowUninitializedResponse, ListJobDependenciesResponse, ListJobFileRelationshipsResponse, + ListJobUserDataRelationshipsResponse, ListWorkflowsResponse, ResetWorkflowStatusResponse, + UpdateWorkflowResponse, }; use crate::models; @@ -185,6 +186,24 @@ pub trait WorkflowsApi { context: &C, ) -> Result; + /// Return Slurm-job-to-Torc-job correlations for the workflow. + async fn get_slurm_job_correlations( + &self, + id: i64, + offset: i64, + limit: i64, + context: &C, + ) -> Result; + + /// Return currently-running jobs with their compute node and scheduler info. + async fn get_running_jobs( + &self, + id: i64, + offset: i64, + limit: i64, + context: &C, + ) -> Result; + /// Return true if all jobs in the workflow are uninitialized or disabled. async fn is_workflow_uninitialized( &self, @@ -1432,6 +1451,190 @@ where )) } + /// Return Slurm-job-to-Torc-job correlations for the workflow. + /// + /// Joins scheduled_compute_node (the Slurm allocation, whose `scheduler_id` + /// is the Slurm job ID) -> compute_node (linked via the scheduler JSON's + /// `scheduler_id`) -> result (`compute_node_id`) -> job. Replaces the former + /// client-side correlation that fetched four full lists and joined them in + /// memory. Covers all runs, matching the previous `all_runs=true` behavior. + /// Each workflow filter propagates through the join conditions so every + /// table is narrowed by its `workflow_id` index before the join. + async fn get_slurm_job_correlations( + &self, + id: i64, + offset: i64, + limit: i64, + context: &C, + ) -> Result { + debug!( + "get_slurm_job_correlations({}, offset={}, limit={}) - X-Span-ID: {:?}", + id, + offset, + limit, + context.get().0.clone() + ); + + // Workflow existence/access (404/403) is already enforced by + // authorize_workflow! in the transport layer before this runs. + let pool = self.context.pool.as_ref(); + + // Shared grouped query (one row per distinct (Slurm job, Torc job)). + // The page query orders and slices it; the count query wraps it. Each + // table is narrowed by its workflow_id index before joining. + let grouped = r#" + SELECT + CAST(scn.scheduler_id AS TEXT) AS slurm_job_id, + j.id AS job_id, + j.name AS job_name + FROM scheduled_compute_node scn + JOIN compute_node cn + ON cn.workflow_id = scn.workflow_id + AND json_extract(cn.scheduler, '$.scheduler_id') = scn.id + JOIN result r + ON r.compute_node_id = cn.id + AND r.workflow_id = scn.workflow_id + JOIN job j ON j.id = r.job_id + WHERE scn.workflow_id = ? + AND scn.scheduler_type = 'slurm' + AND cn.compute_node_type = 'slurm' + GROUP BY slurm_job_id, j.id, j.name + "#; + + let total_count: i64 = sqlx::query_scalar(&format!("SELECT COUNT(*) FROM ({grouped})")) + .bind(id) + .fetch_one(pool) + .await + .map_err(|e| database_error_with_msg(e, "Failed to count Slurm correlations"))?; + + let page_sql = format!("{grouped} ORDER BY slurm_job_id, job_id LIMIT ? OFFSET ?"); + let rows = sqlx::query(&page_sql) + .bind(id) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await + .map_err(|e| database_error_with_msg(e, "Failed to correlate Slurm jobs"))?; + + let items: Vec = rows + .iter() + .map(|row| models::SlurmJobCorrelationModel { + slurm_job_id: row.get("slurm_job_id"), + job_id: row.get("job_id"), + job_name: row.get("job_name"), + }) + .collect(); + + let response = crate::paginated_list_response!( + models::SlurmJobCorrelationsResponse, + items, + offset, + total_count + ); + + Ok(GetSlurmJobCorrelationsResponse::SuccessfulResponse( + response, + )) + } + + /// Return currently-running jobs with their compute node and scheduler info. + /// + /// Joins live job state (`job.compute_node_id`, set while running) to the + /// compute node, and LEFT JOINs the scheduled_compute_node it belongs to so + /// the external scheduler job ID is included when one exists. The design is + /// scheduler-agnostic: `scheduler_type` comes from the compute node's type + /// (e.g. "slurm", "local") and `scheduler_job_id` is NULL for nodes not + /// provisioned by a scheduler. New scheduler types flow through unchanged. + async fn get_running_jobs( + &self, + id: i64, + offset: i64, + limit: i64, + context: &C, + ) -> Result { + debug!( + "get_running_jobs({}, offset={}, limit={}) - X-Span-ID: {:?}", + id, + offset, + limit, + context.get().0.clone() + ); + + // Workflow existence/access (404/403) is already enforced by + // authorize_workflow! in the transport layer before this runs. + let pool = self.context.pool.as_ref(); + + let running = models::JobStatus::Running.to_int(); + + let total_count: i64 = sqlx::query_scalar( + r#" + SELECT COUNT(*) + FROM job j + JOIN compute_node cn + ON cn.id = j.compute_node_id + AND cn.workflow_id = j.workflow_id + WHERE j.workflow_id = ? + AND j.status = ? + "#, + ) + .bind(id) + .bind(running) + .fetch_one(pool) + .await + .map_err(|e| database_error_with_msg(e, "Failed to count running jobs"))?; + + let rows = sqlx::query( + r#" + SELECT + j.id AS job_id, + j.name AS job_name, + cn.hostname AS compute_node_name, + cn.compute_node_type AS scheduler_type, + CAST(scn.scheduler_id AS TEXT) AS scheduler_job_id, + j.start_time AS start_time + FROM job j + JOIN compute_node cn + ON cn.id = j.compute_node_id + AND cn.workflow_id = j.workflow_id + LEFT JOIN scheduled_compute_node scn + ON scn.workflow_id = j.workflow_id + AND json_extract(cn.scheduler, '$.scheduler_id') = scn.id + WHERE j.workflow_id = ? + AND j.status = ? + ORDER BY j.id + LIMIT ? OFFSET ? + "#, + ) + .bind(id) + .bind(running) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await + .map_err(|e| database_error_with_msg(e, "Failed to list running jobs"))?; + + let items: Vec = rows + .iter() + .map(|row| models::RunningJobModel { + job_id: row.get("job_id"), + job_name: row.get("job_name"), + compute_node_name: row.get("compute_node_name"), + scheduler_type: row.get("scheduler_type"), + scheduler_job_id: row.get("scheduler_job_id"), + start_time: row.get("start_time"), + }) + .collect(); + + let response = crate::paginated_list_response!( + models::RunningJobsResponse, + items, + offset, + total_count + ); + + Ok(GetRunningJobsResponse::SuccessfulResponse(response)) + } + /// Return true if all jobs in the workflow are uninitialized or disabled. async fn is_workflow_uninitialized( &self, diff --git a/src/server/api_contract.rs b/src/server/api_contract.rs index 53324a9e..832b4525 100644 --- a/src/server/api_contract.rs +++ b/src/server/api_contract.rs @@ -767,6 +767,24 @@ pub trait TransportApiCore { context: &C, ) -> Result; + /// Return Slurm-job-to-Torc-job correlations for the workflow. + async fn get_slurm_job_correlations( + &self, + id: i64, + offset: Option, + limit: Option, + context: &C, + ) -> Result; + + /// Return currently-running jobs with their compute node and scheduler info. + async fn get_running_jobs( + &self, + id: i64, + offset: Option, + limit: Option, + context: &C, + ) -> Result; + /// Return true if all jobs in the workflow are uninitialized or disabled. async fn is_workflow_uninitialized( &self, diff --git a/src/server/api_responses.rs b/src/server/api_responses.rs index 821c880f..31b7178a 100644 --- a/src/server/api_responses.rs +++ b/src/server/api_responses.rs @@ -1016,6 +1016,32 @@ pub enum GetWorkflowStatusResponse { DefaultErrorResponse(models::ErrorResponse), } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[must_use] +pub enum GetSlurmJobCorrelationsResponse { + /// Successful response + SuccessfulResponse(models::SlurmJobCorrelationsResponse), + /// Forbidden - user does not have access + ForbiddenErrorResponse(models::ErrorResponse), + /// Not found error response + NotFoundErrorResponse(models::ErrorResponse), + /// Default error response + DefaultErrorResponse(models::ErrorResponse), +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[must_use] +pub enum GetRunningJobsResponse { + /// Successful response + SuccessfulResponse(models::RunningJobsResponse), + /// Forbidden - user does not have access + ForbiddenErrorResponse(models::ErrorResponse), + /// Not found error response + NotFoundErrorResponse(models::ErrorResponse), + /// Default error response + DefaultErrorResponse(models::ErrorResponse), +} + #[derive(Debug, PartialEq, Serialize, Deserialize)] #[must_use] pub enum IsWorkflowUninitializedResponse { diff --git a/src/server/http_server.rs b/src/server/http_server.rs index f65f1ec1..390af8b0 100644 --- a/src/server/http_server.rs +++ b/src/server/http_server.rs @@ -287,6 +287,19 @@ fn process_pagination_params( let processed_offset = offset.unwrap_or(0); let processed_limit = limit.unwrap_or(MAX_RECORD_TRANSFER_COUNT); + if processed_offset < 0 { + error!("Negative offset is not allowed: {}", processed_offset); + return Err(ApiError("offset cannot be negative".to_string())); + } + + // Reject negative limits explicitly: SQLite treats a negative LIMIT as + // "no limit", which would bypass MAX_RECORD_TRANSFER_COUNT and allow + // unbounded responses. + if processed_limit < 0 { + error!("Negative limit is not allowed: {}", processed_limit); + return Err(ApiError("limit cannot be negative".to_string())); + } + if processed_limit > MAX_RECORD_TRANSFER_COUNT { error!( "Limit exceeds maximum allowed value: {} > {}", @@ -2043,6 +2056,32 @@ where self.transport_get_workflow_status(id, context).await } + /// Return Slurm-job-to-Torc-job correlations for the workflow. + #[instrument(level = "debug", skip(self, context), fields(workflow_id = id))] + async fn get_slurm_job_correlations( + &self, + id: i64, + offset: Option, + limit: Option, + context: &C, + ) -> Result { + self.transport_get_slurm_job_correlations(id, offset, limit, context) + .await + } + + /// Return currently-running jobs with their compute node and scheduler info. + #[instrument(level = "debug", skip(self, context), fields(workflow_id = id))] + async fn get_running_jobs( + &self, + id: i64, + offset: Option, + limit: Option, + context: &C, + ) -> Result { + self.transport_get_running_jobs(id, offset, limit, context) + .await + } + async fn is_workflow_uninitialized( &self, id: i64, @@ -2641,3 +2680,38 @@ where } } } + +#[cfg(test)] +mod pagination_param_tests { + use super::process_pagination_params; + use crate::MAX_RECORD_TRANSFER_COUNT; + + #[test] + fn defaults_and_valid_values_are_accepted() { + assert_eq!( + process_pagination_params(None, None).unwrap(), + (0, MAX_RECORD_TRANSFER_COUNT) + ); + assert_eq!( + process_pagination_params(Some(5), Some(10)).unwrap(), + (5, 10) + ); + assert_eq!(process_pagination_params(Some(0), Some(0)).unwrap(), (0, 0)); + } + + #[test] + fn negative_limit_is_rejected() { + // SQLite would treat LIMIT -1 as unbounded; ensure it errors instead. + assert!(process_pagination_params(None, Some(-1)).is_err()); + } + + #[test] + fn negative_offset_is_rejected() { + assert!(process_pagination_params(Some(-1), None).is_err()); + } + + #[test] + fn limit_over_max_is_rejected() { + assert!(process_pagination_params(None, Some(MAX_RECORD_TRANSFER_COUNT + 1)).is_err()); + } +} diff --git a/src/server/http_server/workflows_transport.rs b/src/server/http_server/workflows_transport.rs index cbfbf349..0e7bf47d 100644 --- a/src/server/http_server/workflows_transport.rs +++ b/src/server/http_server/workflows_transport.rs @@ -404,6 +404,46 @@ where self.workflows_api.get_workflow_status(id, context).await } + pub(super) async fn transport_get_slurm_job_correlations( + &self, + id: i64, + offset: Option, + limit: Option, + context: &C, + ) -> Result { + let (offset, limit) = authorize_workflow_and_paginate!( + self, + id, + context, + GetSlurmJobCorrelationsResponse, + offset, + limit + ); + self.workflows_api + .get_slurm_job_correlations(id, offset, limit, context) + .await + } + + pub(super) async fn transport_get_running_jobs( + &self, + id: i64, + offset: Option, + limit: Option, + context: &C, + ) -> Result { + let (offset, limit) = authorize_workflow_and_paginate!( + self, + id, + context, + GetRunningJobsResponse, + offset, + limit + ); + self.workflows_api + .get_running_jobs(id, offset, limit, context) + .await + } + pub(super) async fn transport_is_workflow_uninitialized( &self, id: i64, diff --git a/src/server/http_transport/response_mapping.rs b/src/server/http_transport/response_mapping.rs index 6558d81a..b58498fa 100644 --- a/src/server/http_transport/response_mapping.rs +++ b/src/server/http_transport/response_mapping.rs @@ -559,6 +559,16 @@ map_response!( GetWorkflowStatusResponse, SuccessfulResponse ); +map_response!( + get_slurm_job_correlations_response, + GetSlurmJobCorrelationsResponse, + SuccessfulResponse +); +map_response!( + get_running_jobs_response, + GetRunningJobsResponse, + SuccessfulResponse +); map_response!( is_workflow_uninitialized_response, IsWorkflowUninitializedResponse, diff --git a/src/server/live_router.rs b/src/server/live_router.rs index a0ab4c98..6d8040aa 100644 --- a/src/server/live_router.rs +++ b/src/server/live_router.rs @@ -333,6 +333,14 @@ pub fn app_router(state: LiveRouterState) -> Router { "/torc-service/v1/workflows/{id}/status", get(get_workflow_status), ) + .route( + "/torc-service/v1/workflows/{id}/slurm_job_correlations", + get(get_slurm_job_correlations), + ) + .route( + "/torc-service/v1/workflows/{id}/running_jobs", + get(get_running_jobs), + ) .route( "/torc-service/v1/workflows/{id}/is_uninitialized", get(is_workflow_uninitialized), @@ -3762,6 +3770,80 @@ pub async fn get_workflow_status( } } +#[derive(Debug, Clone, Deserialize, IntoParams)] +pub struct SlurmJobCorrelationsQuery { + #[param(nullable = true)] + pub offset: Option, + #[param(nullable = true)] + pub limit: Option, +} + +#[utoipa::path( + get, + tag = "workflows", + path = "/workflows/{id}/slurm_job_correlations", + operation_id = "get_slurm_job_correlations", + params(("id" = i64, Path, description = "Workflow ID"), SlurmJobCorrelationsQuery), + responses( + (status = 200, body = models::SlurmJobCorrelationsResponse), + (status = 403, body = models::ErrorResponse, description = "User does not have access"), + (status = 404, body = models::ErrorResponse, description = "Workflow not found"), + (status = 500, body = models::ErrorResponse) + ) +)] +pub async fn get_slurm_job_correlations( + State(state): State, + Path(id): Path, + Query(query): Query, + Extension(context): Extension, +) -> Response { + match state + .server + .get_slurm_job_correlations(id, query.offset, query.limit, &context) + .await + { + Ok(response) => get_slurm_job_correlations_response(response), + Err(err) => error_response(StatusCode::INTERNAL_SERVER_ERROR, err.0), + } +} + +#[derive(Debug, Clone, Deserialize, IntoParams)] +pub struct RunningJobsQuery { + #[param(nullable = true)] + pub offset: Option, + #[param(nullable = true)] + pub limit: Option, +} + +#[utoipa::path( + get, + tag = "workflows", + path = "/workflows/{id}/running_jobs", + operation_id = "get_running_jobs", + params(("id" = i64, Path, description = "Workflow ID"), RunningJobsQuery), + responses( + (status = 200, body = models::RunningJobsResponse), + (status = 403, body = models::ErrorResponse, description = "User does not have access"), + (status = 404, body = models::ErrorResponse, description = "Workflow not found"), + (status = 500, body = models::ErrorResponse) + ) +)] +pub async fn get_running_jobs( + State(state): State, + Path(id): Path, + Query(query): Query, + Extension(context): Extension, +) -> Response { + match state + .server + .get_running_jobs(id, query.offset, query.limit, &context) + .await + { + Ok(response) => get_running_jobs_response(response), + Err(err) => error_response(StatusCode::INTERNAL_SERVER_ERROR, err.0), + } +} + #[utoipa::path( get, tag = "workflows", diff --git a/src/server/response_types.rs b/src/server/response_types.rs index 0f851fcb..05b0255d 100644 --- a/src/server/response_types.rs +++ b/src/server/response_types.rs @@ -77,9 +77,9 @@ pub mod workflows { pub use crate::server::api_responses::{ ArchiveWorkflowResponse, CancelWorkflowResponse, ClaimActionResponse, CreateWorkflowActionResponse, CreateWorkflowResponse, DeleteWorkflowResponse, - GetActiveTaskResponse, GetPendingActionsResponse, GetWorkflowActionsResponse, - GetWorkflowResponse, GetWorkflowStatusResponse, IsWorkflowCompleteResponse, - IsWorkflowUninitializedResponse, ListWorkflowsResponse, ResetWorkflowStatusResponse, - UpdateWorkflowResponse, + GetActiveTaskResponse, GetPendingActionsResponse, GetRunningJobsResponse, + GetSlurmJobCorrelationsResponse, GetWorkflowActionsResponse, GetWorkflowResponse, + GetWorkflowStatusResponse, IsWorkflowCompleteResponse, IsWorkflowUninitializedResponse, + ListWorkflowsResponse, ResetWorkflowStatusResponse, UpdateWorkflowResponse, }; } diff --git a/tests/test_results.rs b/tests/test_results.rs index 8482188f..23496d72 100644 --- a/tests/test_results.rs +++ b/tests/test_results.rs @@ -1,8 +1,8 @@ mod common; use common::{ - ServerProcess, create_test_job, create_test_result, create_test_workflow, run_cli_with_json, - start_server, + ServerProcess, create_test_job, create_test_result, create_test_workflow, run_cli_command, + run_cli_with_json, start_server, }; use rstest::rstest; use serde_json::json; @@ -63,6 +63,12 @@ fn test_results_list_command_json(start_server: &ServerProcess) { assert!(result.get("exec_time_minutes").is_some()); assert!(result.get("completion_time").is_some()); assert!(result.get("status").is_some()); + // job_name is populated server-side (denormalized from the job). + let job_name = result.get("job_name").and_then(|v| v.as_str()); + assert!( + matches!(job_name, Some("job1") | Some("job2")), + "expected server-populated job_name, got {job_name:?}" + ); } } @@ -1010,3 +1016,40 @@ fn test_results_workflow_result_table_cleanup_on_reinitialize(start_server: &Ser "With --all-runs should still show 2 historical results after reinitialize" ); } + +#[rstest] +fn test_results_list_run_id_column_with_all_runs(start_server: &ServerProcess) { + let config = &start_server.config; + let workflow = create_test_workflow(config, "test_results_run_id_column"); + let workflow_id = workflow.id.unwrap(); + let job = create_test_job(config, workflow_id, "job1"); + let _result = create_test_result(config, workflow_id, job.id.unwrap()); + + // Default table view lists the result but hides the Run ID column. + let default_table = run_cli_command( + &["results", "list", &workflow_id.to_string()], + start_server, + None, + ) + .expect("results list (default)"); + assert!( + default_table.contains("job1"), + "default view should list the result, got:\n{default_table}" + ); + assert!( + !default_table.contains("Run ID"), + "default view should not include the Run ID column, got:\n{default_table}" + ); + + // --all-runs surfaces the Run ID column so runs are distinguishable. + let all_runs_table = run_cli_command( + &["results", "list", &workflow_id.to_string(), "--all-runs"], + start_server, + None, + ) + .expect("results list --all-runs"); + assert!( + all_runs_table.contains("Run ID"), + "--all-runs should include the Run ID column, got:\n{all_runs_table}" + ); +} diff --git a/tests/test_workflows.rs b/tests/test_workflows.rs index e482e8c6..cfac1fb2 100644 --- a/tests/test_workflows.rs +++ b/tests/test_workflows.rs @@ -1593,3 +1593,156 @@ fn test_get_workflow_status_not_found(start_server: &ServerProcess) { "expected error for nonexistent workflow status" ); } + +#[rstest] +fn test_get_slurm_job_correlations(start_server: &ServerProcess) { + let config = &start_server.config; + let workflow = create_test_workflow(config, "slurm_correlation_workflow"); + let workflow_id = workflow.id.unwrap(); + let job = create_test_job(config, workflow_id, "slurm_job"); + let job_id = job.id.unwrap(); + + // Slurm allocation: scheduler_id is the Slurm job ID. + let scn = apis::scheduled_compute_nodes_api::create_scheduled_compute_node( + config, + models::ScheduledComputeNodesModel::new( + workflow_id, + 987654, + 1, + "slurm".to_string(), + "active".to_string(), + ), + ) + .expect("create scheduled compute node"); + let scn_id = scn.id.unwrap(); + + // Compute node belonging to that allocation; the scheduler JSON links it + // back to the scheduled compute node via scheduler_id. + let compute_node = apis::compute_nodes_api::create_compute_node( + config, + models::ComputeNodeModel::new( + workflow_id, + "slurm-host".to_string(), + std::process::id() as i64, + chrono::Utc::now().to_rfc3339(), + 8, + 16.0, + 0, + 1, + "slurm".to_string(), + Some(serde_json::json!({ "scheduler_id": scn_id })), + ), + ) + .expect("create compute node"); + let compute_node_id = compute_node.id.unwrap(); + + // Result links the job to that compute node, completing the chain. + let result = models::ResultModel::new( + job_id, + workflow_id, + 1, + 1, + compute_node_id, + 0, + 1.0, + "2024-01-01T12:00:00.000Z".to_string(), + models::JobStatus::Completed, + ); + apis::results_api::create_result(config, result).expect("create result"); + + let response = apis::workflows_api::get_slurm_job_correlations(config, workflow_id, None, None) + .expect("get slurm job correlations"); + + assert_eq!(response.items.len(), 1, "expected one correlation row"); + assert_eq!(response.total_count, 1); + assert!(!response.has_more); + let item = &response.items[0]; + assert_eq!(item.slurm_job_id, "987654"); + assert_eq!(item.job_id, job_id); + assert_eq!(item.job_name, "slurm_job"); +} + +#[rstest] +fn test_get_slurm_job_correlations_not_found(start_server: &ServerProcess) { + let config = &start_server.config; + let result = apis::workflows_api::get_slurm_job_correlations(config, 999_999, None, None); + assert!( + result.is_err(), + "expected error for nonexistent workflow correlations" + ); +} + +#[rstest] +fn test_get_running_jobs(start_server: &ServerProcess) { + let config = &start_server.config; + let workflow = create_test_workflow(config, "running_jobs_workflow"); + let workflow_id = workflow.id.unwrap(); + let job = create_test_job(config, workflow_id, "running_job"); + let job_id = job.id.unwrap(); + + apis::workflows_api::initialize_jobs(config, workflow_id, None, None, None) + .expect("initialize jobs"); + let workflow = apis::workflows_api::get_workflow(config, workflow_id).expect("get workflow"); + let run_id = workflow.run_id.unwrap_or(0); + + // Slurm allocation plus a compute node that belongs to it (scheduler JSON + // links the node back to the scheduled compute node). + let scn = apis::scheduled_compute_nodes_api::create_scheduled_compute_node( + config, + models::ScheduledComputeNodesModel::new( + workflow_id, + 555111, + 1, + "slurm".to_string(), + "active".to_string(), + ), + ) + .expect("create scheduled compute node"); + let scn_id = scn.id.unwrap(); + let compute_node = apis::compute_nodes_api::create_compute_node( + config, + models::ComputeNodeModel::new( + workflow_id, + "node0099".to_string(), + std::process::id() as i64, + chrono::Utc::now().to_rfc3339(), + 8, + 16.0, + 0, + 1, + "slurm".to_string(), + Some(serde_json::json!({ "scheduler_id": scn_id })), + ), + ) + .expect("create compute node"); + let compute_node_id = compute_node.id.unwrap(); + + // Move the job to Running on that node. + apis::workflows_api::claim_next_jobs(config, workflow_id, Some(1)).expect("claim job"); + apis::jobs_api::start_job(config, job_id, run_id, compute_node_id).expect("start job"); + + let response = apis::workflows_api::get_running_jobs(config, workflow_id, None, None) + .expect("get running jobs"); + + assert_eq!(response.total_count, 1); + assert!(!response.has_more); + assert_eq!(response.items.len(), 1); + let item = &response.items[0]; + assert_eq!(item.job_id, job_id); + assert_eq!(item.job_name, "running_job"); + assert_eq!(item.compute_node_name, "node0099"); + assert_eq!(item.scheduler_type, "slurm"); + assert_eq!(item.scheduler_job_id.as_deref(), Some("555111")); + // start_job sets the job's start_time, surfaced here for elapsed-time display. + assert!( + item.start_time.is_some(), + "expected start_time to be populated" + ); +} + +#[rstest] +fn test_get_running_jobs_not_found(start_server: &ServerProcess) { + let config = &start_server.config; + let result = apis::workflows_api::get_running_jobs(config, 999_999, None, None); + assert!(result.is_err(), "expected error for nonexistent workflow"); +}