
PSv2: Use connection pooling and retries for NATS#1130

Open
carlosgjs wants to merge 20 commits into RolnickLab:main from uw-ssec:carlosg/natsconn

Conversation

carlosgjs (Collaborator) commented Feb 11, 2026

Summary

This pull request improves the reliability, efficiency, and error handling of NATS JetStream task queue management. The main change is the introduction of a process-local NATS connection pool, which replaces the previous pattern of creating and closing a connection for every operation. All NATS operations now use retry logic with exponential backoff. The context-manager pattern for TaskQueueManager is removed, and all methods now use the shared connection pool. Several methods are decorated to automatically retry on connection errors.
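For illustration, here is a minimal sketch of what such a retry decorator can look like. The decorator name and parameters match the ones used later in this thread; the specific exception types caught and the reset_connection import path are assumptions, not the project's actual implementation.

import asyncio
import functools
import logging

from nats import errors as nats_errors

# Assumed import path: the module-level reset helper described in the walkthrough below.
from ami.ml.orchestration.nats_connection import reset_connection

logger = logging.getLogger(__name__)


def retry_on_connection_error(max_retries: int = 2, backoff_seconds: float = 0.5):
    """Retry an async NATS operation on connection-level errors, with exponential backoff."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except (nats_errors.ConnectionClosedError, nats_errors.NoServersError) as exc:
                    if attempt == max_retries:
                        raise
                    # Drop the pooled connection so the next attempt reconnects cleanly.
                    await reset_connection()
                    delay = backoff_seconds * (2**attempt)
                    logger.warning("NATS operation failed (%s); retrying in %.1fs", exc, delay)
                    await asyncio.sleep(delay)

        return wrapper

    return decorator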

Testing

Tested locally with multiple runs of 100 images, verifying that tasks are acknowledged and NATS resources are cleaned up.

Checklist

  • [x] I have tested these changes appropriately.
  • [x] I have added and/or modified relevant tests.
  • [x] I have updated relevant documentation or comments.
  • [x] I have verified that this PR follows the project's coding standards.
  • [x] Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Added automatic retry with backoff for transient NATS connection errors.
  • Refactor

    • Moved to a process-local, event-loop-bound pooled NATS connection model and lifecycle-managed access for queue operations.
    • Switched callers from context-manager usage to direct manager instantiation.
  • Bug Fixes

    • Improved handling of missing stream/consumer and timeout/no-message scenarios; best-effort cleanup with per-operation retries.
  • Tests

    • Expanded tests for connection pool lifecycle, retry behavior, ack/cleanup flows, and error handling.
  • Documentation

    • Updated test/run command docs to use CI docker-compose file.

netlify bot commented Feb 11, 2026

Deploy Preview for antenna-ssec canceled.

🔨 Latest commit: fa0f84b
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-ssec/deploys/698fd6a5657d8a0008bd2d70

netlify bot commented Feb 11, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: fa0f84b
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/698fd6a591c8500008db7032

coderabbitai bot (Contributor) commented Feb 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Introduces a per-event-loop NATS ConnectionPool and retry decorator, refactors TaskQueueManager to use pool-provided (nc, js) per operation, and updates publish/reserve/ack/delete/cleanup flows and tests to use lifecycle-managed connections with retry/backoff.

Changes

  • Connection pool & API (ami/ml/orchestration/nats_connection.py): Adds event-loop-scoped ConnectionPool and ContextManagerConnection, plus module-level get_connection() and reset_connection() to provide/reset (nc, js) with lazy locking and reconnection logic (see the sketch after this table).
  • Task queue, retries & pool integration (ami/ml/orchestration/nats_queue.py): Adds a retry_on_connection_error decorator; refactors TaskQueueManager to use _get_connection() (pool-backed) for publish/reserve/ack/delete/cleanup; handles NotFoundError/TimeoutError with per-operation retries and backoff.
  • NATS tests (ami/ml/orchestration/tests/test_nats_queue.py, ami/ml/orchestration/tests/test_nats_connection.py): Reworks NATS mocks to a context-managed pool setup; adds tests for retry/reset behavior, the full ConnectionPool lifecycle (create, reuse, reconnect, close, reset), and the module-level functions.
  • Cleanup tests update (ami/ml/orchestration/tests/test_cleanup.py): Adjusts checks to obtain the JetStream context via TaskQueueManager._get_connection() and use the returned js for stream/consumer verification.
  • Call-site lifecycle changes (ami/jobs/tasks.py, ami/jobs/views.py, ami/ml/orchestration/jobs.py): Replaces several async with TaskQueueManager() usages with plain TaskQueueManager() instantiation and direct method calls, shifting resource management away from context-manager teardown.
  • Docs / CI command tweak (.agents/AGENTS.md): Updates CI/test docker-compose invocation examples to use docker-compose.ci.yml.
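To make the pool design above concrete, here is a simplified sketch (not the actual module): one lazily created NATS connection and JetStream context per event loop, keyed in a WeakKeyDictionary as discussed later in this thread. The URL handling (the real code reads settings.NATS_URL), reset/close paths, and exact attribute names are simplified or assumed.

from __future__ import annotations

import asyncio
import weakref

import nats
from nats.aio.client import Client
from nats.js import JetStreamContext


class ConnectionPool:
    """Lazily create and reuse one NATS connection + JetStream context per event loop."""

    def __init__(self, url: str) -> None:
        self._url = url
        self._nc: Client | None = None
        self._js: JetStreamContext | None = None
        self._lock: asyncio.Lock | None = None

    async def get_connection(self) -> tuple[Client, JetStreamContext]:
        # Fast path: reuse a healthy, open connection.
        if self._nc is not None and self._js is not None and not self._nc.is_closed and self._nc.is_connected:
            return self._nc, self._js
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            # Double-check after acquiring the lock (another coroutine may have reconnected).
            if self._nc is None or self._nc.is_closed or not self._nc.is_connected:
                self._nc = await nats.connect(self._url)
                self._js = self._nc.jetstream()
            return self._nc, self._js  # type: ignore[return-value]


# One pool per event loop; an entry disappears when its loop is garbage-collected.
_pools: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()


async def get_connection(url: str = "nats://localhost:4222") -> tuple[Client, JetStreamContext]:
    """Module-level accessor: return (nc, js) from the pool bound to the running event loop."""
    loop = asyncio.get_running_loop()
    pool = _pools.get(loop)
    if pool is None:
        pool = ConnectionPool(url)
        _pools[loop] = pool
    return await pool.get_connection()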

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant TQM as TaskQueueManager
    participant Retry as RetryDecorator
    participant Pool as ConnectionPool
    participant NATS as NATS Client
    participant JS as JetStream

    App->>TQM: publish_task(job_id, task)
    activate TQM
    TQM->>Retry: wrapped call
    activate Retry
    Retry->>TQM: attempt
    TQM->>Pool: _get_connection()
    activate Pool
    Pool->>NATS: ensure/connect (nc)
    Pool->>JS: provide JetStreamContext (js)
    Pool-->>TQM: (nc, js)
    deactivate Pool
    TQM->>JS: publish(stream, data)
    JS-->>TQM: ack / error
    alt Connection error
        TQM-->>Retry: raise connection error
        Retry->>Pool: reset_connection()
        Retry->>Retry: backoff wait
        Retry->>TQM: retry attempt (up to max)
    end
    Retry-->>App: result
    deactivate Retry
    deactivate TQM

    App->>TQM: reserve_task(job_id, timeout)
    activate TQM
    TQM->>Retry: wrapped call
    activate Retry
    Retry->>TQM: attempt
    TQM->>Pool: _get_connection()
    Pool-->>TQM: (nc, js)
    TQM->>JS: pull_subscribe + fetch(timeout)
    alt Message received
        JS-->>TQM: message -> PipelineProcessingTask
        Retry-->>App: task
    else Timeout
        JS-->>TQM: TimeoutError
        Retry-->>App: None
    end
    deactivate Retry
    deactivate TQM

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

backend, ml

Poem

🐇 I hopped into the pool today,

Loops kept snug where connections stay,
Retries nudge with gentle beat,
Backoff twitches, then we meet,
Streams and ACKs — a tidy play.

🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
  • Title check: ✅ Passed. The title clearly and specifically describes the main change: introducing connection pooling and retries for NATS operations, which is the primary focus of this PR.
  • Description check: ✅ Passed. The description covers the summary, main changes, testing performed, and checklist; however, it lacks a 'Related Issues' section, a 'Detailed Description' section with potential side effects/risks, explicit 'How to Test' instructions, and a 'Deployment Notes' section.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 82.81%, which is above the required threshold of 80.00%.
  • Merge Conflict Detection: ✅ Passed. No merge conflicts detected when merging into main.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@carlosgjs carlosgjs requested a review from mihow February 11, 2026 18:13
@carlosgjs carlosgjs marked this pull request as ready for review February 11, 2026 18:21
Copilot AI review requested due to automatic review settings February 11, 2026 18:21
Copilot AI (Contributor) left a comment

Pull request overview

This PR refactors NATS JetStream task-queue interactions to reuse a shared (process-local) connection and to add retry/backoff behavior for improved reliability and reduced connection churn.

Changes:

  • Introduces a new ConnectionPool module to lazily create and reuse a NATS + JetStream connection.
  • Updates TaskQueueManager to always obtain connections from the pool, removes async context-manager lifecycle, and adds a retry decorator for connection-related failures.
  • Updates call sites and unit tests to match the new non-context-manager usage.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.

  • ami/ml/orchestration/tests/test_nats_queue.py: Refactors tests to mock the new connection pool and remove context-manager assumptions.
  • ami/ml/orchestration/nats_queue.py: Adds the retry decorator, removes per-operation connection creation, and routes all operations through the shared pool.
  • ami/ml/orchestration/nats_connection_pool.py: New module implementing a process-local cached NATS connection + JetStream context.
  • ami/ml/orchestration/jobs.py: Removes async with TaskQueueManager() usage in orchestration job helpers.
  • ami/jobs/views.py: Removes async with TaskQueueManager() usage in the tasks endpoint.
  • ami/jobs/tasks.py: Removes async with TaskQueueManager() usage when ACKing tasks from Celery.


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/nats_connection_pool.py`:
- Around line 104-112: get_pool() can race when multiple threads call it
concurrently; make initialization of the module-global _connection_pool
thread-safe by introducing a module-level threading.Lock (e.g.,
_connection_pool_lock) and use a double-checked locking pattern inside
get_pool(): check _connection_pool, acquire _connection_pool_lock, check again,
and only then instantiate ConnectionPool and assign to _connection_pool; ensure
you import threading and keep the global _connection_pool and lock declarations
at module scope so get_pool() uses them.
- Around line 88-97: The reset() method currently nulls out self._nc and
self._js and leaks an active NATS connection; modify reset (or add an async
reset_async) to attempt to close/drain the existing connection before clearing
references: if self._nc exists, call its close/drain routine (await if reset
becomes async, or schedule with asyncio.create_task(self._nc.close()) if keeping
reset synchronous) wrapped in try/except to swallow errors, then set self._nc =
None and self._js = None; update any callers (including the retry decorator) to
call the async version or rely on the scheduled background close so the old TCP
socket is not leaked.

In `@ami/ml/orchestration/nats_queue.py`:
- Line 318: The log message contains a missing space between "job" and the job
id; update the logger.info call that logs stream deletion (the line using
logger.info with f"Deleted stream {stream_name} for job'{job_id}'") to insert a
space so it reads f"Deleted stream {stream_name} for job '{job_id}'", leaving
the surrounding code unchanged.
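For reference, a minimal sketch of the double-checked locking pattern the first item above suggests for get_pool(). Names follow the review comment; ConnectionPool is assumed to be the class defined earlier in the same module, and its constructor arguments are omitted.

import threading

_connection_pool: "ConnectionPool | None" = None
_connection_pool_lock = threading.Lock()


def get_pool() -> "ConnectionPool":
    """Return the module-global ConnectionPool, creating it at most once even across threads."""
    global _connection_pool
    if _connection_pool is None:  # fast path: no lock once initialized
        with _connection_pool_lock:  # slow path: serialize first-time initialization
            if _connection_pool is None:  # re-check after acquiring the lock
                _connection_pool = ConnectionPool()
    return _connection_pool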
🧹 Nitpick comments (6)
ami/ml/orchestration/jobs.py (1)

107-109: Use logger.exception to preserve the stack trace.

When a publish fails, the traceback is valuable for diagnosing whether it's a connection issue that exhausted retries, a serialization bug, etc.

Proposed fix
             except Exception as e:
-                logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
+                logger.exception(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
                 success = False
ami/ml/orchestration/tests/test_nats_queue.py (2)

88-100: Consider adding a test for the TimeoutError path in reserve_task.

This test mocks fetch to return [], which exercises the if msgs: falsy branch. However, in practice, NATS's fetch() raises nats_errors.TimeoutError when no messages are available (handled at Line 243 of nats_queue.py). Consider adding a test case where fetch raises TimeoutError to cover that code path as well.

Additional test case
async def test_reserve_task_timeout(self):
    """Test reserve_task when fetch raises TimeoutError (no messages)."""
    from nats import errors as nats_errors

    with self._mock_nats_setup() as (_, js, _):
        mock_psub = MagicMock()
        mock_psub.fetch = AsyncMock(side_effect=nats_errors.TimeoutError)
        mock_psub.unsubscribe = AsyncMock()
        js.pull_subscribe = AsyncMock(return_value=mock_psub)

        manager = TaskQueueManager()
        task = await manager.reserve_task(123)

        self.assertIsNone(task)
        mock_psub.unsubscribe.assert_called_once()

49-131: No tests for the retry/backoff behavior.

The retry_on_connection_error decorator is a core part of this PR. Consider adding at least one test that verifies a retried operation succeeds on a subsequent attempt after a connection error, and one that verifies the error is raised after exhausting retries. This would validate the most important new behavior introduced.

ami/ml/orchestration/nats_connection_pool.py (1)

50-58: Fast-path check outside the lock is fine for asyncio but worth a comment.

The health check at line 51 and stale-connection cleanup at lines 55-58 happen outside the lock, relying on asyncio's cooperative scheduling (no preemption between awaits). This is correct but non-obvious. A brief inline comment would help future readers.

Suggested comment
         # Fast path: connection exists, is open, and is connected
+        # Safe without lock: no await/yield between check and return (cooperative scheduling)
         if self._nc is not None and not self._nc.is_closed and self._nc.is_connected:
             return self._nc, self._js  # type: ignore
ami/ml/orchestration/nats_queue.py (2)

169-197: Redundant _get_connection() call — js fetched at line 182 is also re-fetched inside _ensure_stream and _ensure_consumer.

publish_task fetches js at line 182, but _ensure_stream (line 124) and _ensure_consumer (line 144) each fetch their own js from the pool internally. The js from line 182 is only used at line 194. This works because the pool returns the same object, but the pattern is slightly misleading — it looks like the connection from line 182 is used throughout when it's not.

Consider either (a) passing js into _ensure_stream/_ensure_consumer, or (b) moving the _get_connection() call to after the ensure calls since that's where it's first needed. Same applies to reserve_task.


199-249: reserve_task: implicit None return when msgs is empty.

If psub.fetch() returns an empty list (rather than raising TimeoutError), the function falls through the if msgs: block and except clause, runs finally, and returns None implicitly. This is technically correct per the return type, but an explicit return None after the if msgs: block would make the intent clearer.

Suggested improvement
             if msgs:
                 msg = msgs[0]
                 task_data = json.loads(msg.data.decode())
                 metadata = msg.metadata
 
                 # Parse the task data into PipelineProcessingTask
                 task = PipelineProcessingTask(**task_data)
                 # Set the reply_subject for acknowledgment
                 task.reply_subject = msg.reply
 
                 logger.debug(f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}")
                 return task
+            return None
 
         except nats_errors.TimeoutError:

carlosgjs and others added 2 commits February 11, 2026 11:58
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
mihow (Collaborator) commented Feb 12, 2026

Hi Carlos, here is a Claude-written comment after I spent some time with it reviewing these changes. Ha.


The retry decorator is a great addition — the nats.py client doesn't buffer operations during reconnection (unlike the Go client), so application-level retry is genuinely needed.

I think we can simplify the connection pooling, it may be unnecessary for our scale after all.

We run ~3 Celery workers on separate VMs. Since async_to_sync() is blocking, each worker only has one NATS connection alive at a time even with the old open/close pattern. The 2000 TCP connections concern from my original feedback was about connect/disconnect cycles, not concurrent connections. The overhead is real but modest — TCP handshake + TLS (if applicable) per operation.

More importantly, the event-loop-keyed WeakKeyDictionary pool adds complexity that's hard to reason about:

  • async_to_sync() (from asgiref/Django) creates ephemeral event loops per call, so the pool entries may not actually be reused between calls — defeating the purpose
  • reset() clears self._lock = None which can race with concurrent _ensure_lock() calls on the same pool instance
  • The pool fights NATS's built-in reconnection: get_connection() discards a client in RECONNECTING state and creates a new one, rather than letting the client finish reconnecting on its own

Suggested simplification

Keep the retry decorator (real value) and drop the pool. The connection scoping is already natural:

  1. queue_images_to_nats — already wraps the entire publish loop in one async_to_sync() call, so one connection covers all publishes. No change needed.
  2. _ack_task_via_nats — one connection per ACK is fine at our scale. The retry decorator handles transient failures.
  3. reserve_task (views.py) — one connection per batch fetch, already scoped correctly.

Concretely, this would mean:

  • Delete nats_connection_pool.py
  • Keep retry_on_connection_error decorator in nats_queue.py
  • Restore the async with TaskQueueManager() context manager (or just create+close within each method), but keep the retry wrapping
  • Optionally: pass reconnected_cb / disconnected_cb to nats.connect() for logging, since the defaults (allow_reconnect=True, 60 attempts, 2s apart) already handle socket-level reconnection

This keeps the retry logic (the real fix) while avoiding the event-loop-keyed pool complexity. What do you think?
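For the last bullet above, a minimal sketch of wiring logging callbacks into nats.connect(). The callback keyword arguments and the reconnect defaults shown are from the nats-py client; the wrapper function and URL parameter are illustrative.

import logging

import nats
from nats.aio.client import Client

logger = logging.getLogger(__name__)


async def _on_disconnected() -> None:
    logger.warning("NATS connection lost; client will attempt to reconnect")


async def _on_reconnected() -> None:
    logger.info("NATS connection re-established")


async def connect_with_logging(url: str) -> Client:
    # allow_reconnect, max_reconnect_attempts and reconnect_time_wait are already the
    # nats-py defaults; passing them explicitly just documents the behavior, while the
    # callbacks surface socket-level reconnection events in the application logs.
    return await nats.connect(
        url,
        allow_reconnect=True,
        max_reconnect_attempts=60,
        reconnect_time_wait=2,
        disconnected_cb=_on_disconnected,
        reconnected_cb=_on_reconnected,
    )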

carlosgjs and others added 3 commits February 12, 2026 16:16
…rop pool

Replace the event-loop-keyed WeakKeyDictionary connection pool with a
straightforward async context manager on TaskQueueManager. Each
async_to_sync() call now scopes one connection for its block of
operations (e.g. queue_all_images reuses one connection for all
publishes, _ack_task_via_nats gets one for the single ACK).

The retry decorator is preserved — on connection error it closes the
stale connection so the next _get_connection() creates a fresh one.
Also adds reconnected_cb/disconnected_cb logging callbacks to
nats.connect() and narrows bare except clauses to NotFoundError.

Co-Authored-By: Claude <noreply@anthropic.com>
await manager.publish_task(job_id, task)

The connection is created on entry and closed on exit. Within the block, the retry
decorator handles transient connection errors by clearing and recreating the connection.
carlosgjs (Collaborator, Author) commented Feb 13, 2026

@mihow - The downside of this approach is that there will still be a large number of connections opened and closed, which was one of the original problems we were trying to solve. The TaskQueueManager is used in 3 scenarios (assume a 1000-image job):

  • Queuing up all tasks for a job: here we do OK, since we can queue all 1000 images in one async context, so 1 connection is used.
  • Fetching /tasks: the workers will call this at least 1000/batch-size times, so we'll likely see 250-500 connections.
  • Acknowledging results: each result is saved/acked in a separate Celery task, so this will be 1000 connections.

Collaborator

Thanks for breaking that down @carlosgjs. I am fine to go with your implementation, but I got a little nervous. It seems like maybe you were fighting Django's async_to_sync() stuff, and I was just thinking about the first scenario (Queuing up all tasks for a job).

Collaborator

@carlosgjs I reverted back to your implementation, but I got some help to add more comments & docstrings so I can follow what's going on.

Collaborator Author

I just read your/Claude's comment. Makes sense; I agree it's fine to keep the simplified approach and only optimize the connections if needed later.

mihow added a commit to uw-ssec/antenna that referenced this pull request Feb 13, 2026
Mark connection handling as done (PR RolnickLab#1130), add worktree/remote
mapping and docker testing notes for future sessions.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow and others added 3 commits February 13, 2026 12:02
… churn

Reverts c384199 which replaced the event-loop-keyed connection pool with
a plain async context manager. The context manager approach opened and
closed a TCP connection per async block, causing ~1500 connections per
1000-image job (250-500 for task fetches, 1000 for ACKs).

The connection pool keeps one persistent connection per event loop and
reuses it across all TaskQueueManager operations.

Co-Authored-By: Claude <noreply@anthropic.com>
Extract connection pool to a pluggable design with two strategies:
- "pool" (default): persistent connection reuse per event loop
- "per_operation": fresh TCP connection each time, for debugging

Controlled by NATS_CONNECTION_STRATEGY Django setting. Both strategies
implement the same interface (get_connection, reset, close) so
TaskQueueManager is agnostic to which one is active.

Changes:
- Rename nats_connection_pool.py to nats_connection.py
- Rename get_pool() to get_provider()
- Use settings.NATS_URL directly instead of getattr with divergent defaults
- Narrow except clauses in _ensure_stream/_ensure_consumer to NotFoundError
- Add _js guard to fast path, add strategy logging
- Enhanced module and class docstrings

Co-Authored-By: Claude <noreply@anthropic.com>
Remove the switchable strategy pattern (Protocol + factory + Django setting)
and expose the connection pool directly via module-level get_connection() and
reset_connection() functions. The PerOperationConnection is archived as
ContextManagerConnection for reference. Remove NATS_CONNECTION_STRATEGY setting.

Co-Authored-By: Claude <noreply@anthropic.com>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/nats_connection.py`:
- Around line 186-192: The except block that catches RuntimeError from
asyncio.get_running_loop() should preserve the original exception chain: capture
the caught exception (e.g., "except RuntimeError as err:") and re-raise the
RuntimeError with your existing message using "raise ... from err" so the
original traceback is linked; update the block inside get_connection()/where
get_running_loop() is called to use "except RuntimeError as err" and "raise
RuntimeError(... ) from err".
- Around line 1-27: The pool keyed by event loop (ConnectionPool /
WeakKeyDictionary) doesn't guarantee reuse when callers use async_to_sync(), so
either confirm your runtime actually reuses the same event loop across
async_to_sync() or switch to a per-call connection strategy: remove/stop using
the loop-keyed pool lookup in ConnectionPool (and associated WeakKeyDictionary),
instead create and close a fresh nats.Client per operation (or default to
ContextManagerConnection) and rely on the existing retry_on_connection_error /
reset_connection logic for resilience; update any helper functions that call
ConnectionPool.create/get to use the per-call connect/close flow and keep
ContextManagerConnection as the simple fallback.

In `@ami/ml/orchestration/nats_queue.py`:
- Around line 253-276: The finally block currently awaits psub.unsubscribe()
which can raise and mask earlier exceptions and also makes the empty-msgs path
implicit; change the flow so that after calling psub.fetch(1, timeout=timeout)
you explicitly return None if msgs is falsy, and move the unsubscribe call into
its own try/except that catches and logs/unwarns errors without suppressing the
original exception (i.e., if an exception is active, call psub.unsubscribe()
inside a nested try/except and re-raise the original exception; if no exception,
safely await unsubscribe and log any unsubscribe errors). Update the block that
constructs PipelineProcessingTask and the handling around
psub.fetch/psub.unsubscribe accordingly so psub.unsubscribe failures do not hide
earlier exceptions.
- Line 217: Replace the deprecated Pydantic v1 call: in the line that builds
task_data (task_data = json.dumps(data.dict())), call data.model_dump() instead
so it uses Pydantic v2; update the expression that creates task_data in
nats_queue.py (the variable task_data and the object data) to use model_dump()
to avoid deprecation warnings and preserve the same serialized structure.
🧹 Nitpick comments (4)
ami/ml/orchestration/nats_connection.py (3)

59-68: Race between reset() and _ensure_lock() when self._lock is set to None.

reset() (line 137) sets self._lock = None. If a concurrent coroutine has already passed the self._lock is None check at line 66 but hasn't yet assigned the new lock, reset() could clear the newly-created lock. In practice, within a single-threaded asyncio event loop, cooperative scheduling means there's no true interleaving between lines 66–67. However, _ensure_lock should still be tightened to avoid fragility if reset() ever runs concurrently (e.g., from a callback):

Suggested improvement
     async def reset(self):
         ...
         self._nc = None
         self._js = None
-        self._lock = None  # Clear lock so new one is created for fresh connection
+        self._lock = None  # Will be lazily recreated on next get_connection()

Consider documenting that reset() must only be called from the same event loop that owns this pool (which is already implied by the design).


80-107: Fast-path state mutation outside the lock may confuse future maintainers.

Lines 86–89 set self._nc = None and self._js = None before acquiring the lock. While safe in a cooperative single-threaded asyncio context (the double-check at line 95 handles the reconnection), this pattern is unusual for a lock-protected resource and could lead to bugs if the code is ever adapted for truly concurrent access. Consider moving the clearing inside the lock:

Suggested refactor
-        # Connection is stale or doesn't exist — clear references before reconnecting
-        if self._nc is not None:
-            logger.warning("NATS connection is closed or disconnected, will reconnect")
-            self._nc = None
-            self._js = None
-
         # Slow path: acquire lock to prevent concurrent reconnection attempts
         lock = self._ensure_lock()
         async with lock:
             # Double-check after acquiring lock (another coroutine may have reconnected)
             if self._nc is not None and self._js is not None and not self._nc.is_closed and self._nc.is_connected:
                 return self._nc, self._js
 
+            # Connection is stale or doesn't exist — clear references before reconnecting
+            if self._nc is not None:
+                logger.warning("NATS connection is closed or disconnected, will reconnect")
+            self._nc = None
+            self._js = None
+
             nats_url = settings.NATS_URL

140-175: ContextManagerConnection is dead code — consider removing it.

This class is described as "archived" and "kept as a drop-in fallback," but it is never referenced anywhere in the codebase. Keeping dead code increases the maintenance surface. If it's valuable as documentation, a comment or ADR would serve better.

#!/bin/bash
# Verify ContextManagerConnection is not used anywhere
rg -n "ContextManagerConnection" --type=py
ami/ml/orchestration/nats_queue.py (1)

196-224: _ensure_stream and _ensure_consumer are called on every publish_task invocation.

For a 1000-image job, this results in ~2000 extra stream_info/consumer_info round trips to NATS, even though the stream and consumer only need to be created once. Consider caching the "ensured" status per job_id for the lifetime of the event loop (e.g., a set on the manager or pool), or moving the ensure calls to a separate setup method invoked once before the publish loop.

Sketch
+    _ensured_jobs: set[int] = set()  # class-level or instance-level cache
+
+    async def _ensure_resources(self, job_id: int):
+        if job_id in self._ensured_jobs:
+            return
+        await self._ensure_stream(job_id)
+        await self._ensure_consumer(job_id)
+        self._ensured_jobs.add(job_id)
+
     @retry_on_connection_error(max_retries=2, backoff_seconds=0.5)
     async def publish_task(self, job_id: int, data: PipelineProcessingTask) -> bool:
         _, js = await self._get_connection()
-        await self._ensure_stream(job_id)
-        await self._ensure_consumer(job_id)
+        await self._ensure_resources(job_id)
         ...

Comment on lines 186 to 192

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        raise RuntimeError(
            "get_connection() must be called from an async context with a running event loop. "
            "If calling from sync code, use async_to_sync() to wrap the async function."
        )
coderabbitai bot (Contributor) commented Feb 13, 2026

⚠️ Potential issue | 🟡 Minor

Use raise ... from err in the except clause (B904).

The static analysis hint is valid. This ensures the exception chain is preserved.

Fix
     try:
         loop = asyncio.get_running_loop()
     except RuntimeError:
-        raise RuntimeError(
+        raise RuntimeError(  
             "get_connection() must be called from an async context with a running event loop. "
             "If calling from sync code, use async_to_sync() to wrap the async function."
-        )
+        ) from None
📝 Committable suggestion

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        raise RuntimeError(
            "get_connection() must be called from an async context with a running event loop. "
            "If calling from sync code, use async_to_sync() to wrap the async function."
        ) from None
🧰 Tools
🪛 Ruff (0.15.0)

[warning] 189-192: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


[warning] 189-192: Avoid specifying long messages outside the exception class

(TRY003)


Collaborator

In this case, we are catching the exception and returning our own. Wouldn't the exception chain just add noise?

Contributor

@mihow You're absolutely right that the chain would add noise here! Using from None (as suggested) explicitly suppresses the exception chain, which is exactly what you want.

Here's why from None is the right choice:

  1. The original exception doesn't add value — asyncio.get_running_loop() just raises "no running event loop", which your custom message already explains more clearly with actionable advice.

  2. Signals intent — from None tells readers "I'm intentionally replacing this exception, not accidentally swallowing context."

  3. Satisfies the linter — B904 requires explicit chaining syntax (from err or from None) to distinguish intentional replacement from accidental context loss.

Without from None, the linter can't tell if you deliberately chose not to chain or simply forgot. The suggested fix makes your intent explicit while keeping the output clean.


✏️ Learnings added
Learnt from: mihow
Repo: RolnickLab/antenna PR: 1130
File: ami/ml/orchestration/nats_connection.py:186-192
Timestamp: 2026-02-13T23:12:21.720Z
Learning: In Python exception handling, use `raise ... from None` when intentionally replacing an exception with a more informative one where the original exception would add noise. Use `raise ... from err` when the original exception provides valuable debugging context. The B904 linting rule requires explicit chaining syntax to distinguish intentional replacement from accidental context loss.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +253 to +276

        try:
            # Fetch a single message
            msgs = await psub.fetch(1, timeout=timeout)

            if msgs:
                msg = msgs[0]
                task_data = json.loads(msg.data.decode())
                metadata = msg.metadata

                # Parse the task data into PipelineProcessingTask
                task = PipelineProcessingTask(**task_data)
                # Set the reply_subject for acknowledgment
                task.reply_subject = msg.reply

                logger.debug(f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}")
                return task

        except nats_errors.TimeoutError:
            # No messages available (expected behavior)
            logger.debug(f"No tasks available in stream for job '{job_id}'")
            return None
        finally:
            # Always unsubscribe
            await psub.unsubscribe()
coderabbitai bot (Contributor) commented Feb 13, 2026

⚠️ Potential issue | 🟡 Minor

psub.unsubscribe() failure in finally can mask the original exception.

If unsubscribe() raises (e.g., on a broken connection), it will mask whatever error was being propagated. Also note: when fetch returns an empty list (no messages and no timeout), the function falls through the if msgs: block and try without returning, reaching finally and then implicitly returning None. This works but is subtle.

Suggested improvement
         finally:
             # Always unsubscribe
-            await psub.unsubscribe()
+            try:
+                await psub.unsubscribe()
+            except Exception:
+                logger.debug("Failed to unsubscribe pull subscription", exc_info=True)
📝 Committable suggestion

        try:
            # Fetch a single message
            msgs = await psub.fetch(1, timeout=timeout)

            if msgs:
                msg = msgs[0]
                task_data = json.loads(msg.data.decode())
                metadata = msg.metadata

                # Parse the task data into PipelineProcessingTask
                task = PipelineProcessingTask(**task_data)
                # Set the reply_subject for acknowledgment
                task.reply_subject = msg.reply

                logger.debug(f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}")
                return task

        except nats_errors.TimeoutError:
            # No messages available (expected behavior)
            logger.debug(f"No tasks available in stream for job '{job_id}'")
            return None
        finally:
            # Always unsubscribe
            try:
                await psub.unsubscribe()
            except Exception:
                logger.debug("Failed to unsubscribe pull subscription", exc_info=True)

Collaborator

The @retry_on_connection_error decorator wrapping this method already catches connection-level errors (the likely failure mode for unsubscribe()) and handles them with reset + backoff. If we suppress unsubscribe errors here, we'd actually hide a signal that the retry decorator needs to trigger reconnection. The current finally pattern lets cleanup errors propagate to the right layer.

Contributor

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

mihow and others added 3 commits February 13, 2026 15:13
The docstring previously implied the pool reused connections across all
async_to_sync() calls. In practice, each async_to_sync() creates a new
event loop, so reuse only happens within a single boundary. Updated to
be explicit about where the pool helps (bulk publishes, batch reserves)
and where it doesn't (single-operation calls like ACKs).

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Claude <noreply@anthropic.com>
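To illustrate the point in the commit message above: when called from plain sync code, each async_to_sync() boundary typically runs on its own short-lived event loop, so a pool keyed by event loop is only reused within a single boundary. A small self-contained check of that behavior (assumes asgiref is installed, as it is with Django):

import asyncio

from asgiref.sync import async_to_sync


async def current_loop() -> asyncio.AbstractEventLoop:
    return asyncio.get_running_loop()


# Two separate sync-to-async boundaries: each one typically gets its own event loop,
# so a WeakKeyDictionary keyed by loop would see two distinct keys here.
loop_a = async_to_sync(current_loop)()
loop_b = async_to_sync(current_loop)()
print(loop_a is loop_b)  # usually False when there is no outer async context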
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@ami/ml/orchestration/nats_connection.py`:
- Around line 122-142: The reset() method currently sets self._lock = None which
allows another coroutine to create a new lock while reset is awaiting
self._nc.close(), enabling two coroutines to enter the critical "slow path" in
get_connection(); fix this by removing the line that clears the lock and only
reset connection state (keep self._lock intact), i.e., in reset() clear/close
self._nc and self._js but do not assign self._lock = None so _ensure_lock()
continues to use the same event-loop-bound lock instance.
🧹 Nitpick comments (1)
ami/ml/orchestration/nats_connection.py (1)

85-112: Add is_reconnecting check to prevent discarding clients during automatic reconnection.

The fast-path check on line 87 rejects clients in the RECONNECTING state (is_connected=False), causing them to be cleared and forcibly reconnected instead of allowing nats.py's automatic reconnection logic to complete. This creates redundant TCP connections during temporary outages.

The suggested fix is correct and prevents this regression:

Suggested change
-        if self._nc is not None and self._js is not None and not self._nc.is_closed and self._nc.is_connected:
+        if self._nc is not None and self._js is not None and not self._nc.is_closed and (self._nc.is_connected or self._nc.is_reconnecting):
             return self._nc, self._js

Note: The same pattern exists in the double-check at line 100 and should also be updated for consistency.

Comment on lines +122 to +142
    async def reset(self):
        """
        Close the current connection and clear all state so the next call to
        get_connection() creates a fresh one.

        Called by retry_on_connection_error when an operation hits a connection
        error (e.g. network blip, NATS restart). The lock is also cleared so it
        gets recreated bound to the current event loop.
        """
        logger.warning("Resetting NATS connection pool due to connection error")
        if self._nc is not None:
            try:
                if not self._nc.is_closed:
                    await self._nc.close()
                    logger.debug("Successfully closed existing NATS connection during reset")
            except Exception as e:
                # Swallow errors - connection may already be broken
                logger.debug(f"Error closing connection during reset (expected): {e}")
        self._nc = None
        self._js = None
        self._lock = None  # Clear lock so new one is created for fresh connection
Contributor

⚠️ Potential issue | 🟠 Major

self._lock = None on line 142 can cause concurrent coroutines to bypass mutual exclusion.

When reset() awaits self._nc.close() (line 135), it yields control. If another coroutine calls get_connection() during that yield, _ensure_lock() sees self._lock is None (set on line 142 — or even before, if ordering shifts) and creates a new lock. The original lock held by the first coroutine is now a different object, so two coroutines can enter the "slow path" critical section simultaneously and race to create connections.

Instead of clearing the lock, keep it and only reset the connection state:

Suggested fix
     async def reset(self):
         logger.warning("Resetting NATS connection pool due to connection error")
-        if self._nc is not None:
-            try:
-                if not self._nc.is_closed:
-                    await self._nc.close()
-                    logger.debug("Successfully closed existing NATS connection during reset")
-            except Exception as e:
-                # Swallow errors - connection may already be broken
-                logger.debug(f"Error closing connection during reset (expected): {e}")
-        self._nc = None
-        self._js = None
-        self._lock = None  # Clear lock so new one is created for fresh connection
+        lock = self._ensure_lock()
+        async with lock:
+            if self._nc is not None:
+                try:
+                    if not self._nc.is_closed:
+                        await self._nc.close()
+                        logger.debug("Successfully closed existing NATS connection during reset")
+                except Exception as e:
+                    logger.debug(f"Error closing connection during reset (expected): {e}")
+            self._nc = None
+            self._js = None

The lock is bound to the event loop at creation time. Since reset() runs on the same event loop as get_connection(), there's no cross-loop issue — the same lock instance remains valid throughout the pool's lifetime on that loop.

🧰 Tools
🪛 Ruff (0.15.0)

[warning] 137-137: Do not catch blind exception: Exception

(BLE001)


The _setup_mock_nats helper was configuring TaskQueueManager as an async
context manager (__aenter__/__aexit__), but _ack_task_via_nats uses plain
instantiation. The await on a non-awaitable MagicMock failed silently in
the except clause, causing acknowledge_task assertions to always fail.

Co-Authored-By: Claude <noreply@anthropic.com>