77import socket
88import subprocess
99import sys
10+ from dataclasses import dataclass , field
1011from snakemake_interface_executor_plugins .executors .base import SubmittedJobInfo
1112from snakemake_interface_executor_plugins .executors .real import RealExecutor
1213from snakemake_interface_executor_plugins .jobs import (
1314 JobExecutorInterface ,
1415)
15- from snakemake_interface_executor_plugins .settings import ExecMode , CommonSettings
16+ from snakemake_interface_executor_plugins .settings import (
17+ CommonSettings ,
18+ ExecMode ,
19+ ExecutorSettingsBase ,
20+ )
1621from snakemake_interface_common .exceptions import WorkflowError
1722
1823
3843)
3944
4045
46+ @dataclass
47+ class ExecutorSettings (ExecutorSettingsBase ):
48+ """Settings for the SLURM jobstep executor plugin."""
49+
50+ pass_command_as_script : bool = field (
51+ default = False ,
52+ metadata = {
53+ "help" : (
54+ "Pass to srun the command to be executed as a shell script "
55+ "(fed through stdin) instead of wrapping it in the command line "
56+ "call. Useful when a limit exists on SLURM command line length (ie. "
57+ "max_submit_line_size)."
58+ ),
59+ "env_var" : False ,
60+ "required" : False ,
61+ },
62+ )
63+
64+
4165# Required:
4266# Implementation of your executor
4367class Executor (RealExecutor ):
@@ -58,6 +82,7 @@ def run_job(self, job: JobExecutorInterface):
5882 # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.
5983
6084 jobsteps = dict ()
85+ srun_script = None
6186 # TODO revisit special handling for group job levels via srun at a later stage
6287 # if job.is_group():
6388
@@ -118,14 +143,27 @@ def run_job(self, job: JobExecutorInterface):
118143
119144 call = "srun -n1 --cpu-bind=q "
120145 call += f" { get_cpu_setting (job , self .gpu_job )} "
121- call += f" { self .format_job_exec (job )} "
146+ if self .workflow .executor_settings .pass_command_as_script :
147+ # format the job to execute with all the snakemake parameters
148+ # into a script
149+ srun_script = self .format_job_exec (job )
150+ # the process will read the srun script from stdin
151+ call += " sh -s"
122152
123153 self .logger .debug (f"This job is a group job: { job .is_group ()} " )
124154 self .logger .debug (f"The call for this job is: { call } " )
125155 self .logger .debug (f"Job is running on host: { socket .gethostname ()} " )
156+ if srun_script is not None :
157+ self .logger .debug (f"The script for this job is: \n { srun_script } " )
126158 # this dict is to support the to be implemented feature of oversubscription in
127159 # "ordinary" group jobs.
128- jobsteps [job ] = subprocess .Popen (call , shell = True )
160+ jobsteps [job ] = subprocess .Popen (
161+ call , shell = True , text = True , stdin = subprocess .PIPE
162+ )
163+ if srun_script is not None :
164+ # pass the srun bash script via stdin
165+ jobsteps [job ].stdin .write (srun_script )
166+ jobsteps [job ].stdin .close ()
129167
130168 job_info = SubmittedJobInfo (job )
131169 self .report_job_submission (job_info )
0 commit comments