11from DIRAC .Core .Utilities .ReturnValues import convertToReturnValue
22
33from DIRAC .Core .Security .DiracX import DiracXClient
4+ from DIRAC .WorkloadManagementSystem .DB .JobDBUtils import extractJDL
45
56
67class JobMonitoringClient :
78 def fetch (self , parameters , jobIDs ):
9+ if not isinstance (jobIDs , list ):
10+ jobIDs = [jobIDs ]
811 with DiracXClient () as api :
912 jobs = api .jobs .search (
1013 parameters = ["JobID" ] + parameters ,
11- search = [{"parameter" : "JobID" , "operator" : "in" , "values" : jobIDs }],
14+ search = [{"parameter" : "JobID" , "operator" : "in" , "values" : jobIDs }], # type: ignore
1215 )
1316 return {j ["JobID" ]: {param : j [param ] for param in parameters } for j in jobs }
1417
@@ -20,6 +23,185 @@ def getJobsMinorStatus(self, jobIDs):
2023 def getJobsStates (self , jobIDs ):
2124 return self .fetch (["Status" , "MinorStatus" , "ApplicationStatus" ], jobIDs )
2225
26+ @convertToReturnValue
27+ def getJobsApplicationStatus (self , jobIDs ):
28+ return self .fetch (["ApplicationStatus" ], jobIDs )
29+
2330 @convertToReturnValue
2431 def getJobsSites (self , jobIDs ):
2532 return self .fetch (["Site" ], jobIDs )
33+
34+ @convertToReturnValue
35+ def getJobsStatus (self , jobIDs ):
36+ return self .fetch (["Status" ], jobIDs )
37+
38+ @convertToReturnValue
39+ def getJobParameter (self , jobID , parName = "" ):
40+ with DiracXClient () as api :
41+ par = [] if not parName else [parName ]
42+ return api .jobs .get_job_parameters (job_id = jobID , parameters = [par ])
43+
44+ # @convertToReturnValue
45+ # def getJobParameters(self, jobIDs, parName=""):
46+ # with DiracXClient() as api:
47+ # par = [] if not parName else [parName]
48+ # return api.jobs.get_job_parameters(job_id=jobID, parameters=[parName])
49+
50+ @convertToReturnValue
51+ def getInputData (self , jobIDs ):
52+ with DiracXClient () as api :
53+ # It's a mess
54+ if isinstance (jobIDs , int ):
55+ jobIDs = [jobIDs ]
56+ elif isinstance (jobIDs , str ):
57+ jobIDs = [int (jobIDs )]
58+ elif isinstance (jobIDs , list ):
59+ if isinstance (jobIDs [0 ], str ):
60+ jobIDs = [int (jobID ) for jobID in jobIDs ]
61+
62+ reqDict = {}
63+
64+ for jobID in jobIDs :
65+ reqDict [jobID ] = api .jobs .get_input_data (jobID ) # type: ignore
66+
67+ def cleanLFN (data ):
68+ lfn = data ["LFN" ]
69+ if lfn .lower ().startswith ("lfn:" ):
70+ return lfn [4 :]
71+ return lfn
72+
73+ # WARNING /!\: Old code is super bizarre.
74+ # It can return list as well as dict...
75+ # By default, it's better to return a dict.
76+
77+ return_dict = {}
78+ for data in reqDict :
79+ job_id = data ["JobID" ]
80+ if not job_id in return_dict :
81+ return_dict [job_id ] = []
82+
83+ return_dict [job_id ].append (cleanLFN (data ))
84+
85+ return return_dict
86+
87+ def convert_condDict_to_searchSpec (self , condDict ):
88+ """
89+ Convert condDict to list of SearchSpec dicts.
90+
91+ - If key is str: one SearchSpec
92+ - If key is tuple: multiple SearchSpec (one per key in tuple)
93+ - If value is dict: skip this condDict entry
94+ - list value -> operator 'in'
95+ - scalar value -> operator 'eq'
96+ """
97+ search_list = []
98+
99+ if not condDict :
100+ return search_list
101+
102+ for key , val in condDict .items ():
103+ if isinstance (val , dict ):
104+ # abandon this condition entry
105+ continue
106+
107+ # Normalize keys: if tuple, treat each element as a separate key
108+ keys = [key ] if isinstance (key , str ) else list (key )
109+
110+ for k in keys :
111+ if not isinstance (k , str ):
112+ # skip non-str keys silently (or raise error if you want)
113+ continue
114+
115+ if isinstance (val , list ):
116+ search_list .append ({"parameter" : k , "operator" : "in" , "values" : val })
117+ else :
118+ search_list .append ({"parameter" : k , "operator" : "eq" , "value" : val })
119+
120+ return search_list
121+
122+ @convertToReturnValue
123+ def getJobs (self , attrDict = {}, cutDate = None ):
124+ # Get job >>>IDS<<<
125+ with DiracXClient () as api :
126+ search = self .convert_condDict_to_searchSpec (attrDict )
127+
128+ if cutDate :
129+ # TODO: Verify the date format!
130+ search .append ({"parameter" : "LastUpdateTime" , "operator" : "gt" , "value" : str (cutDate )})
131+
132+ jobs = api .jobs .search (parameters = ["JobID" ], search = search )
133+
134+ # Returns a **string**??, cf test_JobStateUpdateAndJobMonitoringMultiple
135+ return [str (job ["JobID" ]) for job in jobs ]
136+
137+ @convertToReturnValue
138+ def getJobJDL (self , jobID , original ):
139+ with DiracXClient () as api :
140+ jdl = api .jobs .get_job_jdl (job_id = jobID )
141+
142+ if not jdl :
143+ return ""
144+
145+ if original :
146+ return extractJDL (jdl .original_jdl )
147+
148+ return extractJDL (jdl .jdl )
149+
150+ @convertToReturnValue
151+ def getJobLoggingInfo (self , jobID ):
152+ fields = ["DateTime" , "Source" , "Status" , "MinorStatus" , "ApplicationStatus" ] # CF Dirac.py
153+
154+ jobs = self .fetch (parameters = ["LoggingInfo" ] + fields , jobIDs = [jobID ])
155+
156+ # Normally, only one job
157+ if len (jobs ) == 0 :
158+ return []
159+ job = jobs [0 ]
160+
161+ # Rearrange into a list, CF Dirac.py
162+ res = []
163+ for field in fields :
164+ res .append (job .get (field , "" ))
165+
166+ return res
167+
168+ @convertToReturnValue
169+ def getJobSummary (self , jobID ):
170+ with DiracXClient () as api :
171+ # TODO: Fix diracx client...
172+ jobs = api .jobs .summary (
173+ grouping = [], search = [{"parameter" : "JobID" , "operator" : "eq" , "value" : jobID }] # type: ignore
174+ )
175+
176+ if jobs :
177+ return jobs [0 ]
178+ return []
179+
180+ @convertToReturnValue
181+ def getJobsSummary (self , jobIDs ):
182+ return self .fetch (parameters = [], jobIDs = jobIDs )
183+
184+ @convertToReturnValue
185+ def getJobAttributes (self , jobID , attrList = None ):
186+ if not attrList :
187+ attrList = []
188+
189+ return self .fetch (parameters = attrList , jobIDs = [jobID ])
190+
191+ @convertToReturnValue
192+ def getJobAttribute (self , jobID , attribute ):
193+ return self .fetch (parameters = [attribute ], jobIDs = [jobID ])
194+
195+ @convertToReturnValue
196+ def getJobHeartBeatData (self , jobID ):
197+ with DiracXClient () as api :
198+ res = api .jobs .get_job_heartbeat_info (job_id = jobID )
199+
200+ result = []
201+ for row in res :
202+ name = row ["Name" ]
203+ value = str (row ["Value" ])
204+ heartbeattime = row ["HeartBeatTime" ]
205+ # ('name', '"0.1"', 'time')
206+ # :)
207+ result .append ((str (name ), "%.01f" % (float (value .replace ('"' , "" ))), str (heartbeattime )))
0 commit comments