Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions snakemake_executor_plugin_slurm_jobstep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
import socket
import subprocess
import sys
from dataclasses import dataclass, field
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake_interface_executor_plugins.executors.real import RealExecutor
from snakemake_interface_executor_plugins.jobs import (
JobExecutorInterface,
)
from snakemake_interface_executor_plugins.settings import ExecMode, CommonSettings
from snakemake_interface_executor_plugins.settings import (
CommonSettings,
ExecMode,
ExecutorSettingsBase,
)
from snakemake_interface_common.exceptions import WorkflowError


Expand All @@ -38,6 +43,25 @@
)


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
    """Settings for the SLURM jobstep executor plugin.

    Exposed to users as command line options / profile settings by the
    snakemake executor-plugin machinery, which reads each field's
    ``metadata`` dict (``help``, ``env_var``, ``required``) to build the CLI.
    """

    # When True, the formatted snakemake job command is fed to ``srun`` on
    # stdin as a shell script (``sh -s``) instead of being appended to the
    # ``srun`` command line.  This sidesteps SLURM installations that cap
    # the command line length (``max_submit_line_size``).
    pass_command_as_script: bool = field(
        default=False,
        metadata={
            "help": (
                "Pass to srun the command to be executed as a shell script "
                "(fed through stdin) instead of wrapping it in the command line "
                "call. Useful when a limit exists on SLURM command line length "
                "(i.e. max_submit_line_size)."
            ),
            # Not settable via an environment variable and not mandatory;
            # the default (False) preserves the historical command-line call.
            "env_var": False,
            "required": False,
        },
    )


# Required:
# Implementation of your executor
class Executor(RealExecutor):
Expand All @@ -58,6 +82,7 @@ def run_job(self, job: JobExecutorInterface):
# snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.

jobsteps = dict()
srun_script = None
# TODO revisit special handling for group job levels via srun at a later stage
# if job.is_group():

Expand Down Expand Up @@ -118,14 +143,29 @@ def run_job(self, job: JobExecutorInterface):

call = "srun -n1 --cpu-bind=q "
call += f" {get_cpu_setting(job, self.gpu_job)} "
call += f" {self.format_job_exec(job)}"
if self.workflow.executor_settings.pass_command_as_script:
# format the job to execute with all the snakemake parameters
# into a script
srun_script = self.format_job_exec(job)
# the process will read the srun script from stdin
call += " sh -s"
else:
call += f" {self.format_job_exec(job)}"

self.logger.debug(f"This job is a group job: {job.is_group()}")
self.logger.debug(f"The call for this job is: {call}")
self.logger.debug(f"Job is running on host: {socket.gethostname()}")
if srun_script is not None:
self.logger.debug(f"The script for this job is: \n{srun_script}")
# this dict is to support the to be implemented feature of oversubscription in
# "ordinary" group jobs.
jobsteps[job] = subprocess.Popen(call, shell=True)
jobsteps[job] = subprocess.Popen(
call, shell=True, text=True, stdin=subprocess.PIPE
)
if srun_script is not None:
# pass the srun bash script via stdin
jobsteps[job].stdin.write(srun_script)
jobsteps[job].stdin.close()
Comment on lines +165 to +168
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add error handling for stdin write operation.

Writing to stdin can raise BrokenPipeError if the subprocess terminates prematurely. Without error handling, this would cause an unhandled exception with unclear error messaging. Wrap the write operation in try-except for better error reporting and robustness.

Apply this diff to add error handling:

         if srun_script is not None:
-            # pass the srun bash script via stdin
-            jobsteps[job].stdin.write(srun_script)
-            jobsteps[job].stdin.close()
+            try:
+                # pass the srun bash script via stdin
+                jobsteps[job].stdin.write(srun_script)
+                jobsteps[job].stdin.close()
+            except BrokenPipeError:
+                # subprocess terminated before reading stdin
+                self.logger.error(
+                    f"Failed to write script to stdin for job {job}. "
+                    "Subprocess may have terminated prematurely."
+                )
+                self.report_job_error(SubmittedJobInfo(job))
+                raise WorkflowError(
+                    f"Job {job} failed: subprocess terminated before reading script"
+                )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if srun_script is not None:
# pass the srun bash script via stdin
jobsteps[job].stdin.write(srun_script)
jobsteps[job].stdin.close()
if srun_script is not None:
try:
# pass the srun bash script via stdin
jobsteps[job].stdin.write(srun_script)
jobsteps[job].stdin.close()
except BrokenPipeError:
# subprocess terminated before reading stdin
self.logger.error(
f"Failed to write script to stdin for job {job}. "
"Subprocess may have terminated prematurely."
)
self.report_job_error(SubmittedJobInfo(job))
raise WorkflowError(
f"Job {job} failed: subprocess terminated before reading script"
)
🤖 Prompt for AI Agents
In snakemake_executor_plugin_slurm_jobstep/__init__.py around lines 134-137, the
direct write to jobsteps[job].stdin can raise BrokenPipeError if the subprocess
exits; wrap the write and close in a try/except/finally block that catches
BrokenPipeError (and optionally OSError), logs a clear error message including
the exception details using the module's existing logger (e.g., logger or
process_logger), and ensures stdin is closed in finally to avoid resource leaks;
do not swallow unexpected exceptions unless re-raised after logging.


job_info = SubmittedJobInfo(job)
self.report_job_submission(job_info)
Expand Down