diff --git a/.gitignore b/.gitignore index 1311f7c..48d648b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ vendor/ promotion/ dev-server/ Dockerfile.dev -terraform/ \ No newline at end of file +terraform/ +dev/ \ No newline at end of file diff --git a/README.md b/README.md index f4b7e94..65715f9 100644 --- a/README.md +++ b/README.md @@ -281,7 +281,7 @@ make deploy ### Standalone GPU Metrics CLI -Collect GPU metrics without Kubernetes — works on bare metal, SLURM jobs, Singularity containers. +Collect GPU metrics without Kubernetes — works on bare metal, SLURM jobs, Flux jobs, and Singularity containers. ```bash gpu-metrics # one-shot table output @@ -291,6 +291,20 @@ gpu-metrics --interval 5s # continuous collection gpu-metrics --device 0 --quiet # single GPU, no logs ``` +**SLURM** — auto-detected when `SLURM_JOB_ID` is set. Collects only the GPUs assigned to your job step: + +```bash +srun --gres=gpu:2 gpu-metrics --format json +``` + +**Flux** — auto-detected when `FLUX_JOB_ID` is set. Collects only the GPUs in `CUDA_VISIBLE_DEVICES`: + +```bash +flux run -N1 -g2 gpu-metrics --format json +``` + +See **[HPC Integration](docs/hpc.md)** for SLURM/Flux usage, CSV context columns, and Singularity examples. + Or build the Docker image directly: ```bash @@ -318,6 +332,7 @@ docker push your-registry/keda-gpu-scaler:v0.1.0 - **[Design Document](docs/DESIGN.md)** — Architecture decisions, gRPC interface, scaling profiles, testing strategy - **[Migration Guide](docs/MIGRATION.md)** — Replace dcgm-exporter + Prometheus with keda-gpu-scaler +- **[HPC Integration](docs/hpc.md)** — SLURM and Flux workload manager support - **[FAQ](docs/FAQ.md)** — Common questions about GPU scaling, MIG, multi-GPU, scale-to-zero - **[Changelog](CHANGELOG.md)** — Release history @@ -347,8 +362,8 @@ Using keda-gpu-scaler? Add your organization to [ADOPTERS.md](ADOPTERS.md). 
### In Progress (v0.5.0) -- [ ] SLURM workload manager integration ([#52](https://github.com/pmady/keda-gpu-scaler/issues/52)) -- [ ] Flux workload manager integration ([#53](https://github.com/pmady/keda-gpu-scaler/issues/53)) +- [x] SLURM workload manager integration ([#52](https://github.com/pmady/keda-gpu-scaler/issues/52)) +- [x] Flux workload manager integration ([#53](https://github.com/pmady/keda-gpu-scaler/issues/53)) - [ ] Cross-environment GPU metrics parity — K8s, SLURM, Flux ([#54](https://github.com/pmady/keda-gpu-scaler/issues/54)) - [ ] NVIDIA collaboration for cross-platform metrics diff --git a/cmd/gpu-metrics/main.go b/cmd/gpu-metrics/main.go index 792c573..9d90da5 100644 --- a/cmd/gpu-metrics/main.go +++ b/cmd/gpu-metrics/main.go @@ -27,6 +27,7 @@ import ( "syscall" "time" + "github.com/pmady/keda-gpu-scaler/pkg/flux" "github.com/pmady/keda-gpu-scaler/pkg/gpu" "github.com/pmady/keda-gpu-scaler/pkg/slurm" "go.uber.org/zap" @@ -38,6 +39,7 @@ var ( device = flag.Int("device", -1, "GPU device index (-1 = all)") quiet = flag.Bool("quiet", false, "Suppress log output") slurmMode = flag.String("slurm", "auto", "SLURM mode: auto, on, off") + fluxMode = flag.String("flux", "auto", "Flux mode: auto, on, off") ) func main() { @@ -70,14 +72,27 @@ func main() { } } + // detect Flux only when SLURM was not detected (SLURM takes precedence if both are present) + var fluxCtx *flux.JobContext + if slurmCtx == nil && useFlux(*fluxMode) { + ctx := flux.FromEnv() + fluxCtx = &ctx + if !*quiet { + logger.Info("Flux job detected", + zap.String("job_id", ctx.JobID), + zap.Int("task_rank", ctx.TaskRank), + zap.String("gpus", ctx.GPUs)) + } + } + // one-shot if *interval <= 0 { - metrics, err := collect(collector, slurmCtx) + metrics, err := collect(collector, slurmCtx, fluxCtx) if err != nil { fmt.Fprintf(os.Stderr, "collection failed: %v\n", err) os.Exit(1) } - output(metrics, *format, slurmCtx) + output(metrics, *format, slurmCtx, fluxCtx) return } @@ -89,11 +104,11 @@ func 
main() { defer ticker.Stop() for { - metrics, err := collect(collector, slurmCtx) + metrics, err := collect(collector, slurmCtx, fluxCtx) if err != nil { fmt.Fprintf(os.Stderr, "collection failed: %v\n", err) } else { - output(metrics, *format, slurmCtx) + output(metrics, *format, slurmCtx, fluxCtx) } select { @@ -115,7 +130,20 @@ func useSLURM(mode string) bool { } } -func collect(c gpu.MetricsCollector, sctx *slurm.JobContext) ([]gpu.Metrics, error) { +func useFlux(mode string) bool { + switch mode { + case "on": + return true + case "off": + return false + default: // auto + return flux.Detect() + } +} + +// collect gathers metrics for the appropriate set of GPUs. +// Priority: --device flag > scheduler-assigned GPUs > all GPUs. +func collect(c gpu.MetricsCollector, sctx *slurm.JobContext, fctx *flux.JobContext) ([]gpu.Metrics, error) { if *device >= 0 { m, err := c.CollectDevice(*device) if err != nil { @@ -124,69 +152,89 @@ func collect(c gpu.MetricsCollector, sctx *slurm.JobContext) ([]gpu.Metrics, err return []gpu.Metrics{m}, nil } - // if inside SLURM, only collect assigned GPUs + // SLURM: collect only assigned GPUs if sctx != nil { - devs := sctx.VisibleDevices() - if len(devs) > 0 { - var out []gpu.Metrics - for _, idx := range devs { - m, err := c.CollectDevice(idx) - if err != nil { - return nil, fmt.Errorf("gpu %d: %w", idx, err) - } - out = append(out, m) - } - return out, nil + if devs := sctx.VisibleDevices(); len(devs) > 0 { + return collectDevices(c, devs) + } + } + + // Flux: collect only assigned GPUs + if fctx != nil { + if devs := fctx.VisibleDevices(); len(devs) > 0 { + return collectDevices(c, devs) } } return c.CollectAll() } -func output(metrics []gpu.Metrics, format string, sctx *slurm.JobContext) { +// collectDevices collects metrics for an explicit list of device indices. 
+func collectDevices(c gpu.MetricsCollector, devs []int) ([]gpu.Metrics, error) { + out := make([]gpu.Metrics, 0, len(devs)) + for _, idx := range devs { + m, err := c.CollectDevice(idx) + if err != nil { + return nil, fmt.Errorf("gpu %d: %w", idx, err) + } + out = append(out, m) + } + return out, nil +} + +func output(metrics []gpu.Metrics, format string, sctx *slurm.JobContext, fctx *flux.JobContext) { switch format { case "json": - outputJSON(metrics, sctx) + outputJSON(metrics, sctx, fctx) case "csv": - outputCSV(metrics, sctx) + outputCSV(metrics, sctx, fctx) default: - outputTable(metrics, sctx) + outputTable(metrics, sctx, fctx) } } type jsonOutput struct { SLURM *slurm.JobContext `json:"slurm,omitempty"` + Flux *flux.JobContext `json:"flux,omitempty"` Devices []gpu.Metrics `json:"devices"` } -func outputJSON(metrics []gpu.Metrics, sctx *slurm.JobContext) { +func outputJSON(metrics []gpu.Metrics, sctx *slurm.JobContext, fctx *flux.JobContext) { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") - _ = enc.Encode(jsonOutput{SLURM: sctx, Devices: metrics}) + _ = enc.Encode(jsonOutput{SLURM: sctx, Flux: fctx, Devices: metrics}) } -func outputCSV(metrics []gpu.Metrics, sctx *slurm.JobContext) { +func outputCSV(metrics []gpu.Metrics, sctx *slurm.JobContext, fctx *flux.JobContext) { w := csv.NewWriter(os.Stdout) hdr := csvHeader() if sctx != nil { hdr = append(sctx.Header(), hdr...) + } else if fctx != nil { + hdr = append(fctx.Header(), hdr...) } _ = w.Write(hdr) for _, m := range metrics { row := csvRow(m) if sctx != nil { row = append(sctx.Row(), row...) + } else if fctx != nil { + row = append(fctx.Row(), row...) 
} _ = w.Write(row) } w.Flush() } -func outputTable(metrics []gpu.Metrics, sctx *slurm.JobContext) { +func outputTable(metrics []gpu.Metrics, sctx *slurm.JobContext, fctx *flux.JobContext) { if sctx != nil { fmt.Printf("SLURM Job %s (%s) — node %s, rank %d, gpus [%s]\n\n", sctx.JobID, sctx.JobName, sctx.NodeName, sctx.ProcID, sctx.GPUs) } + if fctx != nil { + fmt.Printf("Flux Job %s — task rank %d, local rank %d, gpus [%s]\n\n", + fctx.JobID, fctx.TaskRank, fctx.LocalID, fctx.GPUs) + } fmt.Printf("%-5s %-20s %6s %6s %10s %10s %6s %6s %10s %10s %10s %10s\n", "GPU", "Name", "Util%", "Mem%", "MemUsed", "MemTotal", "Temp", "Power", "PCIeTx", "PCIeRx", "NVLTx", "NVLRx") diff --git a/docs/hpc.md b/docs/hpc.md new file mode 100644 index 0000000..fb27cf4 --- /dev/null +++ b/docs/hpc.md @@ -0,0 +1,170 @@ +# HPC Workload Manager Integration + +The standalone `gpu-metrics` CLI collects GPU metrics via NVML without requiring Kubernetes or KEDA. It auto-detects common HPC schedulers and scopes metrics to the GPUs allocated to your job. + +> [!NOTE] +> `gpu-metrics` requires `libnvidia-ml.so` (installed with the NVIDIA driver) on the host. It exits immediately with `nvml init failed` on machines without an NVIDIA driver. + +--- + +## SLURM + +[SLURM](https://slurm.schedmd.com/) is the dominant workload manager in academic and government HPC clusters. When you launch `gpu-metrics` inside a SLURM job, it automatically reads `SLURM_JOB_ID` and the assigned GPU indices from `SLURM_STEP_GPUS` (falling back to `SLURM_JOB_GPUS`, `GPU_DEVICE_ORDINAL`, then `CUDA_VISIBLE_DEVICES`). + +### Detection + +SLURM mode activates automatically (`--slurm auto`, the default) when `SLURM_JOB_ID` is set. 
You can force it on or off: + +```bash +gpu-metrics --slurm auto # default: activate if inside a SLURM job +gpu-metrics --slurm on # always treat as SLURM job +gpu-metrics --slurm off # ignore SLURM environment +``` + +### Usage + +```bash +# One-shot table output — only shows GPUs allocated to this job +srun --gres=gpu:2 gpu-metrics + +# JSON with SLURM job context +srun --gres=gpu:2 gpu-metrics --format json + +# Continuous collection every 5 seconds +srun --gres=gpu:2 gpu-metrics --interval 5s --format csv + +# From a batch script +#SBATCH --gres=gpu:4 +gpu-metrics --format json > gpu-metrics-$SLURM_JOB_ID.json +``` + +### JSON output + +When SLURM is detected, a `slurm` block is included in the JSON output: + +```json +{ + "slurm": { + "JobID": "98765", + "JobName": "train-llm", + "Partition": "gpu-a100", + "NodeName": "node02", + "ProcID": 8, + "LocalID": 2, + "GPUs": "0,1,2,3" + }, + "devices": [...] +} +``` + +### GPU assignment + +SLURM exposes assigned GPUs via these env vars, checked in priority order: + +| Variable | Description | +|----------|-------------| +| `SLURM_STEP_GPUS` | GPUs for the current step (most specific) | +| `SLURM_JOB_GPUS` | GPUs for the whole job | +| `GPU_DEVICE_ORDINAL` | Alternative GPU ordinal variable | +| `CUDA_VISIBLE_DEVICES` | CUDA-level restriction (fallback) | + +--- + +## Flux + +[Flux](https://flux-framework.org/) is a next-generation workload manager developed at Lawrence Livermore National Laboratory (LLNL). It is gaining adoption at DOE national labs and is designed for heterogeneous hardware including GPUs. When you launch `gpu-metrics` inside a Flux job, it reads `FLUX_JOB_ID` and the assigned GPUs from `CUDA_VISIBLE_DEVICES`, which Flux sets automatically when GPU affinity is active (the default for jobs submitted with `-g N`). + +### Detection + +Flux mode activates automatically (`--flux auto`, the default) when `FLUX_JOB_ID` is set. 
You can force it on or off: + +```bash +gpu-metrics --flux auto # default: activate if inside a Flux job +gpu-metrics --flux on # always treat as Flux job +gpu-metrics --flux off # ignore Flux environment +``` + +### Usage + +```bash +# One-shot table output — only shows GPUs allocated to this task +flux run -N1 -g1 gpu-metrics + +# JSON with Flux job context +flux run -N1 -g2 gpu-metrics --format json + +# Continuous collection every 5 seconds +flux run -N1 -g4 gpu-metrics --interval 5s --format json + +# Multi-node: each task collects its own assigned GPUs +flux run -N4 -g2 --tasks-per-node=1 gpu-metrics --format json +``` + +### JSON output + +When Flux is detected, a `flux` block is included in the JSON output: + +```json +{ + "flux": { + "JobID": "f23r45t", + "TaskRank": 4, + "LocalID": 0, + "NumTasks": 8, + "NumNodes": 2, + "GPUs": "0,1" + }, + "devices": [...] +} +``` + +### GPU assignment + +Flux sets `CUDA_VISIBLE_DEVICES` automatically when a job requests GPUs with `-g N` or `--gpus-per-task=N`. NVML itself does not apply this restriction, so `gpu-metrics` parses the variable and queries only the listed device indices. The `flux` JSON block records the original `CUDA_VISIBLE_DEVICES` value for reference. + +> [!IMPORTANT] +> If you submit a Flux job **without** GPU affinity (no `-g` flag), `CUDA_VISIBLE_DEVICES` will not be set and `gpu-metrics` will collect from all GPUs on the node. Always submit with `-g N` for correct per-task isolation. + +--- + +## Scheduler auto-detection + +`gpu-metrics` checks schedulers in this order and uses the first one it finds: + +1. **SLURM** — if `SLURM_JOB_ID` is set (checked first) +2. **Flux** — if `FLUX_JOB_ID` is set (only if SLURM was not detected) +3. 
**Bare metal** — collect from all visible GPUs + +To disable auto-detection for both: `--slurm off --flux off` + +--- + +## CSV output with scheduler context + +When a scheduler is active, the scheduler columns are prepended to each CSV row: + +**SLURM:** +``` +JobID,JobName,Partition,Node,Rank,LocalRank,GPUs,index,uuid,name,... +98765,train-llm,gpu-a100,node02,8,2,0,0,GPU-aaa,A100,... +``` + +**Flux:** +``` +FluxJobID,TaskRank,LocalRank,GPUs,index,uuid,name,... +f23r45t,4,0,0,0,GPU-bbb,H100,... +``` + +--- + +## Singularity / Apptainer containers + +`gpu-metrics` works inside Singularity/Apptainer containers on SLURM or Flux nodes. The NVIDIA runtime passes `CUDA_VISIBLE_DEVICES` into the container, and scheduler env vars are inherited automatically: + +```bash +# SLURM + Singularity +srun --gres=gpu:2 singularity exec --nv gpu-metrics.sif gpu-metrics --format json + +# Flux + Singularity +flux run -N1 -g2 singularity exec --nv gpu-metrics.sif gpu-metrics --format json +``` diff --git a/mkdocs.yml b/mkdocs.yml index 8e07346..1823aca 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -34,6 +34,7 @@ nav: - Architecture: DESIGN.md - Migration Guide: MIGRATION.md - Configuration: configuration.md + - HPC Integration: hpc.md - FAQ: FAQ.md - Changelog: changelog.md - Contributing: contributing.md diff --git a/pkg/flux/flux.go b/pkg/flux/flux.go new file mode 100644 index 0000000..ff8f2f9 --- /dev/null +++ b/pkg/flux/flux.go @@ -0,0 +1,107 @@ +/* +Copyright 2026 The keda-gpu-scaler Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package flux detects and exposes Flux workload manager job context. +// It mirrors the pkg/slurm API so gpu-metrics can treat both HPC +// schedulers uniformly. +package flux + +import ( + "os" + "strconv" + "strings" +) + +// JobContext holds Flux job metadata read from environment variables. +// Flux sets these unconditionally for every task launched with `flux run` or `flux submit`. +type JobContext struct { + JobID string + TaskRank int + LocalID int + NumTasks int + NumNodes int + URI string + GPUs string +} + +// Detect returns true if the current process is running inside a Flux job. +func Detect() bool { + _, ok := os.LookupEnv("FLUX_JOB_ID") + return ok +} + +// FromEnv parses the Flux environment variables into a JobContext. +func FromEnv() JobContext { + return JobContext{ + JobID: os.Getenv("FLUX_JOB_ID"), + TaskRank: envInt("FLUX_TASK_RANK"), + LocalID: envInt("FLUX_TASK_LOCAL_ID"), + NumTasks: envInt("FLUX_JOB_SIZE"), + NumNodes: envInt("FLUX_JOB_NNODES"), + URI: os.Getenv("FLUX_URI"), + GPUs: fluxGPUs(), + } +} + +// Header returns column names for table/CSV output. +func (j JobContext) Header() []string { + return []string{"FluxJobID", "TaskRank", "LocalRank", "GPUs"} +} + +// Row returns the values matching Header(). +func (j JobContext) Row() []string { + return []string{ + j.JobID, + strconv.Itoa(j.TaskRank), + strconv.Itoa(j.LocalID), + j.GPUs, + } +} + +// VisibleDevices parses GPUs into a slice of integer device indices. +// Non-numeric entries (e.g. MIG UUIDs) are silently skipped because +// per-instance MIG metrics are not yet supported. 
+func (j JobContext) VisibleDevices() []int { + if j.GPUs == "" { + return nil + } + parts := strings.Split(j.GPUs, ",") + devs := make([]int, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if idx, err := strconv.Atoi(p); err == nil { + devs = append(devs, idx) + } + } + return devs +} + +// fluxGPUs reads the GPU device indices allocated to this Flux task. +// Flux sets CUDA_VISIBLE_DEVICES automatically when a job is submitted +// with -g / --gpus-per-task and GPU affinity is enabled (the default). +// There is no Flux-specific env var equivalent to SLURM_STEP_GPUS, so +// CUDA_VISIBLE_DEVICES is the canonical source. +func fluxGPUs() string { + if v := os.Getenv("CUDA_VISIBLE_DEVICES"); v != "" { + return v + } + return "" +} + +func envInt(key string) int { + v, _ := strconv.Atoi(os.Getenv(key)) + return v +} diff --git a/pkg/flux/flux_test.go b/pkg/flux/flux_test.go new file mode 100644 index 0000000..9776089 --- /dev/null +++ b/pkg/flux/flux_test.go @@ -0,0 +1,255 @@ +/* +Copyright 2026 The keda-gpu-scaler Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package flux + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func setEnv(t *testing.T, kvs map[string]string) { + t.Helper() + for k, v := range kvs { + t.Setenv(k, v) + } +} + +func TestDetect(t *testing.T) { + tests := []struct { + name string + env map[string]string + want bool + }{ + { + name: "inside flux job", + env: map[string]string{"FLUX_JOB_ID": "f23r45t"}, + want: true, + }, + { + name: "outside flux", + env: map[string]string{}, + want: false, + }, + { + name: "empty job id still counts", + env: map[string]string{"FLUX_JOB_ID": ""}, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + os.Clearenv() + setEnv(t, tt.env) + assert.Equal(t, tt.want, Detect()) + }) + } +} + +func TestFromEnv(t *testing.T) { + tests := []struct { + name string + env map[string]string + want JobContext + }{ + { + name: "full flux environment", + env: map[string]string{ + "FLUX_JOB_ID": "f23r45t", + "FLUX_TASK_RANK": "4", + "FLUX_TASK_LOCAL_ID": "0", + "FLUX_JOB_SIZE": "8", + "FLUX_JOB_NNODES": "2", + "FLUX_URI": "local:///run/flux/local", + "CUDA_VISIBLE_DEVICES": "0,1", + }, + want: JobContext{ + JobID: "f23r45t", + TaskRank: 4, + LocalID: 0, + NumTasks: 8, + NumNodes: 2, + URI: "local:///run/flux/local", + GPUs: "0,1", + }, + }, + { + name: "minimal - job id only", + env: map[string]string{ + "FLUX_JOB_ID": "abc123", + }, + want: JobContext{ + JobID: "abc123", + }, + }, + { + name: "empty env", + env: map[string]string{}, + want: JobContext{}, + }, + { + name: "single gpu", + env: map[string]string{ + "FLUX_JOB_ID": "g99", + "FLUX_TASK_RANK": "0", + "FLUX_TASK_LOCAL_ID": "0", + "FLUX_JOB_SIZE": "1", + "FLUX_JOB_NNODES": "1", + "CUDA_VISIBLE_DEVICES": "2", + }, + want: JobContext{ + JobID: "g99", + TaskRank: 0, + LocalID: 0, + NumTasks: 1, + NumNodes: 1, + GPUs: "2", + }, + }, + { + name: "8-gpu DGX node", + env: map[string]string{ + "FLUX_JOB_ID": "h100job", + "FLUX_TASK_RANK": "0", + 
"FLUX_TASK_LOCAL_ID": "0", + "FLUX_JOB_SIZE": "1", + "FLUX_JOB_NNODES": "1", + "CUDA_VISIBLE_DEVICES": "0,1,2,3,4,5,6,7", + }, + want: JobContext{ + JobID: "h100job", + TaskRank: 0, + LocalID: 0, + NumTasks: 1, + NumNodes: 1, + GPUs: "0,1,2,3,4,5,6,7", + }, + }, + { + name: "multi-node MPI-style job", + env: map[string]string{ + "FLUX_JOB_ID": "mpirun42", + "FLUX_TASK_RANK": "16", + "FLUX_TASK_LOCAL_ID": "2", + "FLUX_JOB_SIZE": "64", + "FLUX_JOB_NNODES": "8", + "CUDA_VISIBLE_DEVICES": "2,3", + }, + want: JobContext{ + JobID: "mpirun42", + TaskRank: 16, + LocalID: 2, + NumTasks: 64, + NumNodes: 8, + GPUs: "2,3", + }, + }, + { + name: "bad int values default to zero", + env: map[string]string{ + "FLUX_JOB_ID": "xyz", + "FLUX_TASK_RANK": "not-a-number", + "FLUX_JOB_SIZE": "", + "FLUX_TASK_LOCAL_ID": "abc", + }, + want: JobContext{ + JobID: "xyz", + }, + }, + { + name: "no CUDA_VISIBLE_DEVICES means no GPUs", + env: map[string]string{ + "FLUX_JOB_ID": "cpuonly", + "FLUX_TASK_RANK": "0", + "FLUX_JOB_SIZE": "4", + "FLUX_JOB_NNODES": "1", + }, + want: JobContext{ + JobID: "cpuonly", + NumTasks: 4, + NumNodes: 1, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + os.Clearenv() + setEnv(t, tt.env) + got := FromEnv() + assert.Equal(t, tt.want, got) + }) + } +} + +func TestVisibleDevices(t *testing.T) { + tests := []struct { + name string + gpus string + want []int + }{ + {name: "multi gpu", gpus: "0,1,2,3", want: []int{0, 1, 2, 3}}, + {name: "single gpu", gpus: "2", want: []int{2}}, + {name: "empty", gpus: "", want: nil}, + {name: "with spaces", gpus: "0, 1, 3", want: []int{0, 1, 3}}, + {name: "mig uuid skipped", gpus: "GPU-abc123,1", want: []int{1}}, + {name: "all garbage", gpus: "foo,bar", want: []int{}}, + {name: "trailing comma", gpus: "0,1,", want: []int{0, 1}}, + {name: "high indices", gpus: "4,5,6,7", want: []int{4, 5, 6, 7}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := JobContext{GPUs: tt.gpus} + 
assert.Equal(t, tt.want, j.VisibleDevices()) + }) + } +} + +func TestHeaderRowAlignment(t *testing.T) { + j := JobContext{JobID: "test", TaskRank: 1} + assert.Equal(t, len(j.Header()), len(j.Row())) +} + +func TestRowValues(t *testing.T) { + j := JobContext{ + JobID: "f23r45t", + TaskRank: 4, + LocalID: 0, + GPUs: "0,1", + } + row := j.Row() + assert.Equal(t, "f23r45t", row[0]) + assert.Equal(t, "4", row[1]) + assert.Equal(t, "0", row[2]) + assert.Equal(t, "0,1", row[3]) +} + +func TestRowZeroValues(t *testing.T) { + j := JobContext{} + row := j.Row() + // TaskRank and LocalID should be "0", not empty + assert.Equal(t, "0", row[1]) + assert.Equal(t, "0", row[2]) +} + +func TestHeaderContents(t *testing.T) { + j := JobContext{} + hdr := j.Header() + assert.Contains(t, hdr, "FluxJobID") + assert.Contains(t, hdr, "TaskRank") + assert.Contains(t, hdr, "GPUs") +}