Skip to content

Commit 1ca01eb

Browse files
test: add regression test for populate(reserve_jobs=True) with restrictions
Adds test_populate_reserve_jobs_respects_restrictions which verifies that _populate_distributed() honours the caller's restriction when reserve_jobs=True. The test seeds a full job queue for all subjects, then calls populate(restriction, reserve_jobs=True, refresh=False) for a single subject and asserts that only that subject's rows were created. Also fixes the restrict() call in _populate_distributed to use semantic_check=False, matching the pattern in jobs.py refresh(), because the jobs table PK has different attribute lineage than key_source. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent f9a89f0 commit 1ca01eb

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

src/datajoint/autopopulate.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,10 @@ def handler(signum, frame):
494494
# Fetch pending jobs ordered by priority (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
495495
pending_query = self.jobs.pending & "scheduled_time <= CURRENT_TIMESTAMP(3)"
496496
if restrictions:
497-
pending_query = pending_query & self._jobs_to_do(restrictions)
497+
# Restrict to jobs whose keys match the caller's restrictions.
498+
# semantic_check=False is required because the jobs table PK has
499+
# different lineage than key_source (see jobs.py refresh()).
500+
pending_query = pending_query.restrict(self._jobs_to_do(restrictions), semantic_check=False)
498501
if priority is not None:
499502
pending_query = pending_query & f"priority <= {priority}"
500503

tests/integration/test_autopopulate.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,46 @@ def make_insert(self, key, result, scale):
352352
row = (TripartiteComputed & "source_id = 2").fetch1()
353353
assert row["scale"] == 5
354354
assert row["result"] == 1000 # 200 * 5
355+
356+
357+
def test_populate_reserve_jobs_respects_restrictions(clean_autopopulate, subject, experiment):
358+
"""Regression test for #1413: populate() with reserve_jobs=True must honour restrictions.
359+
360+
Previously _populate_distributed() refreshed the job queue with the
361+
restriction but then fetched *all* pending jobs, ignoring the restriction
362+
and processing every pending key.
363+
"""
364+
assert subject, "subject table is empty"
365+
assert not experiment, "experiment table already has rows"
366+
367+
# Clear any stale jobs from previous tests (success/error entries would
368+
# prevent refresh() from re-adding them as pending).
369+
experiment.jobs.delete_quick()
370+
371+
# Refresh the full job queue (no restriction) so that all subjects have
372+
# pending jobs — this simulates the real-world scenario where workers share
373+
# a single job queue but each worker restricts to its own subset.
374+
experiment.jobs.refresh(delay=-1)
375+
total_pending = len(experiment.jobs.pending)
376+
assert total_pending > 0, "job refresh produced no pending entries"
377+
378+
# Pick one subject to use as the restriction.
379+
first_subject_id = subject.keys(order_by="subject_id ASC", limit=1)[0]["subject_id"]
380+
restriction = {"subject_id": first_subject_id}
381+
382+
# Populate only for the restricted subject. refresh=False so we use the
383+
# existing queue populated above. The bug was that this call would process
384+
# ALL pending jobs instead of only those matching the restriction.
385+
experiment.populate(restriction, reserve_jobs=True, refresh=False)
386+
387+
# Only rows for the restricted subject should exist.
388+
assert len(experiment) > 0, "no rows were populated"
389+
assert len(experiment - restriction) == 0, (
390+
"populate(reserve_jobs=True) processed keys outside the restriction "
391+
f"({len(experiment - restriction)} extra rows found)"
392+
)
393+
394+
# Rows for all other subjects must still be absent.
395+
other_subjects = subject - restriction
396+
if other_subjects:
397+
assert len(experiment & other_subjects.proj()) == 0, "rows for unrestricted subjects were incorrectly populated"

0 commit comments

Comments
 (0)