Skip to content

Commit 08cc2d9

Browse files
Merge pull request #1414 from datajoint/fix/1413-populate-reserve-jobs-restrictions
fix: populate() with reserve_jobs=True ignores restrictions (#1413)
2 parents e9b6e83 + 5610dfe commit 08cc2d9

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

src/datajoint/autopopulate.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,11 @@ def handler(signum, frame):
493493

494494
# Fetch pending jobs ordered by priority (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
495495
pending_query = self.jobs.pending & "scheduled_time <= CURRENT_TIMESTAMP(3)"
496+
if restrictions:
497+
# Restrict to jobs whose keys match the caller's restrictions.
498+
# semantic_check=False is required because the jobs table PK has
499+
# different lineage than key_source (see jobs.py refresh()).
500+
pending_query = pending_query.restrict(self._jobs_to_do(restrictions), semantic_check=False)
496501
if priority is not None:
497502
pending_query = pending_query & f"priority <= {priority}"
498503

tests/integration/test_autopopulate.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,46 @@ def make_insert(self, key, result, scale):
352352
row = (TripartiteComputed & "source_id = 2").fetch1()
353353
assert row["scale"] == 5
354354
assert row["result"] == 1000 # 200 * 5
355+
356+
357+
def test_populate_reserve_jobs_respects_restrictions(clean_autopopulate, subject, experiment):
    """Regression test for #1413: a restricted populate() with reserve_jobs=True stays restricted.

    Before the fix, _populate_distributed() applied the restriction while
    refreshing the job queue but not while fetching pending jobs, so every
    pending key was processed regardless of what the caller asked for.
    """
    # Preconditions: there must be subjects to work on and nothing populated yet.
    assert subject, "subject table is empty"
    assert not experiment, "experiment table already has rows"

    # Drop leftover job entries from earlier tests; success/error entries
    # would stop refresh() from re-adding those keys as pending.
    experiment.jobs.delete_quick()

    # Fill the queue without any restriction so every subject has a pending
    # job. This mirrors workers that share one queue while each restricts
    # itself to its own subset of keys.
    experiment.jobs.refresh(delay=-1)
    pending_count = len(experiment.jobs.pending)
    assert pending_count > 0, "job refresh produced no pending entries"

    # Restrict to a single subject: the one with the lowest subject_id.
    first_key = subject.keys(order_by="subject_id ASC", limit=1)[0]
    only_first = {"subject_id": first_key["subject_id"]}

    # refresh=False reuses the queue built above. With the bug present, this
    # call processed ALL pending jobs rather than only the matching ones.
    experiment.populate(only_first, reserve_jobs=True, refresh=False)

    # Something was populated, and nothing outside the restriction was.
    assert len(experiment) > 0, "no rows were populated"
    outside = experiment - only_first
    assert len(outside) == 0, (
        "populate(reserve_jobs=True) processed keys outside the restriction "
        f"({len(outside)} extra rows found)"
    )

    # Cross-check from the subject side: no other subject has any rows.
    remaining_subjects = subject - only_first
    if remaining_subjects:
        leaked = experiment & remaining_subjects.proj()
        assert len(leaked) == 0, "rows for unrestricted subjects were incorrectly populated"

0 commit comments

Comments
 (0)