feat: RuntimeState event bus integration with checkpoint/resume#5241

Open
greysonlalonde wants to merge 49 commits into main from chore/runtime-state-event-bus

@greysonlalonde greysonlalonde commented Apr 2, 2026

Summary

  • Pass RuntimeState as optional third arg to event bus handlers
  • RuntimeState.checkpoint(dir) writes timestamped JSON snapshots
  • Crew.from_checkpoint(path) restores and resumes via kickoff()
  • _get_execution_start_index skips tasks with existing output
  • Convert CrewStructuredTool, StandardPromptResult, SystemPromptResult, TokenCalcHandler to BaseModel
  • CrewAgentExecutorMixin uses Field(exclude=True) for back-references

Test plan

  • Real LLM execution: checkpoint after task 1, restore, resume skips task 1 and runs task 2
  • 371 core tests pass
  • Backwards compatible: 2-arg event handlers still work

Note

Medium Risk
Medium risk because it changes core execution plumbing (event emission, executor serialization, and resume logic) and alters JSON serialization shapes for LLM/tool/executor objects, which could affect backward compatibility and runtime behavior.

Overview
Adds first-class checkpoint/resume support by introducing crewai.state.runtime.RuntimeState (with pluggable BaseProvider + default filesystem JsonProvider) and an EventRecord that captures event relationships during execution.

Integrates runtime state with the event system: the event bus now tracks/records emitted events into the active RuntimeState, can auto-register entities, and supports handlers that optionally accept a third state argument while remaining compatible with 2-arg handlers.

Enables restoring and resuming runs via Crew.from_checkpoint() / Flow.from_checkpoint() / BaseAgent.from_checkpoint(), including rebuilding event scope, rehydrating agent executors/message history, and skipping already-completed tasks when resuming.

Refactors several runtime objects to be Pydantic models (BaseAgentExecutor, CrewAgentExecutor, CrewStructuredTool, prompt result types, token tracking/callbacks) and adjusts LLM/tool serialization to structured dicts; also updates CI/pre-commit uv versions and bumps dependencies (e.g., litellm, openai) plus adds aiofiles for async checkpoint IO.

Reviewed by Cursor Bugbot for commit 167b609. Bugbot is set up for automated code reviews on this repo. Configure here.

@github-actions github-actions bot added the size/L label Apr 2, 2026
@greysonlalonde greysonlalonde changed the title feat: runtime state event bus feat: RuntimeState event bus integration with checkpoint/resume Apr 3, 2026
…ider pattern

- Move runtime_state.py to state/runtime.py
- Add acheckpoint async method using aiofiles
- Introduce BaseProvider protocol and JsonProvider for pluggable storage
- Add aiofiles dependency to crewai package
- Use PrivateAttr for provider on RootModel
@greysonlalonde greysonlalonde marked this pull request as ready for review April 3, 2026 20:01

@iris-clawd iris-clawd left a comment

Full Review: RuntimeState event bus integration with checkpoint/resume

This is a substantial PR (~1.5K additions, 42 files). Overall the architecture is solid — RuntimeState + EventRecord + pluggable providers is a clean design. A few concerns worth addressing:


✅ What looks good

  1. EventRecord data structure — directed graph with O(1) lookups, typed edges, automatic wiring via add(). Clean design, well-tested (423-line test file).

  2. BaseAgentExecutor refactor — Converting CrewAgentExecutorMixin from a plain class to a Pydantic BaseModel is the right move for serialization. The Field(exclude=True) for back-references avoids circular serialization.

  3. Provider abstraction: BaseProvider protocol with sync/async methods is clean. JsonProvider is a sensible default.

  4. 3-arg handler backward compat — Event handlers can accept 2 or 3 args. inspect.signature dispatch is pragmatic.

  5. Resume logic in Crew: _get_execution_start_index checks task.output is None to find the first unfinished task and skip the completed ones, which is straightforward and correct.

  6. Test coverage — 423 lines of EventRecord tests covering edge wiring, symmetry, traversal, serialization roundtrips, and RuntimeState integration. Solid.
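
To make the resume rule in item 5 concrete, here is a minimal sketch of a start-index lookup. The `Task` dataclass and the free function are hypothetical stand-ins, not the PR's code; the `len(tasks)` return for a fully completed run follows the behavior described in this PR's follow-up commit notes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    """Hypothetical stand-in for a crew task; only `output` matters here."""

    name: str
    output: Optional[str] = None


def get_execution_start_index(tasks: list[Task]) -> int:
    """Return the index of the first task that has no output yet.

    Returns len(tasks) when every task already has output, so a caller
    slicing tasks[start:] runs nothing instead of re-running everything.
    """
    for index, task in enumerate(tasks):
        if task.output is None:
            return index
    return len(tasks)


# Task 1 finished before the checkpoint; task 2 did not.
tasks = [Task("research", output="done"), Task("write")]
start = get_execution_start_index(tasks)
remaining = [task.name for task in tasks[start:]]
```

With these inputs only the second task is left to run after a restore.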


⚠️ Concerns

1. inspect.signature() on every sync handler call (hot path)

In _call_handlers and is_call_handler_safe, every handler invocation does inspect.signature(handler). This is the event emission hot path — could fire hundreds of times per crew execution. Consider caching the parameter count at registration time (@crewai_event_bus.on) instead.
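
A rough sketch of what caching at registration could look like. `MiniEventBus` is a hypothetical, heavily simplified bus, not the crewai event bus; it only shows the idea of inspecting the handler once in the decorator and storing a boolean next to it.

```python
import inspect
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[..., Any]


class MiniEventBus:
    """Hypothetical bus used only to illustrate registration-time caching."""

    def __init__(self) -> None:
        # event type -> list of (handler, accepts_state) pairs
        self._handlers: dict[type, list[tuple[Handler, bool]]] = defaultdict(list)

    def on(self, event_type: type) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            # Introspect once here instead of on every emit.
            # Simplification: counts declared parameters, ignores *args.
            accepts_state = len(inspect.signature(handler).parameters) >= 3
            self._handlers[event_type].append((handler, accepts_state))
            return handler

        return decorator

    def emit(self, source: Any, event: Any, state: Any = None) -> None:
        for handler, accepts_state in self._handlers.get(type(event), []):
            if accepts_state:
                handler(source, event, state)
            else:
                handler(source, event)


class TaskStarted:
    """Hypothetical event type."""


bus = MiniEventBus()
calls: list[tuple[str, Any]] = []


@bus.on(TaskStarted)
def two_arg_handler(source: Any, event: Any) -> None:
    calls.append(("two", None))


@bus.on(TaskStarted)
def three_arg_handler(source: Any, event: Any, state: Any) -> None:
    calls.append(("three", state))


bus.emit("crew", TaskStarted(), state={"step": 1})
```

The emit loop then does a tuple unpack and a branch per handler, with no introspection on the hot path.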

2. New dependency: aiofiles~=24.1.0

This adds a runtime dependency to the core package for async file I/O in JsonProvider. Given that checkpointing is opt-in and the sync path uses plain open(), is the async path critical enough to justify a new dependency? An alternative: use asyncio.to_thread(Path.read_text, ...) for the async provider methods.
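
For illustration, a sketch of the `asyncio.to_thread` alternative. `ThreadedJsonStore` and its method names are hypothetical and do not mirror the actual BaseProvider signatures; the async methods simply run the sync ones in a worker thread, using only the standard library.

```python
import asyncio
import tempfile
from pathlib import Path


class ThreadedJsonStore:
    """Hypothetical provider sketch; names are illustrative only."""

    def checkpoint(self, data: str, path: Path) -> None:
        path.write_text(data, encoding="utf-8")

    def from_checkpoint(self, path: Path) -> str:
        return path.read_text(encoding="utf-8")

    async def acheckpoint(self, data: str, path: Path) -> None:
        # Run the blocking write in a worker thread so the loop stays free.
        await asyncio.to_thread(self.checkpoint, data, path)

    async def afrom_checkpoint(self, path: Path) -> str:
        return await asyncio.to_thread(self.from_checkpoint, path)


async def _roundtrip() -> str:
    store = ThreadedJsonStore()
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "snapshot.json"
        await store.acheckpoint('{"task": 1}', path)
        return await store.afrom_checkpoint(path)


restored = asyncio.run(_roundtrip())
```

The trade-off is one thread hop per call, which seems acceptable for an opt-in, infrequent operation like checkpointing.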

3. register_entity uses id(entity) for dedup

Python's id() can be reused after garbage collection. If an entity is GC'd and a new one allocated at the same address, it won't get registered. In practice this is unlikely during a single execution, but it's a subtle footgun. Consider using the entity's id field (UUID) instead of id().
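
A small sketch of dedup keyed on a UUID field rather than `id()`. `Entity` and `Registry` are hypothetical names, and the sketch assumes every registered entity carries a stable `id: UUID` attribute.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Entity:
    """Hypothetical entity with a stable UUID, as agents and tasks have."""

    role: str
    id: uuid.UUID = field(default_factory=uuid.uuid4)


class Registry:
    """Hypothetical registry that dedups on the entity UUID."""

    def __init__(self) -> None:
        self._by_uuid: dict[uuid.UUID, Entity] = {}

    def register(self, entity: Entity) -> bool:
        """Return True if the entity was newly registered."""
        if entity.id in self._by_uuid:
            return False
        self._by_uuid[entity.id] = entity
        return True


registry = Registry()
first = Entity("researcher")
second = Entity("researcher")  # same role, different UUID

first_added = registry.register(first)
first_again = registry.register(first)
second_added = registry.register(second)
```

Because the key no longer depends on the memory address, a new object that happens to reuse a freed address still registers.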

4. CrewStructuredTool → BaseModel migration

The args_schema: Any and func: Any typing loses the previous type safety. Could these be typed more precisely? e.g., args_schema: type[BaseModel] | None and func: Callable[..., Any] | None.

5. LLM serialization change: str → dict

Changing _serialize_llm_ref to return dict instead of str is a breaking change for anyone consuming serialized agent/crew JSON. The _validate_llm_ref handles dict → LLM on deserialization, which is good, but this needs to be called out in release notes.

6. _restore_runtime matches agents by role string

task.agent is re-linked by matching agent.role == task.agent.role. If two agents share the same role (unusual but possible), this could mis-link. Consider matching on agent id (UUID) instead.
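
A sketch of re-linking by UUID, under the assumption that the deserialized `task.agent` keeps the original agent's `id`. The dataclasses and `relink_agents` are hypothetical stand-ins for the real models and for `_restore_runtime`.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Agent:
    """Hypothetical agent stand-in."""

    role: str
    id: uuid.UUID = field(default_factory=uuid.uuid4)


@dataclass
class Task:
    """Hypothetical task stand-in."""

    description: str
    agent: Optional[Agent] = None


def relink_agents(tasks: list[Task], agents: list[Agent]) -> None:
    """Point each task at the crew's agent instance with the same UUID."""
    by_id = {agent.id: agent for agent in agents}
    for task in tasks:
        if task.agent is not None:
            # Keep the existing reference if no crew agent matches.
            task.agent = by_id.get(task.agent.id, task.agent)


# Two agents share a role; the task was assigned to the second one.
analyst_a = Agent("analyst")
analyst_b = Agent("analyst")
deserialized_copy = Agent("analyst", id=analyst_b.id)
task = Task("summarize findings", agent=deserialized_copy)

relink_agents([task], [analyst_a, analyst_b])
```

A role-based match would have picked `analyst_a` here, since it is the first agent with that role.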

7. mypy: disable-error-code="union-attr,arg-type" added to two files

Both crew_agent_executor.py and experimental/agent_executor.py get blanket mypy suppressions. These are large files — would be better to use inline # type: ignore on specific lines rather than file-wide suppression.


🔍 Minor nits

  • StandardPromptResult and SystemPromptResult converted from TypedDict to BaseModel with get()/__getitem__/__contains__ methods — these duck-type as dicts for backward compat, which is clever but should be documented.
  • TokenCalcHandler.__hash__ = object.__hash__ — this is needed because BaseModel changes hash behavior, but it's non-obvious. A comment explaining why would help.
  • The EventRecord.descendants() uses queue.pop(0) (O(n)) — use collections.deque for proper BFS.
  • Empty __init__.py files for state/ and state/provider/ — fine, just noting.
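
For the descendants() nit, a minimal breadth-first traversal using `collections.deque`. The adjacency dict is a hypothetical simplification of the EventRecord graph, which has typed edges.

```python
from collections import deque


def descendants(children: dict[str, list[str]], root: str) -> list[str]:
    """Return all nodes reachable from root in breadth-first order."""
    seen = {root}
    order: list[str] = []
    queue = deque([root])
    while queue:
        node = queue.popleft()  # O(1), unlike list.pop(0)
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


graph = {
    "kickoff": ["task_1", "task_2"],
    "task_1": ["llm_call"],
    "task_2": [],
}
result = descendants(graph, "kickoff")
```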

Summary

The core design is strong and the test coverage is good. Main things I'd want addressed before merge:

  1. Cache handler param count at registration (perf)
  2. Consider dropping aiofiles dep (use asyncio.to_thread instead)
  3. Match entities by UUID not role string in _restore_runtime
  4. Remove file-wide mypy suppressions

None of these are show-stoppers, but #1 and #3 could cause real issues at scale.


greysonlalonde and others added 2 commits April 4, 2026 04:11
- Return len(tasks) from _get_execution_start_index when all tasks
  complete, preventing full re-execution of finished checkpoints
- Add _get_execution_start_index call to _aexecute_tasks so async
  resume skips completed tasks like the sync path does
- Cache inspect.signature results per handler to avoid repeated
  introspection on every event emission
- Bump uv-pre-commit from 0.9.3 to 0.11.3 to support relative
  exclude-newer values in pyproject.toml
- Use checkpoint_kickoff_event_id to detect resume, preventing
  second kickoff() from skipping tasks or suppressing events

litellm 1.83.0 fixes CVE-2026-35029 (proxy config privilege escalation)
and CVE-2026-35030 (proxy JWT auth bypass), and is the first release
after the supply chain incident. Bump openai to 2.x to satisfy litellm's
dependency.

Extract _prepare_event to set previous_event_id, triggered_by_event_id,
emission_sequence, parent/child scoping, and event_record tracking.
Both emit and aemit now call it, fixing aemit's missing metadata.

Replay the event record during _restore_runtime to rebuild
_event_id_stack with correct event IDs. Remove manual push_event_scope
calls from task and crew resume paths that used task UUIDs instead
of event IDs.

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.

Reviewed by Cursor Bugbot for commit 167b609. Configure here.

    if isinstance(value, dict):
        from crewai.llm import LLM

        return LLM(**value)

LLM deserialization always creates LLM, losing provider type

Medium Severity

_validate_llm_ref always reconstructs a litellm-based LLM from a dict, even if the original object was a different BaseLLM subclass (e.g., OpenAICompletion). After a checkpoint/restore cycle, the LLM provider type is silently changed, which alters runtime behavior. The same issue applies to _validate_executor_ref, which always creates a CrewAgentExecutor even if the original was an AgentExecutor.
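
One possible direction, sketched with hypothetical stand-in classes rather than the real crewai types: write a type discriminator into the serialized dict and look the class up on restore, falling back to the default class for payloads that lack it.

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class BaseLLM:
    """Hypothetical base class; the real one has many more fields."""

    model: str


@dataclass
class DefaultLLM(BaseLLM):
    """Stand-in for the default (litellm-based) implementation."""


@dataclass
class NativeProviderLLM(BaseLLM):
    """Stand-in for a provider-specific BaseLLM subclass."""


# Registry of restorable classes, keyed by class name (an assumption).
LLM_TYPES: dict[str, type[BaseLLM]] = {
    cls.__name__: cls for cls in (DefaultLLM, NativeProviderLLM)
}


def serialize_llm(llm: BaseLLM) -> dict[str, Any]:
    """Dump the fields plus a discriminator naming the concrete class."""
    return {"llm_type": type(llm).__name__, **asdict(llm)}


def deserialize_llm(data: dict[str, Any]) -> BaseLLM:
    """Rebuild the original class; default when no discriminator exists."""
    payload = dict(data)
    cls = LLM_TYPES.get(payload.pop("llm_type", ""), DefaultLLM)
    return cls(**payload)


restored = deserialize_llm(serialize_llm(NativeProviderLLM("gpt-4o")))
legacy = deserialize_llm({"model": "gpt-4o"})
```

The same pattern would apply to the executor reference, with an executor-type key instead of `llm_type`.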

return getattr(value, "model", str(value))
return {"model": value}
result: dict[str, Any] = value.model_dump()
return result

Checkpoint files may contain plaintext API credentials

Medium Severity

_serialize_llm_ref calls value.model_dump() which serializes all LLM fields, potentially including api_key, api_base, and other credentials. Checkpoint JSON files written by RuntimeState.checkpoint() could contain sensitive secrets in plaintext on the filesystem.
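
A hedged sketch of one mitigation: strip sensitive keys from the dumped dict before it reaches the checkpoint file. The key set below is an assumption, not the list of fields the LLM model actually defines. On a Pydantic model, `model_dump(exclude={...})` would achieve the same for top-level fields.

```python
from typing import Any

# Assumed set of secret-bearing field names; adjust to the real model.
SENSITIVE_KEYS = frozenset({"api_key", "api_secret", "token"})


def redact(value: Any) -> Any:
    """Recursively drop sensitive keys from nested dicts and lists."""
    if isinstance(value, dict):
        return {
            key: redact(item)
            for key, item in value.items()
            if key not in SENSITIVE_KEYS
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


dumped = {
    "model": "gpt-4o",
    "api_key": "sk-example",
    "fallbacks": [{"model": "other", "api_key": "sk-other"}],
}
safe = redact(dumped)
```

Dropped credentials then have to be supplied again on restore, for example from environment variables.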

    set_last_event_id(event.event_id)

    if self._runtime_state is not None:
        self._runtime_state.event_record.add(event)

Concurrent emit calls race on shared event_record

Low Severity

_prepare_event is called from the emitting thread without any locking, and self._runtime_state.event_record.add(event) mutates the shared nodes dict and modifies neighboring nodes' edge lists. When multiple threads call emit() concurrently, these compound mutations can race, potentially producing an inconsistent event record in the checkpoint.
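
A minimal sketch of serializing the compound mutation with a lock. `LockedEventRecord` is a hypothetical simplification (string IDs, a single parent edge), not the PR's EventRecord; the point is only that the node insert and the neighbor edge update happen under one lock.

```python
import threading
from typing import Optional


class LockedEventRecord:
    """Hypothetical record whose add() is atomic with respect to other adds."""

    def __init__(self) -> None:
        self.nodes: dict[str, dict] = {}
        self._lock = threading.Lock()

    def add(self, event_id: str, parent_id: Optional[str] = None) -> None:
        with self._lock:
            # Insert the node and wire the parent edge as one unit.
            self.nodes[event_id] = {"parent": parent_id, "children": []}
            if parent_id is not None and parent_id in self.nodes:
                self.nodes[parent_id]["children"].append(event_id)


record = LockedEventRecord()
record.add("root")


def emit_many(worker: int) -> None:
    for i in range(100):
        record.add(f"w{worker}-e{i}", parent_id="root")


threads = [threading.Thread(target=emit_many, args=(n,)) for n in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

child_count = len(record.nodes["root"]["children"])
```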
