UN-3266 [FEAT] Async Executor Backend for Prompt Studio #1849

harini-venkataraman wants to merge 88 commits into main
Conversation
Conflicts resolved:
- docker-compose.yaml: Use main's dedicated dashboard_metric_events queue for worker-metrics
- PromptCard.jsx: Keep tool_id matching condition from our async socket feature
- PromptRun.jsx: Merge useEffect import from main with our branch
- ToolIde.jsx: Keep fire-and-forget socket approach (spinner waits for socket event)
- SocketMessages.js: Keep both session-store and socket-custom-tool imports + updateCusToolMessages dep
- SocketContext.js: Keep simpler path-based socket connection approach
- usePromptRun.js: Keep Celery fire-and-forget with socket delivery over polling
- setupProxy.js: Accept main's deletion (migrated to Vite)
Greptile Summary

This PR introduces a substantial architectural shift for the Prompt Studio IDE: replacing blocking HTTP-based tool execution with an async Celery + Socket.IO pipeline.

Key findings:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant FE as Frontend (PromptRun.jsx)
    participant DV as Django View (views.py)
    participant PSH as PromptStudioHelper
    participant RMQ as RabbitMQ
    participant EW as Executor Worker (execute_extraction)
    participant LX as LegacyExecutor
    participant CBW as Callback Worker (prompt_studio_callback)
    participant SIO as Socket.IO
    FE->>DV: POST /fetch_response/{tool_id}
    DV->>PSH: build_fetch_response_payload()
    Note over PSH: ORM loads, extraction, indexing (blocking)
    PSH-->>DV: (ExecutionContext, cb_kwargs)
    DV->>DV: pre-generate executor_task_id
    DV->>RMQ: send_task("execute_extraction", link=ide_prompt_complete, task_id=executor_task_id)
    DV-->>FE: HTTP 202 {task_id, run_id, status:"accepted"}
    RMQ->>EW: execute_extraction(context_dict)
    EW->>LX: ExecutionOrchestrator.execute(context)
    LX->>LX: _handle_answer_prompt() → RetrievalService → LLM
    LX-->>EW: ExecutionResult
    EW-->>RMQ: result.to_dict() [stores in PG result backend]
    RMQ->>CBW: ide_prompt_complete(result_dict, callback_kwargs)
    CBW->>CBW: OutputManagerHelper.handle_prompt_output_update()
    CBW->>SIO: emit "prompt_studio_result" to log_events_id room
    SIO-->>FE: prompt_studio_result event
    FE->>FE: usePromptStudioSocket.onResult()
    FE->>FE: updatePromptOutputState(data)
    FE->>FE: clearResultStatuses(data) [remove spinners]
```
Prompt To Fix All With AI

This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 484-491
Comment:
**`AttributeError` on `None` default profile in `_resolve_llm_ids`**
When `tool.monitor_llm` or `tool.challenge_llm` is `None` (not explicitly set), the code falls through to `ProfileManager.get_default_llm_profile(tool)`. If that also returns `None` (no default profile configured), the next line `str(dp.llm.id)` immediately raises `AttributeError: 'NoneType' object has no attribute 'llm'`. This surfaces as an opaque 500 error rather than the expected `DefaultProfileError`.
This can happen when a user creates a Prompt Studio tool, sets a per-prompt profile manager, but has never configured a default tool-level profile AND has not set explicit `monitor_llm`/`challenge_llm` adapters.
Note that `_resolve_llm_ids` is called in `build_fetch_response_payload` **before** the `if not profile_manager: raise DefaultProfileError()` guard (line 536), so a missing default profile causes an `AttributeError` that bypasses the intended error handling entirely.
```suggestion
if monitor_llm_instance:
    monitor_llm = str(monitor_llm_instance.id)
else:
    dp = ProfileManager.get_default_llm_profile(tool)
    if not dp:
        raise DefaultProfileError()
    monitor_llm = str(dp.llm.id)
if challenge_llm_instance:
    challenge_llm = str(challenge_llm_instance.id)
else:
    dp = ProfileManager.get_default_llm_profile(tool)
    if not dp:
        raise DefaultProfileError()
    challenge_llm = str(dp.llm.id)
```
How can I resolve this? If you propose a fix, please make it concise.
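The two fallback branches in the suggestion share the same guard, so the pattern can be factored into one helper. The sketch below is illustrative only: `resolve_llm_id`, the stub exception, and the callable parameters are assumptions, not the repository's actual API.

```python
class DefaultProfileError(Exception):
    """Stand-in for the backend's DefaultProfileError."""


def resolve_llm_id(explicit_instance, get_default_profile, tool):
    """Prefer the explicitly configured adapter; otherwise fall back to the
    default profile's LLM, raising DefaultProfileError (not AttributeError)
    when no default profile is configured."""
    if explicit_instance is not None:
        return str(explicit_instance.id)
    dp = get_default_profile(tool)
    if dp is None:
        raise DefaultProfileError()
    return str(dp.llm.id)
```

Both `monitor_llm` and `challenge_llm` would then resolve through the same guarded path, keeping the `None` check in exactly one place.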
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/tasks.py
Line: 74-94
Comment:
**`_emit_error` missing `_json_safe()` serialization**
`_emit_result` wraps its payload through `_json_safe()` to handle `uuid.UUID` and `datetime` objects before emitting via Socket.IO. `_emit_error` has no equivalent sanitization — it calls `_emit_websocket_event` directly with raw data.
In practice the current callers store only strings in `extra` (e.g. from `cb.get("document_id", "")`), so this is latent rather than immediately broken. However, future callers could pass ORM values or UUIDs into `extra`, causing `json.dumps` inside `_emit_websocket_event` to raise a `TypeError` and leave the frontend without any error feedback at all.
```suggestion
if extra:
    data.update(extra)
_emit_websocket_event(
    room=log_events_id,
    event=PROMPT_STUDIO_RESULT_EVENT,
    data=_json_safe(data),
)
```
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: "Merge branch 'main' ..."
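For reference, a recursive sanitizer of the kind `_json_safe` is described as can be sketched as follows. This is a hedged approximation, not the actual implementation in `tasks.py`:

```python
import datetime
import uuid


def json_safe(value):
    """Recursively convert values json.dumps cannot handle (UUIDs, datetimes)
    into strings, so emitting the payload never raises TypeError."""
    if isinstance(value, dict):
        return {str(k): json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [json_safe(v) for v in value]
    if isinstance(value, (uuid.UUID, datetime.datetime, datetime.date)):
        return str(value)
    return value
```

Routing `_emit_error`'s payload through such a helper makes it symmetric with `_emit_result`.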
backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Greptile Summary

This PR introduces a pluggable async executor framework that replaces blocking Django HTTP calls to the prompt-service with a fire-and-forget Celery dispatch chain (…).

Key issues found during review:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant FE as Frontend (PromptRun.jsx)
    participant DJV as Django View (views.py)
    participant PSH as PromptStudioHelper
    participant DISP as ExecutionDispatcher
    participant RMQ as RabbitMQ
    participant EW as Executor Worker (execute_extraction)
    participant LE as LegacyExecutor
    participant CBW as Callback Worker (prompt_studio_callback)
    participant SIO as Socket.IO
    participant FEH as Frontend (usePromptStudioSocket)
    FE->>DJV: POST /fetch_response/{tool_id}
    DJV->>PSH: build_fetch_response_payload() [ORM + extract + index - blocking]
    PSH-->>DJV: (ExecutionContext, cb_kwargs)
    DJV->>DISP: dispatch_with_callback(context, on_success=ide_prompt_complete, on_error=ide_prompt_error)
    DISP->>RMQ: send_task("execute_extraction", queue="celery_executor_legacy")
    DJV-->>FE: HTTP 202 {task_id, run_id, status:"accepted"}
    RMQ->>EW: execute_extraction(context_dict)
    EW->>LE: LegacyExecutor.execute(context)
    LE-->>EW: ExecutionResult
    EW-->>RMQ: result.to_dict() [Celery link callback triggered]
    RMQ->>CBW: ide_prompt_complete(result_dict, callback_kwargs)
    CBW->>CBW: OutputManagerHelper.handle_prompt_output_update() [ORM write]
    CBW->>SIO: _emit_websocket_event(room=log_events_id, event="prompt_studio_result")
    SIO-->>FEH: "prompt_studio_result" {status:"completed", operation, result}
    FEH->>FEH: handleCompleted() → updatePromptOutputState() + clearResultStatuses()
```
Prompt To Fix All With AI

This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 364-595
Comment:
**Missing feature flag gate on async endpoints**
The PR description states that all three IDE endpoints (`index_document`, `fetch_response`, `single_pass_extraction`) are gated behind the `async_prompt_execution` Flipt feature flag, with the old synchronous path preserved as a fallback when the flag is `OFF`. However, none of the three view methods contain any feature flag check — they unconditionally invoke the async/Celery path and return HTTP 202.
This means the breaking architectural change (fire-and-forget + Socket.IO result delivery) is deployed for **all users** regardless of the feature flag, directly contradicting the safe-rollout strategy described in the PR. When `async_prompt_execution=false`, users would still receive HTTP 202 with no result, because the old synchronous code path is never reached.
The sync fallback (e.g. delegating to the old `run_index_document` / `run_fetch_response` / `run_single_pass_extraction` Celery tasks or the direct helper methods) should be invoked when the flag is off.
How can I resolve this? If you propose a fix, please make it concise.
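The missing gate can be sketched in plain Python. The callables and return shapes below are placeholders standing in for the Flipt check, the Celery dispatch, and the old blocking path, not the actual view code:

```python
def gated_fetch_response(flag_enabled, dispatch_async, run_sync):
    """Dispatch to Celery and return HTTP 202 only when the
    async_prompt_execution flag is ON; otherwise run the old blocking
    path and return its result with HTTP 200."""
    if flag_enabled:
        task_id = dispatch_async()  # fire-and-forget; result arrives via Socket.IO
        return {"status": 202, "task_id": task_id}
    return {"status": 200, "result": run_sync()}  # legacy synchronous fallback
```

The same gate would wrap `index_document` and `single_pass_extraction`, so turning the flag off restores the pre-PR behavior end to end.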
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 597-629
Comment:
**`task_status` lacks task-ownership verification (IDOR risk)**
The endpoint looks up `task_id` directly in the Celery result backend without verifying that the task belongs to the tool identified by `pk`. A user who has legitimate access to any Prompt Studio tool can supply an arbitrary `task_id` from a different tool/user's execution and retrieve that execution's `result` (the full `ExecutionResult` dict, which may contain extracted document data).
For example:
```
GET /prompt-studio/<my_tool_pk>/task-status/<other_users_task_id>
```
The permission check only validates access to `pk` (via `IsOwnerOrSharedUserOrSharedToOrg`), not whether `task_id` was produced by operations on that tool.
Consider either (a) storing a `(tool_id, task_id)` mapping server-side and validating the lookup, or (b) returning only the task's `status` from this endpoint (omitting the full `result` payload, since the real result is already delivered via Socket.IO).
How can I resolve this? If you propose a fix, please make it concise.
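Option (a) can be sketched as a cache-backed ownership record written at dispatch time and checked at lookup time. `DictCache` below is a minimal stand-in for Django's cache API; the key prefix and TTL are assumptions for illustration:

```python
OWNERSHIP_TTL_SECONDS = 24 * 3600  # should match or exceed result-backend retention


class DictCache:
    """Minimal stand-in for Django's cache API (timeout ignored here)."""

    def __init__(self):
        self._store = {}

    def set(self, key, value, timeout=None):
        self._store[key] = value

    def get(self, key, default=None):
        return self._store.get(key, default)


def record_task_owner(cache, tool_id: str, task_id: str) -> None:
    """Call at dispatch time, right after send_task()."""
    cache.set(f"ps:task-owner:{task_id}", tool_id, OWNERSHIP_TTL_SECONDS)


def task_belongs_to_tool(cache, tool_id: str, task_id: str) -> bool:
    """Call in task_status before reading the Celery result backend."""
    return cache.get(f"ps:task-owner:{task_id}") == tool_id
```

With this in place, `task_status` would return 404 for any `task_id` not recorded against the requested tool.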
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 401-403
Comment:
**Redundant `import uuid as _uuid` inside method bodies**
`uuid` is already imported at the module level (line 2). The three identical inner imports (`import uuid as _uuid` in `index_document`, `fetch_response`, and `single_pass_extraction`) are redundant. Simply use the already-imported `uuid.uuid4()`.
```suggestion
executor_task_id = str(uuid.uuid4())
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 520-532
Comment:
**Null guard after the variable is already dereferenced**
`validate_adapter_status(profile_manager)` and `validate_profile_manager_owner_access(profile_manager)` are both called **before** the `if not profile_manager` guard. If `profile_manager` is `None` (e.g. when `prompt.profile_manager` is unset and no `profile_manager_id` is passed), those helper calls will raise an `AttributeError` inside them, not the intended `DefaultProfileError`. The guard at line 531–532 is effectively dead code for the `None` case.
The null check should be moved to immediately after `profile_manager` is resolved:
```python
profile_manager = prompt.profile_manager
if profile_manager_id:
    profile_manager = ProfileManagerHelper.get_profile_manager(
        profile_manager_id=profile_manager_id
    )
if not profile_manager:
    raise DefaultProfileError()
# Only then call validators
PromptStudioHelper.validate_adapter_status(profile_manager)
PromptStudioHelper.validate_profile_manager_owner_access(profile_manager)
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 733-747
Comment:
**Null guard on `default_profile` comes after it is already used**
`default_profile.chunk_size = 0` mutates the object **before** the `if not default_profile: raise DefaultProfileError()` check. If `ProfileManager.get_default_llm_profile(tool)` returns `None`, the assignment at line 744 would raise `AttributeError` rather than the intended `DefaultProfileError`. The guard is dead code for the `None` case.
Move the null check to immediately after `default_profile` is assigned (before the validators and the `chunk_size` assignment):
```python
default_profile = ProfileManager.get_default_llm_profile(tool)
if not default_profile:
raise DefaultProfileError()
PromptStudioHelper.validate_adapter_status(default_profile)
PromptStudioHelper.validate_profile_manager_owner_access(default_profile)
default_profile.chunk_size = 0
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/backend/worker_celery.py
Line: 85-92
Comment:
**Configured queue name `"executor"` doesn't match the actual dispatch queue**
`get_worker_celery_app()` registers `task_queues=[Queue("executor")]`, but `ExecutionDispatcher._get_queue()` (in `sdk1/execution/dispatcher.py`) constructs the actual queue name as `celery_executor_{executor_name}` — for the legacy executor this becomes `"celery_executor_legacy"`.
The queue declared on the app (`"executor"`) never matches the queue used by `send_task`, so this `task_queues` setting has no practical effect. While `send_task` with an explicit `queue` parameter bypasses queue routing and the task is delivered correctly, the misconfigured `task_queues` setting means any queue-routing policies (e.g. prefetch limits, fair scheduling) configured on `"executor"` will not apply.
Either align the queue name to `"celery_executor_legacy"` (or the appropriate prefix), or remove the stale `task_queues` declaration from this app's config if it is intentionally unused.
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 4200ac1
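Aligning the declared queue with the dispatcher's naming convention might look roughly like this config fragment. The `celery_executor_` prefix comes from the comment above; the constant name and the presence of `app` in scope are assumptions:

```python
# worker_celery.py (sketch): declare the queue the dispatcher actually targets
from kombu import Queue

EXECUTOR_NAME = "legacy"  # assumed to match ExecutionDispatcher's executor name

app.conf.task_queues = [Queue(f"celery_executor_{EXECUTOR_NAME}")]
```

Alternatively, drop `task_queues` entirely if the app only ever dispatches with an explicit `queue=` argument.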
backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
```python
# Check if highlight data should be removed using configuration registry
# Ensure workflow identification keys are always in item metadata
organization = api.organization if api else None
org_id = str(organization.organization_id) if organization else ""
```
I don't think this should be allowed when the organization is missing. Also, how does it work with an empty org_id?
cc: @vishnuszipstack
```python
) -> None:
    """Inject per-model usage breakdown into item['result']['metadata']."""
    inner_result = item.get("result")
    if not isinstance(inner_result, dict):
```
NIT: enhance `class ExecutionResponse` by adding a DTO for `result`.
```python
)
return APIExecutionResponseSerializer(result).data

@staticmethod
```
Hoping there is no structural change to `result` here. Can you please add the model/sample in the description, or alongside `class ExecutionResponse`?
```python
_worker_app: Celery | None = None


class _WorkerDispatchCelery(Celery):
```
Why Celery here? We already moved it out of the backend. What do these methods do here?
@muhammad-ali-e The backend Celery worker handles fire-and-forget callback tasks that run after the executor worker finishes. Here's the flow:

```
Backend dispatches task → Executor Worker (does the heavy lifting)
        ↓ (Celery link/link_error)
Backend Callback Task (lightweight)
    ├── ORM writes (persist results to DB)
    └── WebSocket push (notify frontend in real-time)
```

Why these run on the backend (not the executor worker):
- They need Django ORM access (database models, services) — the executor worker doesn't have Django loaded
- They need the Socket.IO emitter to push real-time updates to the frontend
- They're lightweight — just DB writes + WebSocket emit, no heavy computation
- Keeps the executor worker stateless and focused on execution only
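The link/link_error flow described above can be modeled in plain Python. This is an illustrative stand-in for the Celery signature chain, not the real dispatch code; the callable names are placeholders:

```python
def dispatch_with_callbacks(work, on_success, on_error, payload):
    """Toy model of Celery's link / link_error chain: run the heavy task,
    then hand the outcome to exactly one lightweight callback."""
    try:
        result = work(payload)
    except Exception as exc:  # the errback path (e.g. ide_prompt_error)
        on_error(str(exc))
        return None
    on_success(result)  # the success path (e.g. ORM write + Socket.IO emit)
    return result
```

In the real system, `work` runs on the executor worker while both callbacks run on the backend worker, which is why the callbacks can touch the ORM and the Socket.IO emitter.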
```python
    the action.
    """
    profile_manager_owner = profile_manager.created_by
    if profile_manager_owner is None:
```
This `created_by` has a default value, right? When will it be None?
```python
global _worker_app
if _worker_app is not None:
    return _worker_app
```
Unsynchronised singleton initialisation — race condition under concurrent requests

`get_worker_celery_app()` uses the classic double-check-without-lock pattern:

```python
if _worker_app is not None:
    return _worker_app
```

Under gunicorn with threaded workers (or any multi-threaded Django deployment), two threads can simultaneously see `_worker_app is None` and both proceed to create a new `_WorkerDispatchCelery` instance. The second assignment overwrites the first (last-writer-wins), so each thread may end up holding a reference to a different object than what ends up in the module global. This is benign in practice because both instances are configured identically, but it is wasteful and could cause subtle issues if Celery connection pools are per-instance.
The idiomatic Python fix is to use a module-level lock:

```python
import threading

_worker_app: Celery | None = None
_worker_app_lock = threading.Lock()


def get_worker_celery_app() -> Celery:
    global _worker_app
    if _worker_app is not None:
        return _worker_app
    with _worker_app_lock:
        if _worker_app is None:  # re-check inside lock
            ...
            _worker_app = app
    return _worker_app
```
Code review

Found 1 issue:

- unstract/backend/prompt_studio/prompt_studio_core_v2/views.py, lines 364 to 424 in 1c58eb9
- unstract/backend/prompt_studio/prompt_studio_core_v2/views.py, lines 426 to 505 in 1c58eb9
- unstract/backend/prompt_studio/prompt_studio_core_v2/views.py, lines 507 to 596 in 1c58eb9

🤖 Generated with Claude Code
```python
def _is_safe_public_url(url: str) -> bool:
    """Validate webhook URL for SSRF protection.

    Only allows HTTPS and blocks private/loopback/internal addresses.
    """
    try:
        p = urlparse(url)
        if p.scheme not in ("https",):
            return False
        host = p.hostname or ""
        if host in ("localhost",):
            return False

        addrs = _resolve_host_addresses(host)
        if not addrs:
            return False

        for addr in addrs:
            try:
                ip = ipaddress.ip_address(addr)
            except ValueError:
                return False
            if (
                ip.is_private
                or ip.is_loopback
                or ip.is_link_local
                or ip.is_reserved
                or ip.is_multicast
            ):
                return False
        return True
    except Exception:
        return False
```
DNS rebinding (TOCTOU) bypasses SSRF protection

`_is_safe_public_url` resolves the webhook hostname via DNS at validation time, but the actual HTTP request to the webhook (via `postprocess_data`) happens afterwards. An attacker who controls a DNS server can perform a DNS rebinding attack:

1. During validation: `attacker.com` → `1.2.3.4` (a public IP) — validation passes.
2. Attacker flips the DNS TTL to 0 and rebinds `attacker.com` → `169.254.169.254` (AWS metadata), `10.0.0.1` (internal service), etc.
3. During the actual HTTP request: the OS re-resolves `attacker.com` and connects to the internal address.

The current `is_private` / `is_loopback` / etc. checks are bypassed entirely because they only guard the validation-time resolution. This is a real risk in a multi-tenant environment where arbitrary webhook URLs can be registered.

The standard mitigation is to make the HTTP request through a socket-level wrapper that re-validates the resolved IP at connection time — for example by overriding the socket's `connect()` to check the destination address immediately before the connection is established, or by pinning a fixed DNS-resolved IP for the `requests` session and sending the original hostname in the Host header.
@claude review
```python
DocumentIndexingService.set_document_indexing(
    org_id=org_id, user_id=user_id, doc_id_key=doc_id_key
)
```
`set_document_indexing` not rolled back on broker failure

`DocumentIndexingService.set_document_indexing(...)` is called at the end of `build_index_payload` (before `dispatch_with_callback` is called in the view). If `dispatch_with_callback` subsequently raises — for example, because the broker is unavailable, the Celery app is not configured, or any other exception — the document is permanently left in the "indexing in progress" state. The `ide_index_error` errback is never invoked because no task was dispatched, so there is no mechanism to clear the stuck flag.

The view code does not wrap `dispatch_with_callback` in a try/except that would call `DocumentIndexingService.remove_document_indexing(...)` on failure. The result is that the user sees an infinite "indexing in progress" indicator and cannot re-index the document without manual DB intervention.

To fix this, either:

1. Move `set_document_indexing` to after successful dispatch (wrap the dispatch, set the flag only on success), or
2. Add a `try/except` around `dispatch_with_callback` in the view that calls `remove_document_indexing` on error.
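Option 2 above can be sketched as a small wrapper. `dispatch`, `set_flag`, and `remove_flag` are placeholders for `dispatch_with_callback` and the `DocumentIndexingService` calls named in the comment, not actual signatures from the PR:

```python
def dispatch_index_task(dispatch, set_flag, remove_flag):
    """Only leave the 'indexing in progress' flag set if the dispatch
    actually reached the broker; roll it back on any dispatch failure."""
    set_flag()
    try:
        return dispatch()
    except Exception:
        remove_flag()  # clear the stuck flag so the user can re-index
        raise
```

The same wrapper shape works for option 1 by simply calling `set_flag()` after a successful `dispatch()` instead of before it.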
```python
cb_kwargs = {
    "log_events_id": log_events_id,
    "request_id": request_id,
    "org_id": org_id,
    "operation": "single_pass_extraction",
    "run_id": run_id,
    "document_id": document_id,
    "tool_id": tool_id,
    "prompt_ids": [str(p.prompt_id) for p in prompts],
    "is_single_pass": True,
}
```
Missing `profile_manager_id` in `single_pass_extraction` `cb_kwargs`

`build_single_pass_payload` does not include `profile_manager_id` in its `cb_kwargs` (lines 855–865). When `ide_prompt_complete` processes this callback it reads:

```python
profile_manager_id = cb.get("profile_manager_id")  # always None for single-pass
```

and passes `profile_manager_id=None` to `OutputManagerHelper.handle_prompt_output_update`. Depending on how that helper uses the field, single-pass outputs may not be correctly associated with the profile manager, producing a different storage behavior than the `fetch_response` path (which always passes the explicit `profile_manager_id`).

More concretely, when `ide_prompt_error` fires for a single-pass failure, the emitted error event also lacks `profile_manager_id`. The frontend's `handleFailed` falls through to the broad `clearPromptStatusById(promptId)` fallback, which clears ALL doc/profile status combinations for those prompts — not just the one that was actually running. This means an error in one single-pass run would cancel the loading spinners for unrelated concurrent runs.

Consider adding the default profile's `profile_id` to `cb_kwargs`:

```python
cb_kwargs = {
    ...
    "profile_manager_id": str(default_profile.profile_id),
    ...
}
```
```js
const onResult = useCallback(
  (payload) => {
    try {
      const msg = payload?.data || payload;
      const { status, operation, result, error, ...extra } = msg;

      if (status === "completed") {
        handleCompleted(operation, result);
      } else if (status === "failed") {
        handleFailed(operation, error, extra);
      }
    } catch (err) {
      setAlertDetails(
        handleException(err, "Failed to process prompt studio result"),
      );
    }
  },
  [handleCompleted, handleFailed, setAlertDetails, handleException],
);
```
Socket result event not scoped to the current tool — multi-tab state corruption

`prompt_studio_result` events are emitted to the `log_events_id` Socket.IO room, which is per-user-session, not per-tool or per-tab. If a user has two Prompt Studio tools open simultaneously in separate tabs (both sharing the same Socket.IO connection and `log_events_id`), a result from Tool A's execution will be received and processed by Tab B's `usePromptStudioSocket` listener as well.

In `handleCompleted("fetch_response", result)`:

```js
updatePromptOutputState(data, false); // writes Tool A's outputs into Tab B's store
clearResultStatuses(data); // tries to clear statuses using Tool A's prompt IDs
```

`updatePromptOutputState` in Tab B would overwrite prompt output state with data belonging to Tool A's prompts. This can cause phantom outputs to appear under the wrong tool and leave Tab B in an inconsistent state.

The socket event payload (`_emit_result` in `tasks.py`) does not include a `tool_id` field, so the frontend has no way to discard irrelevant events. Consider adding `tool_id` (or `custom_tool_id`) to the emitted payload and filtering it in `onResult`:

```js
const onResult = useCallback((payload) => {
  const msg = payload?.data || payload;
  if (msg.tool_id && msg.tool_id !== details?.tool_id) return; // ignore events for other tools
  ...
}, [..., details?.tool_id]);
```
```js
const clearResultStatuses = useCallback(
  (data) => {
    if (!Array.isArray(data)) {
      return;
    }
    data.forEach((item) => {
      const promptId = item?.prompt_id;
      const docId = item?.document_manager;
      const profileId = item?.profile_manager;
      if (promptId && docId && profileId) {
        const statusKey = generateApiRunStatusId(docId, profileId);
        removePromptStatus(promptId, statusKey);
      }
    });
  },
  [removePromptStatus],
);
```
**`clearResultStatuses` spinner-clearing may permanently fail**
Path: `frontend/src/hooks/usePromptStudioSocket.js`, Line 28-44

`clearResultStatuses` derives the status key from `item.profile_manager` on the result data items. The status was originally stored using a `profileId` taken directly from the queue item string — a raw UUID string. For `clearResultStatuses` to match and call `removePromptStatus`, `item.profile_manager` in the result data must be the exact same UUID string.

If `OutputManagerHelper.handle_prompt_output_update` returns serialized objects where `profile_manager` is an integer PK, a nested object, or `null`, the condition `if (promptId && docId && profileId)` will be false, `removePromptStatus` will never be called, and the loading spinner for the prompt will remain active forever. The user would be unable to re-run the prompt without a page refresh.

The old polling path avoided this by explicitly removing the status with the IDs already available in the callback closure. The new socket path has no such explicit fallback.

Consider including `prompt_ids`, `document_id`, and `profile_manager_id` in the socket event payload (they are already present in `cb_kwargs`) so the frontend can always do a direct cleanup regardless of the result data format, rather than relying on parsing the ORM-serialized result items.
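A minimal backend-side sketch of that suggestion, assuming the callback already has `cb_kwargs` in scope. The function name `build_result_payload` and the key names are illustrative, mirroring the comment rather than verified against the real emit path:

```python
# Hypothetical sketch: enrich the Socket.IO result payload with the IDs the
# frontend needs for direct spinner cleanup. Key names (prompt_ids,
# document_id, profile_manager_id) mirror cb_kwargs but are assumptions here.
def build_result_payload(task_id, result_data, cb_kwargs):
    """Attach cleanup IDs from cb_kwargs so the frontend need not parse
    ORM-serialized result items to clear prompt run statuses."""
    return {
        "task_id": task_id,
        "status": "completed",
        "data": result_data,
        # IDs for direct status cleanup, independent of result item shape
        "prompt_ids": cb_kwargs.get("prompt_ids", []),
        "document_id": cb_kwargs.get("document_id"),
        "profile_manager_id": cb_kwargs.get("profile_manager_id"),
    }

payload = build_result_payload(
    "task-1",
    [{"prompt_id": "p1", "profile_manager": 42}],  # int PK would break key matching
    {"prompt_ids": ["p1"], "document_id": "d1", "profile_manager_id": "prof-uuid"},
)
```

With these top-level keys, the frontend can compute the status key directly from the payload even when the serialized result items carry a different `profile_manager` representation.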
…t drift
- Remove redundant inline `import uuid as _uuid` in views.py (use module-level uuid)
- URL-encode DB_USER in worker_celery.py result backend connection string
- Remove misleading task_queues=[Queue("executor")] from dispatch-only Celery app
- Remove dead `if not tool:` guards after objects.get() (already raises DoesNotExist)
- Move profile_manager/default_profile null checks before first dereference
- Reorder ProfileManager.objects.get before mark_document_indexed in tasks.py
- Handle ProfileManager.DoesNotExist as warning, not hard failure
- Wrap PostHog analytics in try/catch so failures don't block prompt execution
- Handle pending-indexing 200 response in usePromptRun.js (clear RUNNING status)
- Reset formData when metadata is missing in ConfigureDs.jsx
- Fix test_should_skip_extraction tests: function now takes 1 arg (outputs only)
- Fix agentic routing tests: mock X2Text.process, remove stale platform_helper kwarg
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
for more information, see https://pre-commit.ci
```js
        const statusKey = generateApiRunStatusId(docId, profileId);
        removePromptStatus(promptId, statusKey);
        setAlertDetails({
          type: "info",
          content:
            res?.data?.message || "Document is being indexed. Please wait.",
        });
        return;
      }

      // Timeout safety net: clear stale status if socket event never arrives.
      setTimeout(() => {
        const statusKey = generateApiRunStatusId(docId, profileId);
        const current = usePromptRunStatusStore.getState().promptRunStatus;
        if (
```
**5-minute timeout can falsely cancel a later re-run of the same prompt**
Path: `frontend/src/hooks/usePromptRun.js`, Line 56-70

The `setTimeout` closure captures `promptId`, `docId`, and `profileId` from run N. When it fires 5 minutes later it reads the *current* store state and checks whether `current?.[promptId]?.[statusKey] === PROMPT_RUN_API_STATUSES.RUNNING`. If the user triggered another run of the same `(promptId, docId, profileId)` combination (run N+1) within that 5-minute window, the timer from run N will see run N+1's `RUNNING` state, clear it, and display a spurious "timed out" warning — even though run N+1 may still be processing and will later receive a valid socket event. The result is a permanently stuck spinner for run N+1 (status cleared by the stale timer) while the socket result arrives and tries to call `clearResultStatuses` on an already-removed entry.

Mitigation: tag each dispatch with a unique `runId` and include it in the status store key, so the timeout only clears the *specific* run it corresponds to:

```js
const runNonce = generateUUID();
addPromptStatus(promptId, statusKey, PROMPT_RUN_API_STATUSES.RUNNING, runNonce);
setTimeout(() => {
  const current = usePromptRunStatusStore.getState().promptRunStatus;
  if (current?.[promptId]?.[statusKey]?.nonce === runNonce) {
    removePromptStatus(promptId, statusKey);
    setAlertDetails({ type: "warning", content: "Prompt execution timed out. Please try again." });
  }
}, SOCKET_TIMEOUT_MS);
```
```python
    )
    return str(platform_key.key)


    # ------------------------------------------------------------------
    # Phase 5B — Payload builders for fire-and-forget dispatch
    # ------------------------------------------------------------------
```
**`default_profile` dereferenced before null guard in `build_index_payload`**
Path: `backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py`, Line 297-302

`build_index_payload` calls `ProfileManager.get_default_llm_profile(tool)`, then immediately passes the result to `validate_adapter_status` and `validate_profile_manager_owner_access` without any null check. If no default LLM profile is configured for the tool, `get_default_llm_profile` returns `None` and both validators will raise `AttributeError` deep inside the helper, surfacing as an opaque 500 error instead of the intended `DefaultProfileError`.

The same defensiveness present in `build_single_pass_payload` (`if not default_profile: raise DefaultProfileError()`) should be applied here:

```python
default_profile = ProfileManager.get_default_llm_profile(tool)
if not default_profile:
    raise DefaultProfileError()
PromptStudioHelper.validate_adapter_status(default_profile)
PromptStudioHelper.validate_profile_manager_owner_access(default_profile)
```
```python
        Raises:
            FilenameMissingError: _description_
        Args:
            request (HttpRequest)

        Returns:
            Response
        """
        custom_tool = self.get_object()
        tool_id: str = str(custom_tool.tool_id)
        document_id: str = request.data.get(ToolStudioPromptKeys.DOCUMENT_ID)
        id: str = request.data.get(ToolStudioPromptKeys.ID)
        prompt_id: str = request.data.get(ToolStudioPromptKeys.ID)
        run_id: str = request.data.get(ToolStudioPromptKeys.RUN_ID)
        profile_manager: str = request.data.get(ToolStudioPromptKeys.PROFILE_MANAGER_ID)
        profile_manager_id: str = request.data.get(
            ToolStudioPromptKeys.PROFILE_MANAGER_ID
        )
```
**HubSpot first-run analytics event silently dropped in async path**
Path: `backend/prompt_studio/prompt_studio_core_v2/views.py`, Line 430-443

The old sync `fetch_response` path tracked `output_count_before` and called `notify_hubspot_event(user, "PROMPT_RUN", is_first_for_org=..., ...)` to fire a business analytics event on the first prompt run for an organisation. The new async path removes both the count query and the notification call entirely with no comment or TODO.

If this is intentional (e.g., to be re-added once the async path is stable), a comment noting this would prevent it from being permanently lost. If it is unintentional, first-run HubSpot events will silently stop firing for any organisation that has `async_prompt_execution` enabled, skewing adoption metrics.
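If the event is to be restored rather than documented away, the first-run check could be carried into the callback task along the lines of the sketch below. The real `notify_hubspot_event` signature and the pre-run output count query must be copied from the old sync path; the stub here only illustrates the logic:

```python
# Illustrative only: `notify` stands in for the real notify_hubspot_event,
# and `count_outputs_before` for the pre-run output-count query result.
def maybe_notify_first_run(count_outputs_before, notify):
    """Fire the PROMPT_RUN analytics event, flagging the org's first run
    when no outputs existed before this execution."""
    is_first = count_outputs_before == 0
    notify("PROMPT_RUN", is_first_for_org=is_first)
    return is_first

events = []
first = maybe_notify_first_run(0, lambda name, **kw: events.append((name, kw)))
```

Because the callback task runs after the executor has produced outputs, the count would need to be captured in the view (before dispatch) and passed through `cb_kwargs`.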
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 430-443
Comment:
**HubSpot first-run analytics event silently dropped in async path**
The old sync `fetch_response` path tracked `output_count_before` and called `notify_hubspot_event(user, "PROMPT_RUN", is_first_for_org=..., ...)` to fire a business analytics event on the first prompt run for an organisation. The new async path removes both the count query and the notification call entirely with no comment or TODO.
If this is intentional (e.g., to be re-added once the async path is stable), a comment noting this would prevent it from being permanently lost. If it is unintentional, first-run HubSpot events will silently stop firing for any organisation that has `async_prompt_execution` enabled, skewing adoption metrics.
How can I resolve this? If you propose a fix, please make it concise.
Frontend Lint Report (Biome)
✅ All checks passed! No linting or formatting issues found.
Test Results

Summary
- Runner Tests - Full Report
- SDK1 Tests - Full Report
```python
dp = ProfileManager.get_default_llm_profile(tool)
monitor_llm = str(dp.llm.id)
if challenge_llm_instance:
    challenge_llm = str(challenge_llm_instance.id)
else:
    dp = ProfileManager.get_default_llm_profile(tool)
    challenge_llm = str(dp.llm.id)
return monitor_llm, challenge_llm
```
AttributeError on None default profile in _resolve_llm_ids
When tool.monitor_llm or tool.challenge_llm is None (not explicitly set), the code falls through to ProfileManager.get_default_llm_profile(tool). If that also returns None (no default profile configured), the next line str(dp.llm.id) immediately raises AttributeError: 'NoneType' object has no attribute 'llm'. This surfaces as an opaque 500 error rather than the expected DefaultProfileError.
This can happen when a user creates a Prompt Studio tool, sets a per-prompt profile manager, but has never configured a default tool-level profile AND has not set explicit monitor_llm/challenge_llm adapters.
Note that _resolve_llm_ids is called in build_fetch_response_payload before the if not profile_manager: raise DefaultProfileError() guard (line 536), so a missing default profile causes an AttributeError that bypasses the intended error handling entirely.
Suggested change:

```python
if monitor_llm_instance:
    monitor_llm = str(monitor_llm_instance.id)
else:
    dp = ProfileManager.get_default_llm_profile(tool)
    if not dp:
        raise DefaultProfileError()
    monitor_llm = str(dp.llm.id)
if challenge_llm_instance:
    challenge_llm = str(challenge_llm_instance.id)
else:
    dp = ProfileManager.get_default_llm_profile(tool)
    if not dp:
        raise DefaultProfileError()
    challenge_llm = str(dp.llm.id)
```
```python
def _emit_error(
    log_events_id: str,
    task_id: str,
    operation: str,
    error: str,
    extra: dict[str, Any] | None = None,
) -> None:
    """Push a failure event to the frontend via Socket.IO."""
    data: dict[str, Any] = {
        "task_id": task_id,
        "status": "failed",
        "operation": operation,
        "error": error,
    }
    if extra:
        data.update(extra)
    _emit_websocket_event(
        room=log_events_id,
        event=PROMPT_STUDIO_RESULT_EVENT,
        data=data,
    )
```
_emit_error missing _json_safe() serialization
_emit_result wraps its payload through _json_safe() to handle uuid.UUID and datetime objects before emitting via Socket.IO. _emit_error has no equivalent sanitization — it calls _emit_websocket_event directly with raw data.
In practice the current callers store only strings in extra (e.g. from cb.get("document_id", "")), so this is latent rather than immediately broken. However, future callers could pass ORM values or UUIDs into extra, causing json.dumps inside _emit_websocket_event to raise a TypeError and leave the frontend without any error feedback at all.
Suggested change:

```python
    if extra:
        data.update(extra)
    _emit_websocket_event(
        room=log_events_id,
        event=PROMPT_STUDIO_RESULT_EVENT,
        data=_json_safe(data),
    )
```

What
Introduces a pluggable executor system that replaces Docker-container-based tool execution with Celery worker tasks, and migrates the Prompt Studio IDE to an async execution model using Socket.IO for result delivery. Gated behind the `async_prompt_execution` feature flag for safe rollout.

Why
The existing architecture has several limitations:
How
Backend (65 files)
- `index_document`, `fetch_response`, `single_pass_extraction` now return HTTP 202 (accepted) with a `task_id` instead of blocking. Gated by `async_prompt_execution` feature flag — old sync path preserved as fallback
- `backend/prompt_studio/prompt_studio_core_v2/tasks.py`: `ide_index_complete`, `ide_prompt_complete`, `ide_prompt_error` etc. run on `prompt_studio_callback` queue, perform ORM writes via `OutputManagerHelper`, and emit `prompt_studio_result` Socket.IO events
- `backend/backend/worker_celery.py`: A second Celery app instance that coexists with Django's Celery app, configured to route tasks to executor workers
- `prompt_studio_helper.py` rewrite: Removed `PromptTool` HTTP calls entirely. New `build_index_payload()`, `build_fetch_response_payload()`, `build_single_pass_payload()` methods construct `ExecutionContext` objects with all ORM data pre-loaded
- `backend/backend/workers/`, `file_execution_tasks.py`, `celery_task.py` (old in-process workers)

Workers (70 files, ~19,500 new lines)
- `workers/executor/`: New `WorkerType.EXECUTOR` Celery worker with `LegacyExecutor` handling all operations: `extract`, `index`, `answer_prompt`, `single_pass_extraction`, `summarize`, `agentic_extraction`, `structure_pipeline`
- `BaseExecutor` → `ExecutorRegistry` (class-decorator self-registration) → `ExecutionOrchestrator` → `ExecutionDispatcher` (Celery `send_task`)
- `ExecutorToolShim`: Lightweight stand-in for `BaseTool` that satisfies SDK1 adapter interfaces without Docker context
- `workers/file_processing/structure_tool_task.py`: Celery-native replacement for Docker-based `StructureTool.run()` with profile overrides, smart table detection, and output file management

SDK1 (22 files)
- `unstract/sdk1/src/unstract/sdk1/execution/`: `ExecutionContext`, `ExecutionResult` (serializable DTOs for Celery JSON transport), `ExecutionDispatcher` (`dispatch()` + `dispatch_with_callback()`), `BaseExecutor`, `ExecutorRegistry`

Frontend (275 files)
- New `usePromptStudioSocket` hook listens for `prompt_studio_result` Socket.IO events. `usePromptRun` rewritten from polling to fire-and-forget. `PromptRun.jsx` conditionally renders async or sync path based on feature flag

Docker / Infrastructure
- New services: `worker-executor-v2`, `worker-prompt-studio-callback`, `worker-metrics`
- `workers-v2` services from opt-in (`profiles: [workers-v2]`) to default

Architecture Change
Can this PR break any existing features? If yes, please list possible items. If no, please explain why.
Yes, potential breaking changes — mitigated by feature flag:
Prompt Studio IDE async path — gated by the `async_prompt_execution` feature flag. When flag is OFF (default), all 3 endpoints (`index_document`, `fetch_response`, `single_pass_extraction`) use the old sync path returning HTTP 200. No behavior change for existing users.

Review Guidelines
This PR touches 441 files across backend, frontend, workers, and SDK1. Below is a structured review path to navigate it efficiently.
Code Structure Overview
Recommended Review Order
Review in dependency order — each layer builds on the previous:
1. `execution/context.py`, `result.py`, `dispatcher.py`, `registry.py`: `to_dict()`/`from_dict()` round-trips correct? Is the `Operation` enum complete? Queue naming (`celery_executor_{name}`).
2. `executor/tasks.py`, `executor/worker.py`: `execute_extraction` retry policy, error handling, log correlation.
3. `executors/legacy_executor.py` (focus on `_OPERATION_MAP` + `execute()`)
4. `answer_prompt.py`, `index.py`, `retrieval.py`: do `executor_params` match what `build_*_payload()` sends? Lazy import pattern (`_get_prompt_deps()`, `_get_indexing_deps()`).
5. `views.py` lines 351–583: `dispatch_with_callback` usage with correct callback task names and queue.
6. `prompt_studio_helper.py` (`build_index_payload`, `build_fetch_response_payload`, `build_single_pass_payload`): `executor_params` key compatibility with executor handlers.
7. `tasks.py` (callback tasks), `ide_prompt_complete`: ORM writes via `OutputManagerHelper`. Socket.IO emission shape. Error callback cleanup. State store setup/teardown.
8. `usePromptRun.js`, `usePromptStudioSocket.js`, `PromptRun.jsx`: `_emit_result()` handling. Timeout handling. Status cleanup on failure.
9. `docker/docker-compose.yaml`: `worker-executor-v2`, `worker-prompt-studio-callback`. Removed old workers. Queue bindings.
10. `workers/tests/test_sanity_phase*.py`

Data Flow (End-to-End)
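The end-to-end contract can be compressed into a pure-Python illustration with no Celery, RabbitMQ, or Socket.IO; the `ExecutionContext` fields and the 202 body shown are simplified assumptions, not the real schema:

```python
import uuid
from dataclasses import asdict, dataclass

@dataclass
class ExecutionContext:
    """Simplified stand-in for the SDK1 DTO; real field names differ."""
    operation: str
    executor_params: dict

    def to_dict(self):
        return asdict(self)

    @classmethod
    def from_dict(cls, d):
        return cls(**d)

def dispatch_with_callback(execute, callback, context):
    """Fire-and-forget contract: pre-generate the task id, hand the serialized
    context to the worker, and let the callback emit the result. In production
    `execute` runs on the executor queue and `callback` on the
    prompt_studio_callback queue; here both run inline to show the data flow."""
    task_id = str(uuid.uuid4())  # pre-generated so the 202 body can carry it
    result = execute(ExecutionContext.from_dict(context.to_dict()))  # JSON round-trip
    callback(task_id, result)
    return {"task_id": task_id, "status": "accepted"}  # the HTTP 202 body

emitted = []
ctx = ExecutionContext("answer_prompt", {"prompt_ids": ["p1"]})
resp = dispatch_with_callback(
    lambda c: {"operation": c.operation, "status": "completed"},
    lambda tid, res: emitted.append(res),  # stands in for the Socket.IO emit
    ctx,
)
```

The `to_dict()`/`from_dict()` round-trip in the middle is the property item 1 of the review order asks about: whatever crosses the Celery JSON transport must reconstruct an equal context on the other side.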
Known Code Duplication
- `views.py` — 3 view actions follow `build_payload → get_dispatcher → dispatch_with_callback → return 202`
- `tasks.py` — callback tasks `ide_index_complete` and `ide_prompt_complete` follow same structure: extract kwargs → setup state → check result → ORM work → emit → cleanup
- `tasks.py` — legacy tasks `run_index_document`, `run_fetch_response`, `run_single_pass_extraction` kept alongside new callback tasks

Files Safe to Skim
- `workers/tests/` — 24 test files, ~10,000 lines. Well-structured but high volume. Focus on `test_sanity_phase2.py` (full Celery chain) and `test_sanity_phase4.py` (IDE payload compatibility) as representative examples.
- `workers/executor/executors/retrievers/` — 7 retriever implementations. All follow the same pattern. Reviewing one (`simple.py`) covers the pattern.
- Architecture docs (`architecture-*.md`, `phase*.md`) — Reference material, not code.

Relevant Docs
- `architecture-executor-system.md`, `architecture-flow-diagram.md`, `architecture-sequence-diagrams.md` in repo root
- `architecture-migration-phases.md`
- `rollout-plan.md`

Related Issues or PRs
Dependencies Versions / Env Variables
New env variables:
- `FLIPT_SERVICE_AVAILABLE` (default: `false`)

Notes on Testing
- `cd workers && uv run pytest -v` — 490+ tests (444 in `workers/tests/` + extras)
- `cd unstract/sdk1 && uv run pytest -v` — 146+ tests
- `cd backend && python -m pytest prompt_studio/prompt_studio_core_v2/test_tasks.py -v`
- Manual: enable the feature flag (`async_prompt_execution=true`), trigger prompt runs in IDE, verify Socket.IO events deliver results via Network → WS → Messages tab

Screenshots
N/A (primarily backend/worker architecture change; frontend UX unchanged when feature flag is off)
Checklist
I have read and understood the Contribution Guidelines.