Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
:caption: TransformationCleaningAgent options

"""

# # imports
import ast
import errno
Expand Down Expand Up @@ -69,7 +70,6 @@ def __init__(self, *args, **kwargs):
self.jobDB = None
self.pilotAgentsDB = None
self.taskQueueDB = None
self.storageManagementDB = None

# # transformations types
self.transformationTypes = None
Expand Down Expand Up @@ -144,11 +144,6 @@ def initialize(self):
return result
self.taskQueueDB = result["Value"]()

result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()

return S_OK()

#############################################################################
Expand Down Expand Up @@ -640,7 +635,6 @@ def __removeWMSTasks(self, transJobIDs):
jobdb=self.jobDB,
taskqueuedb=self.taskQueueDB,
pilotagentsdb=self.pilotAgentsDB,
storagemanagementdb=self.storageManagementDB,
)
# Prevent 0 job IDs
jobIDs = [int(j) for j in transJobIDs if int(j)]
Expand Down
34 changes: 29 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE

Expand All @@ -55,6 +54,8 @@ def __init__(self, *args, **kwargs):

# clients
self.jobDB = None
self.taskQueueDB = None
self.pilotAgentsDB = None
self.sandboxDB = None

self.maxJobsAtOnce = 500
Expand All @@ -67,8 +68,25 @@ def __init__(self, *args, **kwargs):
def initialize(self):
"""Sets defaults"""

self.jobDB = JobDB()
self.sandboxDB = SandboxMetadataDB()
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
if not result["OK"]:
return result
self.jobDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
if not result["OK"]:
return result
self.taskQueueDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
if not result["OK"]:
return result
self.pilotAgentsDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB")
if not result["OK"]:
return result
self.sandboxDB = result["Value"]()

agentTSTypes = self.am_getOption("ProductionTypes", [])
if agentTSTypes:
Expand Down Expand Up @@ -239,7 +257,13 @@ def _deleteRemoveJobs(self, jobList, remove=False):
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
result = wmsClient.removeJob(jobsList)
else:
result = kill_delete_jobs(RIGHT_DELETE, jobsList)
result = kill_delete_jobs(
RIGHT_DELETE,
jobsList,
jobdb=self.jobDB,
taskqueuedb=self.taskQueueDB,
pilotagentsdb=self.pilotAgentsDB,
)
if not result["OK"]:
self.log.error(
f"Could not {'remove' if remove else 'delete'} jobs",
Expand Down
7 changes: 0 additions & 7 deletions src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def __init__(self, *args, **kwargs):
self.logDB = None
self.taskQueueDB = None
self.pilotAgentsDB = None
self.storageManagementDB = None
self.matchedTime = 7200
self.rescheduledTime = 600
self.submittingTime = 300
Expand Down Expand Up @@ -73,11 +72,6 @@ def initialize(self):
return result
self.pilotAgentsDB = result["Value"]()

result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()

# getting parameters

if not self.am_getOption("Enable", True):
Expand Down Expand Up @@ -267,7 +261,6 @@ def _failStalledJobs(self, jobID):
jobdb=self.jobDB,
taskqueuedb=self.taskQueueDB,
pilotagentsdb=self.pilotAgentsDB,
storagemanagementdb=self.storageManagementDB,
)
if not res["OK"]:
self.log.error("Failed to kill job", jobID)
Expand Down
15 changes: 9 additions & 6 deletions src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition

Expand Down Expand Up @@ -96,11 +96,6 @@ def kill_delete_jobs(
if not result["OK"]:
return result
pilotagentsdb = result["Value"]()
if storagemanagementdb is None:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
storagemanagementdb = result["Value"]()

badIDs = []

Expand Down Expand Up @@ -133,6 +128,14 @@ def kill_delete_jobs(
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]

if stagingJobList:
if storagemanagementdb is None:
result = ObjectLoader().loadObject(
"StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB"
)
if not result["OK"]:
return result
storagemanagementdb = result["Value"]()

gLogger.info("Going to send killing signal to stager as well!")
result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList)
if not result["OK"]:
Expand Down
22 changes: 8 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
""" JobManagerHandler is the implementation of the JobManager service
in the DISET framework
"""JobManagerHandler is the implementation of the JobManager service
in the DISET framework

The following methods are available in the Service interface
The following methods are available in the Service interface

submitJob()
rescheduleJob()
deleteJob()
killJob()
submitJob()
rescheduleJob()
deleteJob()
killJob()

"""

from pydantic import ValidationError

from DIRAC import S_ERROR, S_OK
Expand Down Expand Up @@ -65,11 +66,6 @@ def initializeHandler(cls, serviceInfoDict):
return result
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)

result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
cls.storageManagementDB = result["Value"](parentLogger=cls.log)

except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp!r}")

Expand Down Expand Up @@ -468,7 +464,6 @@ def export_deleteJob(self, jobIDs, force=False):
jobdb=self.jobDB,
taskqueuedb=self.taskQueueDB,
pilotagentsdb=self.pilotAgentsDB,
storagemanagementdb=self.storageManagementDB,
)

result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()
Expand Down Expand Up @@ -506,7 +501,6 @@ def export_killJob(self, jobIDs, force=False):
jobdb=self.jobDB,
taskqueuedb=self.taskQueueDB,
pilotagentsdb=self.pilotAgentsDB,
storagemanagementdb=self.storageManagementDB,
)

result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()
Expand Down
Loading