Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,6 @@ def submitJob(self, **kwargs):
resultDict["Jobs"] = []
for i in range(submittedJobs):
resultDict["Jobs"].append(".".join([cluster, str(i)]))
# Executable is transferred afterward
# Inform the caller that Condor cannot delete it before the end of the execution
resultDict["ExecutableToKeep"] = executable
else:
resultDict["Status"] = status
resultDict["Message"] = error
Expand Down
22 changes: 15 additions & 7 deletions src/DIRAC/Resources/Computing/BatchSystems/executeBatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
from urllib.parse import unquote as urlunquote


arguments = sys.argv[1]
inputDict = json.loads(urlunquote(arguments))
# Read options from JSON file
optionsFilePath = sys.argv[1]
with open(optionsFilePath, 'r') as f:
inputDict = json.load(f)

method = inputDict.pop('Method')
batchSystem = inputDict.pop('BatchSystem')
Expand All @@ -45,9 +47,15 @@
try:
result = getattr(batch, method)(**inputDict)
except Exception:
result = traceback.format_exc()

resultJson = urlquote(json.dumps(result))
print("============= Start output ===============")
print(resultJson)
# Wrap the traceback in a proper error structure
result = {
'Status': -1,
'Message': 'Exception during batch method execution',
'Traceback': traceback.format_exc()
}

# Write result to JSON file
resultFilePath = optionsFilePath.replace('.json', '_result.json')
with open(resultFilePath, 'w') as f:
json.dump(result, f)
"""
2 changes: 0 additions & 2 deletions src/DIRAC/Resources/Computing/LocalComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,6 @@ def submitJob(self, executableFile, proxy=None, numberOfJobs=1):
batchSystemName = self.batchSystem.__class__.__name__.lower()
jobIDs = ["ssh" + batchSystemName + "://" + self.ceName + "/" + _id for _id in resultSubmit["Jobs"]]
result = S_OK(jobIDs)
if "ExecutableToKeep" in resultSubmit:
result["ExecutableToKeep"] = resultSubmit["ExecutableToKeep"]
else:
result = S_ERROR(resultSubmit["Message"])

Expand Down
99 changes: 61 additions & 38 deletions src/DIRAC/Resources/Computing/SSHComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@
import os
import shutil
import stat
import tempfile
import uuid
from shlex import quote as shlex_quote
from urllib.parse import quote, unquote, urlparse
from urllib.parse import urlparse

import pexpect

Expand Down Expand Up @@ -484,47 +485,69 @@ def __executeHostCommand(self, command, options, ssh=None, host=None):
options["User"] = self.user
options["Queue"] = self.queue

options = json.dumps(options)
options = quote(options)
localOptionsFile = None
remoteOptionsFile = None
localResultFile = None
remoteResultFile = None
try:
# Write options to a local temporary file
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(options, f)
localOptionsFile = f.name

# Upload the options file to the remote host
remoteOptionsFile = f"{self.sharedArea}/batch_options_{uuid.uuid4().hex}.json"
result = ssh.scpCall(30, localOptionsFile, remoteOptionsFile)
if not result["OK"]:
return result

cmd = (
"bash --login -c 'python3 %s/execute_batch %s || python %s/execute_batch %s || python2 %s/execute_batch %s'"
% (self.sharedArea, options, self.sharedArea, options, self.sharedArea, options)
)
# Execute the batch command with the options file path
cmd = (
f"bash --login -c 'python3 {self.sharedArea}/execute_batch {remoteOptionsFile} || "
f"python {self.sharedArea}/execute_batch {remoteOptionsFile} || "
f"python2 {self.sharedArea}/execute_batch {remoteOptionsFile}'"
)

self.log.verbose(f"CE submission command: {cmd}")
self.log.verbose(f"CE submission command: {cmd}")

result = ssh.sshCall(120, cmd)
if not result["OK"]:
self.log.error(f"{self.ceType} CE job submission failed", result["Message"])
return result
result = ssh.sshCall(120, cmd)
if not result["OK"]:
self.log.error(f"{self.ceType} CE job submission failed", result["Message"])
return result

sshStatus = result["Value"][0]
sshStdout = result["Value"][1]
sshStderr = result["Value"][2]

# Examine results of the job submission
if sshStatus == 0:
output = sshStdout.strip().replace("\r", "").strip()
if not output:
return S_ERROR("No output from remote command")

try:
index = output.index("============= Start output ===============")
output = output[index + 42 :]
except ValueError:
return S_ERROR(f"Invalid output from remote command: {output}")

try:
output = unquote(output)
result = json.loads(output)
if isinstance(result, str) and result.startswith("Exception:"):
return S_ERROR(result)
return S_OK(result)
except Exception:
return S_ERROR("Invalid return structure from job submission")
else:
return S_ERROR("\n".join([sshStdout, sshStderr]))
sshStatus = result["Value"][0]
if sshStatus != 0:
sshStdout = result["Value"][1]
sshStderr = result["Value"][2]
return S_ERROR(f"CE job submission command failed with status {sshStatus}: {sshStdout} {sshStderr}")

# The result should be written to a JSON file by execute_batch
# Compute the expected result file path
remoteResultFile = remoteOptionsFile.replace(".json", "_result.json")

# Try to download the result file
with tempfile.NamedTemporaryFile(mode="r", suffix=".json", delete=False) as f:
localResultFile = f.name

result = ssh.scpCall(30, localResultFile, remoteResultFile, upload=False)
if not result["OK"]:
return result

# Read the result from the downloaded file
with open(localResultFile) as f:
result = json.load(f)
return S_OK(result)
finally:
# Clean up local temporary file
if localOptionsFile and os.path.exists(localOptionsFile):
os.remove(localOptionsFile)
if localResultFile and os.path.exists(localResultFile):
os.remove(localResultFile)
# Clean up remote temporary files
if remoteOptionsFile:
ssh.sshCall(30, f"rm -f {remoteOptionsFile}")
if remoteResultFile:
ssh.sshCall(30, f"rm -f {remoteResultFile}")

def submitJob(self, executableFile, proxy, numberOfJobs=1):
# self.log.verbose( "Executable file path: %s" % executableFile )
Expand Down
Loading