PSv2: Track and display image count progress and state #1121
carlosgjs wants to merge 27 commits into RolnickLab:main from
Conversation
👷 Deploy request for antenna-ssec pending review. Visit the deploys page to approve it.
✅ Deploy Preview for antenna-preview canceled.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
📝 Walkthrough

Replaces TaskStateManager with AsyncJobStateManager, adds failed-image tracking and cumulative counts (detections, classifications, captures) through NATS pipeline result handling, and updates _update_job_progress to accept a complete_state and propagate computed completion and counts into job progress and status.
Sequence Diagram

```mermaid
sequenceDiagram
    participant NATS
    participant Pipeline as process_nats_pipeline_result
    participant StateMgr as AsyncJobStateManager
    participant Redis
    participant Django as Job Model
    NATS->>Pipeline: deliver result (success / error)
    Pipeline->>Pipeline: extract processed_image_ids, error_result, counts
    Pipeline->>StateMgr: _commit_update(processed_ids, stage, failed_image_ids?)
    StateMgr->>Redis: add/remove failed ids, update counters (processed, total, failed)
    StateMgr-->>Pipeline: JobStateProgress (processed, remaining, failed, percentage)
    Pipeline->>Pipeline: compute complete_state from failure ratio and FAILURE_THRESHOLD
    Pipeline->>Pipeline: call _update_job_progress(job_id, stage, pct, complete_state, detections..., classifications..., captures...)
    Pipeline->>Django: persist job.status and progress.summary.status when complete
```
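To make the shape of the Redis-backed snapshot in the diagram concrete, here is a minimal, hypothetical sketch of a JobStateProgress-style dataclass with defaulted fields. The real definition lives in ami/ml/orchestration/async_job_state.py; field names beyond those mentioned in the walkthrough and diagram are assumptions.

```python
from dataclasses import dataclass


@dataclass
class JobStateProgress:
    """Hypothetical shape of the Redis-backed progress snapshot (sketch only).

    Defaults on the newer fields mean existing callers that only read
    processed/remaining/percentage keep working unchanged.
    """

    processed: int = 0
    remaining: int = 0
    total: int = 0
    failed: int = 0
    percentage: float = 0.0
    # Cumulative counts accumulated while handling "results"-stage messages
    detections: int = 0
    classifications: int = 0
    captures: int = 0
```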
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ Passed checks (4 passed)
* fix: update date picker version and tweak layout logic
* feat: set start month based on selected date
* merge
* fix: Properly handle async job state with celery tasks
* Apply suggestion from @Copilot (Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>)
* Delete implemented plan

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* merge
* feat: PSv2 - Queue/redis clean-up upon job completion
* fix: catch specific exception
* chore: move tests to a subdir

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
…kLab#1118) Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):
- internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
- sync_api — worker calls an external processing service API synchronously and waits for each response.
- async_api — worker publishes items to NATS for external processing service workers to pick up independently.

Database and Model Changes:
- Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
- ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
- ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).

API and Filtering:
- dispatch_mode is exposed (read-only) in job list and detail serializers.
- Filterable via query parameter: ?dispatch_mode=async_api
- The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.

Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
…dler (RolnickLab#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

  Replace hardcoded `stage == "results"` check with `job.progress.is_complete()`, which verifies ALL stages are done, making it work for any job type. Replace the feature flag check in cleanup with `dispatch_mode == ASYNC_API`, which is immutable for the job's lifetime and more correct than re-reading a mutable flag that could change between job creation and completion.

* test: update cleanup tests for is_complete() and dispatch_mode checks

  Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard. Complete all stages (collect, process, results) in the completion test since is_complete() correctly requires all stages to be done.

Co-authored-by: Claude <noreply@anthropic.com>
Pull request overview
Adds richer progress tracking for async ML image processing jobs (detections/classifications/captures/failed) and uses failure ratio to determine final job state, bringing async behavior closer to the existing sync path.
Changes:
- Extend TaskStateManager/TaskProgress to track cumulative detections/classifications/captures and a unique failed-image set in Redis.
- Update async result handling (process_nats_pipeline_result) to record failed images and per-result counts, and to propagate these into job progress updates (including final success/failure state).
- Expand TaskStateManager unit tests to validate the new progress fields and cleanup behavior (partial coverage).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| ami/ml/orchestration/task_state.py | Adds Redis-backed cumulative counters + failed-image tracking and surfaces them via TaskProgress. |
| ami/jobs/tasks.py | Plumbs new progress metrics through async pipeline result processing and modifies job completion state handling. |
| ami/ml/tests.py | Adds/updates tests for new progress fields and failed-image tracking. |
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@ami/jobs/tasks.py`:
- Around line 92-95: complete_state currently computed once using
FAILURE_THRESHOLD, JobState and progress_info and then reused later for the
"results" stage, which can be stale; before calling _update_job_progress for the
"results" stage (and before the state_manager.update_state that sets results to
100%), re-read the latest progress counts (e.g., fetch updated
progress_info.failed and progress_info.total from the state manager or from the
second state_manager.update_state response) and recompute complete_state =
JobState.FAILURE if (failed/total) >= FAILURE_THRESHOLD else JobState.SUCCESS so
the final job.status reflects the most recent state.
- Around line 92-95: The code currently divides progress_info.failed by
progress_info.total without guarding total==0, causing ZeroDivisionError; update
the logic around FAILURE_THRESHOLD, complete_state and JobState so that if
progress_info.total == 0 you short-circuit (set complete_state =
JobState.SUCCESS) and only compute (progress_info.failed / progress_info.total)
>= FAILURE_THRESHOLD when total > 0, mirroring the guard used in
_get_progress/task_state.py.
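As an illustration of the guard described in the second item above, here is a minimal sketch. The function name and the 0.5 default threshold are illustrative only; the actual code would use the PR's FAILURE_THRESHOLD constant and JobState members rather than strings.

```python
def resolve_complete_state(failed: int, total: int, threshold: float = 0.5) -> str:
    """Sketch of the guarded completion check: avoid dividing by zero
    and only flag FAILURE when the failure ratio exceeds the threshold."""
    if total == 0:
        # No images at all: nothing failed, so treat the job as successful.
        return "SUCCESS"
    return "FAILURE" if (failed / total) > threshold else "SUCCESS"


assert resolve_complete_state(failed=0, total=0) == "SUCCESS"
assert resolve_complete_state(failed=5, total=10) == "SUCCESS"  # exactly 50% is not a failure
assert resolve_complete_state(failed=6, total=10) == "FAILURE"
```

The asserts also show the boundary behavior discussed later in this PR: using a strict `>` means a job that fails exactly 50% of its images is still reported as a success, matching the sync path.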
🧹 Nitpick comments (4)
ami/jobs/tasks.py (2)
133-140: Redundant `if pipeline_result` checks inside `if pipeline_result:` block.

Lines 134 and 137 guard with `if pipeline_result else 0`, but this code is already inside the `if pipeline_result:` block at line 123. The checks are dead branches.

♻️ Simplification

```diff
 # Calculate detection and classification counts from this result
-detections_count = len(pipeline_result.detections) if pipeline_result else 0
-classifications_count = (
-    sum(len(detection.classifications) for detection in pipeline_result.detections)
-    if pipeline_result
-    else 0
-)
+detections_count = len(pipeline_result.detections)
+classifications_count = sum(len(detection.classifications) for detection in pipeline_result.detections)
 captures_count = len(pipeline_result.source_images)
```
195-197: `complete_state` typed as `Any` — prefer a narrower type.

Using `Any` weakens type safety. Since this always receives a `JobState` value, type it accordingly.

♻️ Tighten the type

```diff
+from ami.jobs.models import JobState  # if not already imported at module level
+
 def _update_job_progress(
-    job_id: int, stage: str, progress_percentage: float, complete_state: Any, **state_params
+    job_id: int, stage: str, progress_percentage: float, complete_state: "JobState", **state_params
 ) -> None:
```

Since `JobState` is imported locally to avoid circular imports, you can use a string literal (`"JobState"`) or a `TYPE_CHECKING` guard for the annotation.

ami/ml/orchestration/task_state.py (2)
146-169: Read-modify-write on counters is safe only under the lock — consider a comment.

The cumulative counting (lines 148–158) and failed-set union (lines 162–164) use non-atomic `cache.get` + `cache.set`. This is safe because the caller `update_state` holds a per-job lock, but `_get_progress` is also called directly in tests. A brief inline comment noting the lock invariant would help future maintainers.

📝 Suggested comment

```diff
+    # NOTE: The read-modify-write below is safe because callers must hold
+    # the per-job lock (acquired in update_state). Do not call _get_progress
+    # without the lock in production code.
+
     # Update cumulative detection, classification, and capture counts
     current_detections = cache.get(self._detections_key, 0)
```
139-141: O(n) filtering of pending images on every update.

`remaining_images` is computed with a list comprehension iterating all pending images (line 139), which grows as the job size grows. For large jobs (tens of thousands of images), this linear scan on every progress update could become a bottleneck. A Redis set with `SREM` would make removals O(1) per image, but this may be acceptable for current workloads.
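If that scan ever does become a bottleneck, one possible direction is sketched below, assuming the django-redis backend so a raw connection is available. The key naming and helper function are hypothetical, not the PR's.

```python
from django_redis import get_redis_connection


def mark_images_done(job_id: int, stage: str, image_ids: list[str]) -> int:
    """Sketch: remove processed images from a Redis set of pending IDs.

    SREM and SCARD are O(1) per member, so this avoids rebuilding the
    whole pending list on every progress update.
    """
    conn = get_redis_connection("default")
    pending_key = f"job:{job_id}:{stage}:pending"  # hypothetical key name
    if image_ids:
        conn.srem(pending_key, *image_ids)
    return conn.scard(pending_key)  # number of images still pending
```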
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/jobs/tasks.py (1)
124-183: ⚠️ Potential issue | 🔴 Critical

`self.retry()` at line 163 will be swallowed by the broad `except Exception` at line 180.

`celery.exceptions.Retry` is a subclass of `Exception`. When `state_manager.update_state` returns `None` and `self.retry(...)` is raised at line 163, it's caught by line 180, logged as an error, and silently discarded — the task will never actually retry. Either narrow the except clause or re-raise `Retry`.

Proposed fix (option A — let Retry propagate)

```diff
+from celery.exceptions import Retry
+
 try:
     # Save to database (this is the slow operation)
     ...
-except Exception as e:
+except Retry:
+    raise
+except Exception as e:
     job.logger.error(
         f"Failed to process pipeline result for job {job_id}: {e}. NATS will redeliver the task message."
     )
```
🧹 Nitpick comments (4)
ami/ml/tests.py (2)
883-886: Consider also asserting the `captures` count in the init helper.

`_init_and_verify` checks `detections`, `classifications`, and `failed` but omits `captures`. While `captures` is tested in `test_cumulative_counting`, adding it here keeps the initial-state verification comprehensive and consistent.

Proposed fix

```diff
 self.assertEqual(progress.detections, 0)
 self.assertEqual(progress.classifications, 0)
 self.assertEqual(progress.failed, 0)
+self.assertEqual(progress.captures, 0)
```
1104-1122: `cache.get` on a Redis set key may return `None` or an unexpected type depending on the cache backend.

Line 1114 assumes `cache.get(self.manager._failed_key)` returns a Python set with a `len()`. This works with Django's locmem cache in tests, and as long as `_get_progress` uses `cache.get`/`cache.set` to store a Python set, the test is valid. Just be aware this would break if the underlying storage changes (e.g., raw Redis commands instead of Django cache).

ami/jobs/tasks.py (2)
137-144: Redundant `if pipeline_result` guards inside an `if pipeline_result:` block.

Lines 138 and 141 check `if pipeline_result` again, but they're already inside the `if pipeline_result:` block starting at line 127.

Simplified

```diff
-detections_count = len(pipeline_result.detections) if pipeline_result else 0
-classifications_count = (
-    sum(len(detection.classifications) for detection in pipeline_result.detections)
-    if pipeline_result
-    else 0
-)
+detections_count = len(pipeline_result.detections)
+classifications_count = sum(len(d.classifications) for d in pipeline_result.detections)
 captures_count = len(pipeline_result.source_images)
```
204-206: Consider using `JobState` instead of `Any` for the `complete_state` type hint.

`Any` obscures the expected type. Since `JobState` is already imported inside the function body, you could use a string literal `"JobState"` or import it at module level under `TYPE_CHECKING` to get a proper type hint without circular import issues.
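For reference, a minimal sketch of the `TYPE_CHECKING` pattern mentioned above, assuming `JobState` is importable from `ami.jobs.models` as elsewhere in this PR; the signature mirrors the one quoted in the review.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only for type checking, so there is no circular import at runtime.
    from ami.jobs.models import JobState


def _update_job_progress(
    job_id: int, stage: str, progress_percentage: float, complete_state: "JobState", **state_params
) -> None:
    ...
```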
# Conflicts:
#   ami/ml/orchestration/task_state.py
Clarify naming to distinguish mutating vs read-only methods:
- _commit_update(): private, writes mutations to Redis, returns progress
- get_progress(): public, read-only snapshot (added in RolnickLab#1129)
- update_state(): public API, acquires lock, calls _commit_update()

Co-Authored-By: Claude <noreply@anthropic.com>
- Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
- Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match the sync path's boundary behavior at exactly 50%
- Convert TaskProgress from namedtuple to dataclass with defaults, so new fields don't break existing callers

Co-Authored-By: Claude <noreply@anthropic.com>
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ami/jobs/tasks.py (2)
159-184: ⚠️ Potential issue | 🔴 Critical

`raise self.retry()` on line 164 will be swallowed by the `except Exception` on line 181.

Celery's `self.retry()` raises `celery.exceptions.Retry`, which is a subclass of `Exception`. The broad `except Exception` at line 181 will catch it, log it as a processing error, and silently discard the retry. This means if the lock isn't acquired for the "results" stage update, the progress will never be updated for that image, and the job may never complete.

Note that the identical pattern at line 95 works correctly because it's outside any `try/except` block.

🐛 Proposed fix — exclude Retry from the catch-all

```diff
+from celery.exceptions import Retry as CeleryRetry
+
 try:
     # Save to database (this is the slow operation)
     detections_count, classifications_count, captures_count = 0, 0, 0
     ...
+except CeleryRetry:
+    raise
 except Exception as e:
     job.logger.error(
         f"Failed to process pipeline result for job {job_id}: {e}. NATS will redeliver the task message."
     )
```
129-184: ⚠️ Potential issue | 🟠 Major

NATS ack at line 147 precedes the results-stage progress update — if the latter fails, the message is lost.

After `_ack_task_via_nats` succeeds (line 147), any exception in the results-stage `update_state` or `_update_job_progress` (lines 150–179) is caught by the `except` at line 181 without re-raising. The error message claims "NATS will redeliver the task message," but the message has already been acknowledged. This could leave the job stuck with a results stage that never reaches 100%.

Consider either moving the ack after the results-stage progress update, or re-raising the exception so Celery retries the task (NATS redelivery would then handle it since the ack didn't happen).
🧹 Nitpick comments (1)

ami/ml/orchestration/task_state.py (1)

144-208: Cumulative counters are shared across stages — works but is implicit.

The detection/classification/capture counters use a single set of Redis keys (not per-stage), so the "process" stage always contributes 0 and the "results" stage contributes the actual counts. This works correctly given the current call sites in `tasks.py`, but it's fragile if a future caller passes non-zero counts for the "process" stage — they'd be silently accumulated.

A brief comment noting that counters are only expected to be incremented during the "results" stage would help future maintainers.
Clarify that this dataclass tracks job-level progress in Redis, not individual task/image progress. Aligns with the naming of JobProgress (the Django/Pydantic model equivalent). Co-Authored-By: Claude <noreply@anthropic.com>
Mark connection handling as done (PR RolnickLab#1130), add worktree/remote mapping and docker testing notes for future sessions. Co-Authored-By: Claude <noreply@anthropic.com>
@carlosgjs what do you think about using the existing Job model to track the progress & failure counts directly? instead of adding counts in Redis then copying to the Job model. Also I am a bit confused about what the "Tasks" are in Redis now, since they have counts for the Job's total captures, failures, etc. Or are those counts for a single batch? The current job model counts the progress & failures per stage type, so there isn't a single count for all failures. Or if there is a reason to use Redis I am open to that! But maybe we can update the naming & docstrings.
Also, I am thinking we should simplify the logic determining if a Job is in the FAILURE state. Let's just show the counts. Really we need a new state like "COMPLETED" instead of Celery's SUCCESS & FAILURE states. "Completed with errors". Then we can remove a number of checks related to the stage status & overall status.
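A rough sketch of what that could look like, purely hypothetical and not the project's actual JobState enum, using the Django TextChoices pattern already used for dispatch_mode in this PR series:

```python
from django.db import models


class JobState(models.TextChoices):
    """Hypothetical extension of the job states discussed above: a terminal
    COMPLETED state that simply reports counts, instead of forcing a
    SUCCESS/FAILURE verdict from a failure ratio."""

    PENDING = "PENDING", "Pending"
    STARTED = "STARTED", "Started"
    COMPLETED = "COMPLETED", "Completed"  # finished; see counts for any errors
    SUCCESS = "SUCCESS", "Success"        # kept for Celery compatibility
    FAILURE = "FAILURE", "Failure"


def summary_label(state: str, failed: int) -> str:
    """e.g. 'Completed with errors (3 failed)' instead of a bare FAILURE."""
    if state == JobState.COMPLETED and failed:
        return f"Completed with errors ({failed} failed)"
    return JobState(state).label
```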
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
ami/ml/orchestration/jobs.py (1)

15-21: ⚠️ Potential issue | 🟡 Minor

Stale docstring reference to `TaskStateManager`.

Line 19 still says "Redis state (via TaskStateManager.cleanup)" but the class has been renamed to `AsyncJobStateManager`.

📝 Proposed fix

```diff
-    1. Redis state (via TaskStateManager.cleanup):
+    1. Redis state (via AsyncJobStateManager.cleanup):
```

ami/ml/tests.py (1)
860-861: ⚠️ Potential issue | 🟡 Minor

Stale class docstring.

Line 861 says "Test TaskStateManager" but the class under test is now `AsyncJobStateManager`.

📝 Proposed fix

```diff
 class TestTaskStateManager(TestCase):
-    """Test TaskStateManager for job progress tracking."""
+    """Test AsyncJobStateManager for job progress tracking."""
```

ami/jobs/test_tasks.py (1)
89-93: ⚠️ Potential issue | 🟡 Minor

Stale docstring reference.

Line 92 still says "Assert TaskStateManager state is correct" — it should reference `AsyncJobStateManager`.

📝 Proposed fix

```diff
 def _assert_progress_updated(
     self, job_id: int, expected_processed: int, expected_total: int, stage: str = "process"
 ):
-    """Assert TaskStateManager state is correct."""
+    """Assert AsyncJobStateManager state is correct."""
```
🤖 Fix all issues with AI agents
In `@docs/claude/nats-todo.md`:
- Around line 132-159: Fix three minor typos in the notes: replace the word
"prevous" with "previous", change "hbs" to "has", and correct "SLUM" to "SLURM"
in the nats-todo notes (search for the exact tokens "prevous", "hbs", and "SLUM"
to locate the spots to update).
- Around line 86-93: The doc exposes an internal IP (192.168.123.176) in the
NATS monitoring instructions; update the text to avoid leaking internal network
addresses by replacing the IP with the hostname (e.g., ami-redis-1) or a
placeholder like <internal-host> and/or remove the IP entirely, and ensure the
example line that reads "http://192.168.123.176:8222" is changed to use the
hostname or placeholder so the SSH tunnel instructions and the dashboard server
URL examples remain accurate and non-sensitive.
- Around line 10-16: The doc contains sensitive infrastructure topology
(hostnames like ami-cc/ami-worker-2 and SSH commands) that should be removed;
replace those concrete hostnames/SSH commands with a brief, non-sensitive
operational note and/or move detailed steps to the private ops repo or wiki, and
redact the specific commands from the document while keeping the relevant config
detail (the NATS_URL change from the old default to nats://nats:4222) documented
generically; search for occurrences of NATS_URL and the nats://nats:4222 default
in the docs entry and replace the explicit SSH/host examples with a short,
environment-agnostic remediation note pointing to the private ops location.
In `@docs/claude/planning/pr-trackcounts-next-session.md`:
- Around line 39-51: Update the stale references in the document: replace any
mention of the old class name TaskStateManager with AsyncJobStateManager and
update the file reference ami/ml/orchestration/task_state.py to
ami/ml/orchestration/async_job_state.py (also check nearby mentions of
TaskStateManager/ task_state.py and adjust them to AsyncJobStateManager/
async_job_state.py so all references match the renamed class and file).
🧹 Nitpick comments (1)
ami/ml/orchestration/async_job_state.py (1)

144-208: `_commit_update` relies on the caller holding the lock — document and enforce this.

The read-modify-write cycle on counters (lines 171–181) and the failed set (lines 184–190) is safe only because `update_state` serializes access via the Redis lock. However, `_commit_update` is callable directly (and tests do so), and has no guard of its own.

The docstring on line 154 says "Must be called under lock," which is good. Consider raising or asserting if the lock is not held, or at minimum note in the docstring that direct calls outside tests are unsafe.
- **Root cause of job 2226 failure:** worker-2 was missing `NATS_URL` in `.envs/.production/.django`, so it defaulted to `nats://localhost:4222`. Every NATS ack from worker-2 failed with `Connect call failed ('127.0.0.1', 4222)`.
- **Fix applied in code:** Changed default in `config/settings/base.py:268` from `nats://localhost:4222` to `nats://nats:4222` (matches the hostname mapped via `extra_hosts` in all compose files).
- **Still needed on server:**
  ```bash
  ssh ami-cc "ssh ami-worker-2 'echo NATS_URL=nats://nats:4222 >> ~/ami-platform/.envs/.production/.django'"
  ssh ami-cc "ssh ami-worker-2 'cd ~/ami-platform && docker compose -f docker-compose.worker.yml restart celeryworker'"
  ```
Internal infrastructure details committed to the repository.
These lines contain server hostnames (ami-cc, ami-worker-2), env file paths, and SSH commands specific to your production/staging environment. While no actual secrets (passwords/tokens) are present, this is infrastructure topology information that may be better suited for a private ops repo or wiki rather than the main codebase.
🤖 Prompt for AI Agents
In `@docs/claude/nats-todo.md` around lines 10 - 16, The doc contains sensitive
infrastructure topology (hostnames like ami-cc/ami-worker-2 and SSH commands)
that should be removed; replace those concrete hostnames/SSH commands with a
brief, non-sensitive operational note and/or move detailed steps to the private
ops repo or wiki, and redact the specific commands from the document while
keeping the relevant config detail (the NATS_URL change from the old default to
nats://nats:4222) documented generically; search for occurrences of NATS_URL and
the nats://nats:4222 default in the docs entry and replace the explicit SSH/host
examples with a short, environment-agnostic remediation note pointing to the
private ops location.
### Expose NATS monitoring for dashboard access

- **Port 8222 is already exposed** on ami-redis-1, so `http://192.168.123.176:8222` should work from the VPN
- **For browser dashboard** (https://natsdashboard.com/): Needs the monitoring endpoint reachable from your browser. Use an SSH tunnel if not on VPN:
  ```bash
  ssh -L 8222:localhost:8222 ami-cc -t "ssh -L 8222:localhost:8222 ami-redis-1"
  ```
  Then open https://natsdashboard.com/ with server URL `http://localhost:8222`
Internal IP address in documentation.
Line 88 contains the internal IP 192.168.123.176. Consider referencing by hostname or removing if this doc is in a public or semi-public repository.
🤖 Prompt for AI Agents
In `@docs/claude/nats-todo.md` around lines 86 - 93, The doc exposes an internal
IP (192.168.123.176) in the NATS monitoring instructions; update the text to
avoid leaking internal network addresses by replacing the IP with the hostname
(e.g., ami-redis-1) or a placeholder like <internal-host> and/or remove the IP
entirely, and ensure the example line that reads "http://192.168.123.176:8222"
is changed to use the hostname or placeholder so the SSH tunnel instructions and
the dashboard server URL examples remain accurate and non-sensitive.
- Processing service failing on batches with different image sizes
- How can we mark an image/task as failed and say don't retry?
- Processing service still needs to batch classifications (like prevous methods)
- Nats jobs appear stuck if there are any task failures: https://antenna.insectai.org/projects/18/jobs/2228
  - If a task crashes, the whole worker seems to reset
  - Then no tasks are found remaining for the job in NATS

  ```
  2026-02-09 18:23:49 [info ] No jobs found, sleeping for 5 seconds
  2026-02-09 18:23:54 [info ] Checking for jobs for pipeline panama_moths_2023
  2026-02-09 18:23:55 [info ] Checking for jobs for pipeline panama_moths_2024
  2026-02-09 18:23:55 [info ] Checking for jobs for pipeline quebec_vermont_moths_2023
  2026-02-09 18:23:55 [info ] Processing job 2229 with pipeline quebec_vermont_moths_2023
  2026-02-09 18:23:55 [info ] Worker 0/2 starting iteration for job 2229
  2026-02-09 18:23:55 [info ] Worker 1/2 starting iteration for job 2229
  2026-02-09 18:23:59 [info ] Worker 0: No more tasks for job 2229
  2026-02-09 18:23:59 [info ] Worker 0: Iterator finished
  2026-02-09 18:24:03 [info ] Worker 1: No more tasks for job 2229
  2026-02-09 18:24:03 [info ] Worker 1: Iterator finished
  2026-02-09 18:24:03 [info ] Done, detections: 0. Detecting time: 0.0, classification time: 0.0, dl time: 0.0, save time: 0.0
  ```

- Would love some logs like "no task has been picked up in X minutes" or "last seen", etc.
- Skip jobs that hbs no tasks in the initial query
- test in a SLUM job! yeah! in Victoria?
- jumps around between jobs - good thing? annoying? what about when there is only one job open?
- time for time estimates
- bring back vectors asap
Minor typos in the notes section.
- Line 134: "prevous" → "previous"
- Line 152: "hbs" → "has"
- Line 154: "SLUM" → likely "SLURM"
🤖 Prompt for AI Agents
In `@docs/claude/nats-todo.md` around lines 132 - 159, Fix three minor typos in
the notes: replace the word "prevous" with "previous", change "hbs" to "has",
and correct "SLUM" to "SLURM" in the nats-todo notes (search for the exact
tokens "prevous", "hbs", and "SLUM" to locate the spots to update).
- **Redis side:** `TaskStateManager` → `JobStateProgress` (dataclass, ephemeral in Redis)
  - Tracks pending image IDs per stage (process, results)
  - Tracks cumulative counts: detections, classifications, captures, failed
  - Single flat object — no per-stage breakdown of counts

The disconnect: Redis tracks **per-stage pending images** (separate pending lists for "process" and "results" stages) but returns **job-wide cumulative counts** (one detections counter, one failed set). So `JobStateProgress` is a hybrid — stage-scoped for image completion, but job-scoped for counts.

**Should counts be per-stage?** For example, "failed" in the process stage means images that errored during ML inference. But could there be failures in the results stage too (failed to save)? The sync path tracks `request_failed_images` (process failures) separately from `failed_save_tasks` (results failures). The async path currently lumps all failures into one set.

**Key files to read:**
- `ami/ml/orchestration/task_state.py` — `TaskStateManager` and `JobStateProgress`
- `ami/jobs/tasks.py:62-185` — `process_nats_pipeline_result` (async path, uses TaskStateManager)
- `ami/jobs/models.py:466-582` — `MLJob.process_images` (sync path, tracks counts locally)
Stale references to old names.

- Line 39: `TaskStateManager` → should be `AsyncJobStateManager`
- Line 49: `ami/ml/orchestration/task_state.py` → should be `ami/ml/orchestration/async_job_state.py`

These references are inconsistent with the rename performed in this PR.
🤖 Prompt for AI Agents
In `@docs/claude/planning/pr-trackcounts-next-session.md` around lines 39 - 51,
Update the stale references in the document: replace any mention of the old
class name TaskStateManager with AsyncJobStateManager and update the file
reference ami/ml/orchestration/task_state.py to
ami/ml/orchestration/async_job_state.py (also check nearby mentions of
TaskStateManager/ task_state.py and adjust them to AsyncJobStateManager/
async_job_state.py so all references match the renamed class and file).
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ami/ml/orchestration/async_job_state.py (1)
69-86:⚠️ Potential issue | 🟡 MinorStale docstring: references non-existent parameters.
The docstring for
update_statedocumentsdetections_count,classifications_count, andcaptures_countparameters (Lines 83–85) that don't exist in the method signature. These appear to be leftovers from an earlier iteration.✏️ Proposed fix
Args: processed_image_ids: Set of image IDs that have just been processed stage: The processing stage ("process" or "results") request_id: Unique identifier for this processing request - detections_count: Number of detections to add to cumulative count - classifications_count: Number of classifications to add to cumulative count - captures_count: Number of captures to add to cumulative count failed_image_ids: Set of image IDs that failed processing (optional)ami/jobs/tasks.py (1)
129-181: ⚠️ Potential issue | 🟠 Major

If an error occurs after the NATS ACK (Line 147), the results-stage progress will never be updated, potentially leaving the job stuck.

The `except` on Line 178 catches all exceptions and logs them, but doesn't re-raise. Since the NATS message was already acknowledged, it won't be redelivered. If `state_manager.update_state` (Line 150) or `_update_job_progress` (Line 168) fails, the "results" stage will never reach 100% and the job will remain incomplete.

Also, the error message on Line 180 says "NATS will redeliver the task message," which is incorrect for failures after the ACK.

Consider either:
- Moving the ACK after the results-stage update, or
- Re-raising after logging so the Celery task can be retried.
🧹 Nitpick comments (2)
ami/ml/tests.py (2)
860-861: Stale class name in the test class docstring.

The docstring still references `TaskStateManager` but the class was renamed to `AsyncJobStateManager`.

```diff
 class TestTaskStateManager(TestCase):
-    """Test TaskStateManager for job progress tracking."""
+    """Test AsyncJobStateManager for job progress tracking."""
```

1018-1036: Direct access to `_failed_key` is fine for whitebox testing, but `cache.get` on Line 1028 could return `None` if the test setup changes.

Currently this is safe because the preceding `_commit_update` call guarantees the key exists. Just be aware this is a fragile coupling — if `_commit_update`'s caching behavior changes, `len(failed_set)` on Line 1029 would raise `TypeError`.
Summary
This pull request introduces progress tracking of various counts for async image processing jobs. It brings the async path to parity with the sync path. This includes setting the JobState according to the ratio of failures.
* Extended TaskStateManager and TaskProgress, including new Redis keys and logic for updating and cleaning up these metrics.
* Updated process_nats_pipeline_result in ami/jobs/tasks.py to calculate and pass failed image IDs and detection/classification/capture counts to TaskStateManager, and to reflect these in job progress updates.
* Updated _update_job_progress to support custom completion states (success/failure) based on the proportion of failed images, and to propagate new progress metrics to the job model.
* Expanded ami/ml/tests.py to validate the new progress fields, including detections, classifications, and failed image counts.

Screenshots
Success case: [screenshots]

Failure case: [screenshot]
(test artificially failed some images). Also Job state set to Failure.
Checklist
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Documentation