Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions src/DIRAC/TransformationSystem/DB/TransformationDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
MAX_ERROR_COUNT = 10

TMP_TABLE_JOIN_LIMIT = 100
# Maximum number of LFNs loaded at once into the temporary MEMORY table when
# resolving FileIDs; longer LFN lists are split into chunks of this size.
GET_FILE_ID_BULK_SIZE = 30_000
#############################################################################


Expand Down Expand Up @@ -1324,27 +1326,36 @@ def __getFileIDsForLfns(self, lfns, connection=False):

if not lfns:
return ({}, {})
# Create temporary table for LFNs
sqlCmd = "CREATE TEMPORARY TABLE to_query_LFNs (LFN VARCHAR(255) NOT NULL, PRIMARY KEY (LFN)) ENGINE=MEMORY;"
returnValueOrRaise(self._update(sqlCmd, conn=connection))

try:
# Insert LFNs into temporary table
sqlCmd = "INSERT INTO to_query_LFNs (LFN) VALUES ( %s )"
returnValueOrRaise(self._updatemany(sqlCmd, [(lfn,) for lfn in lfns], conn=connection))
lfns_to_ids = {}
ids_to_lfns = {}

# Query using JOIN with temporary table
req = "SELECT df.LFN, df.FileID FROM DataFiles df JOIN to_query_LFNs t ON df.LFN = t.LFN;"
res = returnValueOrRaise(self._query(req, conn=connection))
for lfn_bulk in breakListIntoChunks(lfns, GET_FILE_ID_BULK_SIZE):
# Create temporary table for LFNs
sqlCmd = (
"CREATE TEMPORARY TABLE to_query_LFNs (LFN VARCHAR(255) NOT NULL, PRIMARY KEY (LFN)) ENGINE=MEMORY;"
)
returnValueOrRaise(self._update(sqlCmd, conn=connection))

lfns = dict(res)
# Reverse dictionary
fids = {fileID: lfn for lfn, fileID in lfns.items()}
return (fids, lfns)
finally:
# Clean up temporary table
sqlCmd = "DROP TEMPORARY TABLE to_query_LFNs"
self._update(sqlCmd, conn=connection)
try:
# Insert LFNs into temporary table
sqlCmd = "INSERT INTO to_query_LFNs (LFN) VALUES ( %s )"
returnValueOrRaise(self._updatemany(sqlCmd, [(lfn,) for lfn in lfn_bulk], conn=connection))

# Query using JOIN with temporary table
req = "SELECT df.LFN, df.FileID FROM DataFiles df JOIN to_query_LFNs t ON df.LFN = t.LFN;"
res = returnValueOrRaise(self._query(req, conn=connection))

lfns_to_ids.update(res)
# The reverse (FileID -> LFN) dictionary is built once, after all chunks have been merged

finally:
# Clean up temporary table
sqlCmd = "DROP TEMPORARY TABLE to_query_LFNs"
self._update(sqlCmd, conn=connection)

ids_to_lfns = {fileID: lfn for lfn, fileID in lfns_to_ids.items()}
return (ids_to_lfns, lfns_to_ids)

def __getLfnsForFileIDs(self, fileIDs, connection=False):
"""Get lfns for the given list of fileIDs"""
Expand Down
Loading