diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index b8f7b5bf1ce..1d228d08351 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -7,6 +7,7 @@ :caption: TransformationCleaningAgent options """ + # # imports import ast import errno @@ -69,7 +70,6 @@ def __init__(self, *args, **kwargs): self.jobDB = None self.pilotAgentsDB = None self.taskQueueDB = None - self.storageManagementDB = None # # transformations types self.transformationTypes = None @@ -144,11 +144,6 @@ def initialize(self): return result self.taskQueueDB = result["Value"]() - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - self.storageManagementDB = result["Value"]() - return S_OK() ############################################################################# @@ -640,7 +635,6 @@ def __removeWMSTasks(self, transJobIDs): jobdb=self.jobDB, taskqueuedb=self.taskQueueDB, pilotagentsdb=self.pilotAgentsDB, - storagemanagementdb=self.storageManagementDB, ) # Prevent 0 job IDs jobIDs = [int(j) for j in transJobIDs if int(j)] diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index de342224c05..bddf6d000d8 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -31,15 +31,14 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities import TimeUtilities +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.RequestManagementSystem.Client.File import File from DIRAC.RequestManagementSystem.Client.Operation import Operation from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient -from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters -from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE @@ -55,6 +54,8 @@ def __init__(self, *args, **kwargs): # clients self.jobDB = None + self.taskQueueDB = None + self.pilotAgentsDB = None self.sandboxDB = None self.maxJobsAtOnce = 500 @@ -67,8 +68,25 @@ def __init__(self, *args, **kwargs): def initialize(self): """Sets defaults""" - self.jobDB = JobDB() - self.sandboxDB = SandboxMetadataDB() + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") + if not result["OK"]: + return result + self.jobDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") + if not result["OK"]: + return result + self.taskQueueDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") + if not result["OK"]: + return result + self.pilotAgentsDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB") + if not result["OK"]: + return result + self.sandboxDB = result["Value"]() agentTSTypes = self.am_getOption("ProductionTypes", []) if agentTSTypes: @@ -239,7 +257,13 @@ def _deleteRemoveJobs(self, jobList, remove=False): wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup) result = wmsClient.removeJob(jobsList) else: - result = kill_delete_jobs(RIGHT_DELETE, jobsList) + result = kill_delete_jobs( + RIGHT_DELETE, + jobsList, + jobdb=self.jobDB, + taskqueuedb=self.taskQueueDB, + pilotagentsdb=self.pilotAgentsDB, + ) if not result["OK"]: self.log.error( f"Could not {'remove' if remove else 'delete'} jobs", diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 23092ef200c..437be824b43 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -41,7 +41,6 @@ def __init__(self, *args, **kwargs): self.logDB = None self.taskQueueDB = None self.pilotAgentsDB = None - self.storageManagementDB = None self.matchedTime = 7200 self.rescheduledTime = 600 self.submittingTime = 300 @@ -73,11 +72,6 @@ def initialize(self): return result self.pilotAgentsDB = result["Value"]() - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - self.storageManagementDB = result["Value"]() - # getting parameters if not self.am_getOption("Enable", True): @@ -267,7 +261,6 @@ def _failStalledJobs(self, jobID): jobdb=self.jobDB, taskqueuedb=self.taskQueueDB, pilotagentsdb=self.pilotAgentsDB, - storagemanagementdb=self.storageManagementDB, ) if not res["OK"]: self.log.error("Failed to kill job", jobID) diff --git a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py index 8af19416d1f..30e695e5334 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py @@ -1,6 +1,6 @@ from DIRAC import S_ERROR, S_OK, gLogger -from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition @@ -96,11 +96,6 @@ def kill_delete_jobs( if not result["OK"]: return result pilotagentsdb = result["Value"]() - if storagemanagementdb is None: - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - storagemanagementdb = result["Value"]() badIDs = [] @@ -133,6 +128,14 @@ def kill_delete_jobs( stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING] if stagingJobList: + if storagemanagementdb is None: + result = ObjectLoader().loadObject( + "StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB" + ) + if not result["OK"]: + return result + storagemanagementdb = result["Value"]() + gLogger.info("Going to send killing signal to stager as well!") result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList) if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 0f1d82d4ee1..87e37bdc028 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -1,14 +1,15 @@ -""" JobManagerHandler is the implementation of the JobManager service - in the DISET framework +"""JobManagerHandler is the implementation of the JobManager service +in the DISET framework - The following methods are available in the Service interface +The following methods are available in the Service interface - submitJob() - rescheduleJob() - deleteJob() - killJob() +submitJob() +rescheduleJob() +deleteJob() +killJob() """ + from pydantic import ValidationError from DIRAC import S_ERROR, S_OK @@ -65,11 +66,6 @@ def initializeHandler(cls, serviceInfoDict): return result cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - cls.storageManagementDB = result["Value"](parentLogger=cls.log) - except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp!r}") @@ -468,7 +464,6 @@ def export_deleteJob(self, jobIDs, force=False): jobdb=self.jobDB, taskqueuedb=self.taskQueueDB, pilotagentsdb=self.pilotAgentsDB, - storagemanagementdb=self.storageManagementDB, ) result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired() @@ -506,7 +501,6 @@ def export_killJob(self, jobIDs, force=False): jobdb=self.jobDB, taskqueuedb=self.taskQueueDB, pilotagentsdb=self.pilotAgentsDB, - storagemanagementdb=self.storageManagementDB, ) result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()