Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import Any

Expand Down Expand Up @@ -54,3 +55,33 @@ def to_json(self, current_attempt: int = 1) -> dict[str, Any]:
Returns:
Dictionary containing structured JSON data.
"""

async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel:
    """Asynchronously convert text to a Pydantic model instance.

    This default runs the synchronous ``to_pydantic`` through
    ``asyncio.to_thread``, so subclasses that only implement the sync
    API can still be awaited. Subclasses with a native async path
    should override this method.

    Args:
        current_attempt: Current attempt number for retry logic.

    Returns:
        Pydantic model instance with structured data.
    """
    # Hand the bound sync method to a worker thread instead of calling
    # it inline on the event loop.
    sync_convert = self.to_pydantic
    return await asyncio.to_thread(sync_convert, current_attempt)

async def ato_json(self, current_attempt: int = 1) -> dict[str, Any]:
    """Asynchronously convert text to a JSON dictionary.

    This default runs the synchronous ``to_json`` through
    ``asyncio.to_thread``, so subclasses that only implement the sync
    API can still be awaited. Subclasses with a native async path
    should override this method.

    Args:
        current_attempt: Current attempt number for retry logic.

    Returns:
        Dictionary containing structured JSON data.
    """
    # Hand the bound sync method to a worker thread instead of calling
    # it inline on the event loop.
    sync_convert = self.to_json
    return await asyncio.to_thread(sync_convert, current_attempt)
39 changes: 35 additions & 4 deletions lib/crewai/src/crewai/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.converter import Converter, aconvert_to_model, convert_to_model
from crewai.utilities.file_store import (
clear_task_files,
get_all_files,
Expand Down Expand Up @@ -620,7 +620,7 @@ async def _aexecute_core(
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
else:
raw = result
pydantic_output, json_output = None, None
Expand Down Expand Up @@ -1071,6 +1071,37 @@ def _export_output(

return pydantic_output, json_output

async def _aexport_output(
    self, result: str
) -> tuple[BaseModel | None, dict[str, Any] | None]:
    """Async counterpart of ``_export_output``.

    Converts ``result`` with ``aconvert_to_model`` when the task declares
    ``output_pydantic`` or ``output_json``, then sorts the converted value
    into the matching slot of the returned pair.

    Args:
        result: Raw text output to convert.

    Returns:
        A ``(pydantic_output, json_output)`` tuple. At most one element is
        populated; both are ``None`` when no structured output is
        configured or when a string result is not valid JSON.
    """
    # Nothing to convert unless a structured output model was requested.
    if not (self.output_pydantic or self.output_json):
        return None, None

    converted = await aconvert_to_model(
        result,
        self.output_pydantic,
        self.output_json,
        self.agent,
        self.converter_cls,
    )

    if isinstance(converted, BaseModel):
        return converted, None
    if isinstance(converted, dict):
        return None, converted
    if isinstance(converted, str):
        # A string may still be JSON text; decode it on a best-effort basis.
        try:
            return None, json.loads(converted)
        except json.JSONDecodeError:
            return None, None
    return None, None

def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
Expand Down Expand Up @@ -1286,7 +1317,7 @@ async def _ainvoke_guardrail_function(

if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
pydantic_output, json_output = await self._aexport_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
Expand Down Expand Up @@ -1331,7 +1362,7 @@ async def _ainvoke_guardrail_function(
tools=tools,
)

pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
Expand Down
258 changes: 258 additions & 0 deletions lib/crewai/src/crewai/utilities/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,99 @@ def to_json(self, current_attempt: int = 1) -> str | ConverterError | Any: # ty
return self.to_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")

async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel:
    """Asynchronously convert ``self.text`` into an instance of ``self.model``.

    All LLM traffic goes through ``llm.acall``. When the LLM reports
    function-calling support, the model class is passed as
    ``response_model`` and a returned ``BaseModel`` is used directly;
    anything else is validated as JSON. Without function calling, the
    response is validated as JSON, and on a validation error the text
    matched by ``_JSON_PATTERN`` is validated instead.

    Args:
        current_attempt: The current attempt number for conversion retries.

    Returns:
        A Pydantic BaseModel instance.

    Raises:
        ConverterError: If conversion still fails once ``current_attempt``
            has reached ``self.max_attempts``.
    """
    try:
        if self.llm.supports_function_calling():
            response = await self.llm.acall(
                messages=[
                    {"role": "system", "content": self.instructions},
                    {"role": "user", "content": self.text},
                ],
                response_model=self.model,
            )
            if isinstance(response, BaseModel):
                return response
            return self.model.model_validate_json(response)

        response = await self.llm.acall(
            [
                {"role": "system", "content": self.instructions},
                {"role": "user", "content": self.text},
            ]
        )
        try:
            return self.model.model_validate_json(response)
        except ValidationError:
            # Regex-based JSON extraction only (CPU-bound, no LLM call).
            # handle_partial_json is deliberately not used here: its
            # fallback goes through the sync convert_with_instructions /
            # llm.call(), which would block the event loop.
            embedded = _JSON_PATTERN.search(response)
            if embedded is None:
                raise  # handled by the retry logic below
            # A failure here also propagates to the retry logic below.
            return self.model.model_validate_json(embedded.group())
    except ValidationError as err:
        if current_attempt < self.max_attempts:
            return await self.ato_pydantic(current_attempt + 1)
        raise ConverterError(
            f"Failed to convert text into a Pydantic model due to validation error: {err}"
        ) from err
    except Exception as err:
        if current_attempt < self.max_attempts:
            return await self.ato_pydantic(current_attempt + 1)
        raise ConverterError(
            f"Failed to convert text into a Pydantic model due to error: {err}"
        ) from err

async def ato_json(self, current_attempt: int = 1) -> str | ConverterError:  # type: ignore[override]
    """Asynchronously convert ``self.text`` to JSON.

    When the LLM reports function-calling support, the work is delegated
    to the instructor created by ``_create_instructor``. Otherwise the
    LLM is called through ``llm.acall`` and its raw response is encoded
    with ``json.dumps``.

    Any exception triggers a retry until ``current_attempt`` reaches
    ``self.max_attempts``. After that the failure is *returned* as a
    ``ConverterError`` instance, not raised.

    Args:
        current_attempt: The current attempt number for conversion retries.

    Returns:
        A JSON string, or a ConverterError instance if every attempt fails.
    """
    try:
        if not self.llm.supports_function_calling():
            prompt = [
                {"role": "system", "content": self.instructions},
                {"role": "user", "content": self.text},
            ]
            raw_response = await self.llm.acall(prompt)
            return json.dumps(raw_response)

        # NOTE(review): assumes the instructor exposes an async
        # ``ato_json``; if it only has the sync ``to_json`` this raises
        # AttributeError and ends up as a returned ConverterError. Confirm.
        instructor = self._create_instructor()
        return await instructor.ato_json()
    except Exception as err:
        if current_attempt >= self.max_attempts:
            return ConverterError(f"Failed to convert text into JSON, error: {err}.")
        return await self.ato_json(current_attempt + 1)

def _create_instructor(self) -> InternalInstructor[Any]:
"""Create an instructor."""

Expand Down Expand Up @@ -426,3 +519,168 @@ def create_converter(
raise Exception("No output converter found or set.")

return converter # type: ignore[no-any-return]


async def aconvert_to_model(
    result: str,
    output_pydantic: type[BaseModel] | None,
    output_json: type[BaseModel] | None,
    agent: Agent | BaseAgent | None = None,
    converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
    """Asynchronously convert a result string to a Pydantic model or JSON.

    Async counterpart of ``convert_to_model``. The steps are:

    1. With no target model, return ``result`` untouched.
    2. With a custom ``converter_cls``, go straight to
       ``aconvert_with_instructions``.
    3. Otherwise parse ``result`` as JSON and validate it locally; on a
       JSON decode or validation error fall back to
       ``ahandle_partial_json``.

    Args:
        result: The result string to convert.
        output_pydantic: The Pydantic model class to convert to.
        output_json: The Pydantic model class to convert to JSON.
        agent: The agent instance.
        converter_cls: The converter class to use.

    Returns:
        The converted result as a dict, BaseModel, or original string.
    """
    target_model = output_pydantic or output_json
    if target_model is None:
        return result

    wants_json = bool(output_json)
    # Keyword arguments shared by both async fallback helpers.
    fallback_kwargs = {
        "result": result,
        "model": target_model,
        "is_json_output": wants_json,
        "agent": agent,
        "converter_cls": converter_cls,
    }

    if converter_cls:
        return await aconvert_with_instructions(**fallback_kwargs)

    try:
        # Round-trip through json so lenient input (strict=False) is
        # normalised before validation.
        normalised = json.dumps(json.loads(result, strict=False))
        return validate_model(
            result=normalised, model=target_model, is_json_output=wants_json
        )
    except (json.JSONDecodeError, ValidationError):
        return await ahandle_partial_json(**fallback_kwargs)
    except Exception as err:
        verbose = agent and getattr(agent, "verbose", True)
        if verbose:
            Printer().print(
                content=f"Unexpected error during model conversion: {type(err).__name__}: {err}. Returning original result.",
                color="red",
            )
        return result


async def ahandle_partial_json(
    result: str,
    model: type[BaseModel],
    is_json_output: bool,
    agent: Agent | BaseAgent | None,
    converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
    """Asynchronously recover structured output from text containing JSON.

    Async counterpart of ``handle_partial_json``. The first match of
    ``_JSON_PATTERN`` in ``result`` is validated against ``model``. If
    there is no match, or the match fails with a JSON decode error or an
    unexpected exception, conversion is delegated to
    ``aconvert_with_instructions``.

    Args:
        result: The result string to process.
        model: The Pydantic model class to convert to.
        is_json_output: Whether to return a dict (True) or Pydantic model (False).
        agent: The agent instance.
        converter_cls: The converter class to use.

    Returns:
        The converted result as a dict, BaseModel, or original string.

    Raises:
        ValidationError: If the extracted fragment fails model validation.
    """
    candidate = _JSON_PATTERN.search(result)
    if candidate is not None:
        try:
            parsed = model.model_validate_json(candidate.group())
            return parsed.model_dump() if is_json_output else parsed
        except json.JSONDecodeError:
            # Not decodable; fall through to the LLM-based conversion.
            pass
        except ValidationError:
            # Listed explicitly so the generic handler below does not
            # swallow validation failures.
            raise
        except Exception as err:
            verbose = agent and getattr(agent, "verbose", True)
            if verbose:
                Printer().print(
                    content=f"Unexpected error during partial JSON handling: {type(err).__name__}: {err}. Attempting alternative conversion method.",
                    color="red",
                )

    return await aconvert_with_instructions(
        result=result,
        model=model,
        is_json_output=is_json_output,
        agent=agent,
        converter_cls=converter_cls,
    )


async def aconvert_with_instructions(
    result: str,
    model: type[BaseModel],
    is_json_output: bool,
    agent: Agent | BaseAgent | None,
    converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
    """Asynchronously convert a result string using instructions and an LLM.

    Async counterpart of ``convert_with_instructions``. It builds a
    converter for the agent's LLM (preferring ``function_calling_llm``
    when the agent has one) and awaits ``ato_json`` or ``ato_pydantic``
    depending on ``is_json_output``.

    Args:
        result: The result string to convert.
        model: The Pydantic model class to convert to.
        is_json_output: Whether to return a dict (True) or Pydantic model (False).
        agent: The agent instance.
        converter_cls: The converter class to use.

    Returns:
        The converter's output, or the original ``result`` string when the
        converter hands back a ConverterError.

    Raises:
        TypeError: If agent is not provided.
        ValueError: If the agent has no LLM to convert with.
    """
    if agent is None:
        raise TypeError("Agent must be provided for LLM-based conversion.")

    llm = getattr(agent, "function_calling_llm", None) or agent.llm
    if llm is None:
        raise ValueError("Agent must have a valid LLM instance for conversion")

    converter = create_converter(
        agent=agent,
        converter_cls=converter_cls,
        llm=llm,
        text=result,
        model=model,
        instructions=get_conversion_instructions(model=model, llm=llm),
    )

    if is_json_output:
        converted = await converter.ato_json()
    else:
        converted = await converter.ato_pydantic()

    if not isinstance(converted, ConverterError):
        return converted

    # Conversion failed: report it when verbose and return the raw text.
    if agent and getattr(agent, "verbose", True):
        Printer().print(
            content=f"Failed to convert result to model: {converted}",
            color="red",
        )
    return result
Loading