Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions infra/vscode_web/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
AZURE_EXISTING_AGENT_ID="<%= agentId %>"
AZURE_ENV_NAME="<%= playgroundName %>"
# AZURE_LOCATION="<%= location %>"
AZURE_SUBSCRIPTION_ID="<%= subscriptionId %>"
AZURE_EXISTING_AIPROJECT_ENDPOINT="<%= endpoint %>"
AZURE_EXISTING_AIPROJECT_RESOURCE_ID="<%= projectResourceId %>"
AZD_ALLOW_NON_EMPTY_FOLDER=true
86 changes: 59 additions & 27 deletions src/ContentProcessor/src/libs/pipeline/queue_handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,36 +246,68 @@ def _get_artifact_type(step_name: str) -> ArtifactType:
container_name=self.application_context.configuration.app_cps_processes,
)

ContentProcess(
process_id=self._current_message_context.data_pipeline.process_id,
processed_file_name=self._current_message_context.data_pipeline.files[
0
].name,
status="Error",
processed_file_mime_type=self._current_message_context.data_pipeline.files[
0
].mime_type,
last_modified_time=datetime.datetime.now(datetime.UTC),
last_modified_by=step_name,
imported_time=datetime.datetime.strptime(
self._current_message_context.data_pipeline.pipeline_status.creation_time,
"%Y-%m-%dT%H:%M:%S.%fZ",
),
process_output=[
Step_Outputs(
step_name=self.handler_name,
step_result=exception_result.result,
)
],
).update_status_to_cosmos(
connection_string=self.application_context.configuration.app_cosmos_connstr,
database_name=self.application_context.configuration.app_cosmos_database,
collection_name=self.application_context.configuration.app_cosmos_container_process,
)
# Only mark as terminal "Error" when retries are
# exhausted. While retries remain, use "Retrying"
# so the workflow poller keeps waiting instead of
# treating the first transient failure as final.
has_retries_remaining = queue_message.dequeue_count <= 5

if has_retries_remaining:
# Lightweight status-only update — avoids
# overwriting the document with null result /
# scores that a previous successful step may
# have written.
ContentProcess(
process_id=self._current_message_context.data_pipeline.process_id,
processed_file_name=self._current_message_context.data_pipeline.files[
0
].name,
status="Retrying",
processed_file_mime_type=self._current_message_context.data_pipeline.files[
0
].mime_type,
last_modified_time=datetime.datetime.now(datetime.UTC),
last_modified_by=step_name,
imported_time=datetime.datetime.strptime(
self._current_message_context.data_pipeline.pipeline_status.creation_time,
"%Y-%m-%dT%H:%M:%S.%fZ",
),
).update_process_status_to_cosmos(
connection_string=self.application_context.configuration.app_cosmos_connstr,
database_name=self.application_context.configuration.app_cosmos_database,
collection_name=self.application_context.configuration.app_cosmos_container_process,
)
else:
ContentProcess(
process_id=self._current_message_context.data_pipeline.process_id,
processed_file_name=self._current_message_context.data_pipeline.files[
0
].name,
status="Error",
processed_file_mime_type=self._current_message_context.data_pipeline.files[
0
].mime_type,
last_modified_time=datetime.datetime.now(datetime.UTC),
last_modified_by=step_name,
imported_time=datetime.datetime.strptime(
self._current_message_context.data_pipeline.pipeline_status.creation_time,
"%Y-%m-%dT%H:%M:%S.%fZ",
),
process_output=[
Step_Outputs(
step_name=self.handler_name,
step_result=exception_result.result,
)
],
).update_status_to_cosmos(
connection_string=self.application_context.configuration.app_cosmos_connstr,
database_name=self.application_context.configuration.app_cosmos_database,
collection_name=self.application_context.configuration.app_cosmos_container_process,
)

process_outputs: list[Step_Outputs] = []

if queue_message.dequeue_count > 5:
if not has_retries_remaining:
logging.info(
"Message will be moved to the Dead Letter Queue."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,35 @@ async def poll_status(
poll_interval_seconds: float = 5.0,
timeout_seconds: float = 600.0,
on_poll: Callable[[dict], Awaitable[None] | None] | None = None,
error_confirmation_polls: int = 3,
) -> dict:
"""Poll Cosmos for status until a terminal state or timeout.

When an ``Error`` status is observed, the poller does not return
immediately. Instead it re-polls up to *error_confirmation_polls*
additional times (with the same interval) to confirm the error is
persistent and not a transient state during a ContentProcessor
retry cycle. If the status changes away from ``Error`` (e.g.
back to a step name or ``Retrying``), the normal polling loop
resumes.

Args:
process_id: The content process ID to poll.
poll_interval_seconds: Delay between poll attempts.
timeout_seconds: Maximum elapsed time before giving up.
on_poll: Optional callback invoked on each iteration with
the current status dict. Accepts sync or async callables.
error_confirmation_polls: Number of additional polls to
perform after first observing ``Error`` before accepting
it as terminal. Defaults to 3.

Returns:
Final status dict with keys ``status``, ``process_id``,
``file_name``, and ``terminal``.
"""
elapsed = 0.0
result: dict | None = None
consecutive_error_polls = 0
while elapsed < timeout_seconds:
result = await self.get_status(process_id)
if result is None:
Expand All @@ -299,10 +312,27 @@ async def poll_status(
await poll_handler

status = result.get("status", "processing")
if status in ("Completed", "Error"):
if status == "Completed":
result["terminal"] = True
return result

if status == "Error":
consecutive_error_polls += 1
if consecutive_error_polls > error_confirmation_polls:
result["terminal"] = True
return result
logger.info(
"Process %s reported Error (confirmation %d/%d), "
"re-polling to confirm retries are exhausted.",
process_id,
consecutive_error_polls,
error_confirmation_polls,
)
else:
# Status changed away from Error (e.g. retry started),
# reset the confirmation counter.
consecutive_error_polls = 0

await asyncio.sleep(poll_interval_seconds)
elapsed += poll_interval_seconds

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,69 @@ async def _run():
record.processed_file_name = "test.pdf"
svc._process_repo.get_async.return_value = record

result = await svc.poll_status("p1", poll_interval_seconds=0.01)
result = await svc.poll_status(
"p1",
poll_interval_seconds=0.01,
error_confirmation_polls=3,
)
assert result["status"] == "Error"
assert result["terminal"] is True
# Should have been called 1 (initial) + 3 (confirmation) = 4 times
assert svc._process_repo.get_async.call_count == 4

asyncio.run(_run())

def test_error_recovers_to_retrying(self):
    """When Error is seen but status changes to Retrying, polling continues."""

    async def _scenario():
        service = _make_service()
        # Simulated status lifecycle: Error -> Retrying -> extract -> Completed
        sequence = iter(["Error", "Retrying", "extract", "Completed"])

        async def _fake_get(process_id):
            record = MagicMock()
            record.status = next(sequence)
            record.processed_file_name = "test.pdf"
            return record

        service._process_repo.get_async.side_effect = _fake_get

        outcome = await service.poll_status(
            "p1",
            poll_interval_seconds=0.01,
            error_confirmation_polls=3,
        )
        # A transient Error must not be treated as terminal; the poller
        # should have kept going until the Completed state arrived.
        assert outcome["terminal"] is True
        assert outcome["status"] == "Completed"

    asyncio.run(_scenario())

def test_error_recovers_to_step_name(self):
    """When Error is seen but status changes to a step name, polling continues."""

    async def _scenario():
        service = _make_service()
        # Simulated status lifecycle: Error -> map (retry started) -> Completed
        sequence = iter(["Error", "map", "Completed"])

        async def _fake_get(process_id):
            record = MagicMock()
            record.status = next(sequence)
            record.processed_file_name = "test.pdf"
            return record

        service._process_repo.get_async.side_effect = _fake_get

        outcome = await service.poll_status(
            "p1",
            poll_interval_seconds=0.01,
            error_confirmation_polls=3,
        )
        # The step-name status resets error confirmation, so the poll
        # loop must run through to the Completed terminal state.
        assert outcome["terminal"] is True
        assert outcome["status"] == "Completed"

    asyncio.run(_scenario())

Expand Down
Loading