feat: RuntimeState event bus integration with checkpoint/resume#5241
greysonlalonde wants to merge 49 commits into main from chore/runtime-state-event-bus
Conversation
…resume via kickoff()
…ert TokenProcess to BaseModel
…ider pattern
- Move runtime_state.py to state/runtime.py
- Add `acheckpoint` async method using aiofiles
- Introduce BaseProvider protocol and JsonProvider for pluggable storage
- Add aiofiles dependency to crewai package
- Use PrivateAttr for provider on RootModel
iris-clawd
left a comment
Full Review: RuntimeState event bus integration with checkpoint/resume
This is a substantial PR (~1.5K additions, 42 files). Overall the architecture is solid — RuntimeState + EventRecord + pluggable providers is a clean design. A few concerns worth addressing:
✅ What looks good
- EventRecord data structure — directed graph with O(1) lookups, typed edges, automatic wiring via `add()`. Clean design, well-tested (423-line test file).
- BaseAgentExecutor refactor — converting `CrewAgentExecutorMixin` from a plain class to a Pydantic `BaseModel` is the right move for serialization. The `Field(exclude=True)` for back-references avoids circular serialization.
- Provider abstraction — `BaseProvider` protocol with sync/async methods is clean. `JsonProvider` is a sensible default.
- 3-arg handler backward compat — event handlers can accept 2 or 3 args. `inspect.signature` dispatch is pragmatic.
- Resume logic in Crew — `_get_execution_start_index` checking for `task.output is None` to skip completed tasks is straightforward and correct.
- Test coverage — 423 lines of EventRecord tests covering edge wiring, symmetry, traversal, serialization roundtrips, and RuntimeState integration. Solid.
⚠️ Concerns
1. inspect.signature() on every sync handler call (hot path)
In _call_handlers and is_call_handler_safe, every handler invocation does inspect.signature(handler). This is the event emission hot path — could fire hundreds of times per crew execution. Consider caching the parameter count at registration time (@crewai_event_bus.on) instead.
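The suggested fix could look something like the sketch below — a hypothetical registration decorator and dispatch helper (not the actual `crewai_event_bus` API), where the signature is introspected exactly once per handler:

```python
import inspect
from collections.abc import Callable
from typing import Any

# Hypothetical sketch: record each handler's parameter count at
# registration time, so the emission hot path only does a dict lookup.
_handler_arity: dict[Callable[..., Any], int] = {}


def on(handler: Callable[..., Any]) -> Callable[..., Any]:
    """Registration decorator: call inspect.signature() exactly once."""
    _handler_arity[handler] = len(inspect.signature(handler).parameters)
    return handler


def call_handler(handler: Callable[..., Any], source: Any, event: Any, state: Any) -> None:
    """Emission hot path: no per-event introspection, just a dict lookup."""
    arity = _handler_arity.get(handler, 2)
    if arity >= 3:
        handler(source, event, state)
    else:
        handler(source, event)
```

The names `on` and `call_handler` are placeholders for whatever the bus's registration and dispatch points are; the point is only that the `inspect.signature()` cost moves off the per-event path.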
2. New dependency: aiofiles~=24.1.0
This adds a runtime dependency to the core package for async file I/O in JsonProvider. Given that checkpointing is opt-in and the sync path uses plain open(), is the async path critical enough to justify a new dependency? An alternative: use asyncio.to_thread(Path.read_text, ...) for the async provider methods.
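A minimal sketch of the aiofiles-free alternative, assuming the provider only needs whole-file read/write (method names here are illustrative, not the actual `JsonProvider` API):

```python
import asyncio
from pathlib import Path


class ThreadedJsonProvider:
    """Hypothetical async provider that offloads blocking file I/O to a
    worker thread via asyncio.to_thread instead of depending on aiofiles."""

    async def aread(self, path: Path) -> str:
        # Path.read_text blocks, but runs in a thread so the loop stays free.
        return await asyncio.to_thread(path.read_text, encoding="utf-8")

    async def awrite(self, path: Path, data: str) -> None:
        await asyncio.to_thread(path.write_text, data, encoding="utf-8")
```

For occasional checkpoint writes (as opposed to high-frequency I/O), the thread-pool hop is negligible, and the core package avoids a new runtime dependency.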
3. register_entity uses id(entity) for dedup
Python's id() can be reused after garbage collection. If an entity is GC'd and a new one allocated at the same address, it won't get registered. In practice this is unlikely during a single execution, but it's a subtle footgun. Consider using the entity's id field (UUID) instead of id().
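A sketch of the UUID-based dedup, using stand-in types (the real entities presumably already carry an `id` field, per the suggestion above):

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Entity:
    """Stand-in for a crewAI entity with a stable UUID identity field."""
    id: uuid.UUID = field(default_factory=uuid.uuid4)


class Registry:
    """Hypothetical registry that dedups on the entity's UUID, which stays
    unique for the process lifetime, unlike id(), which can be reused
    after garbage collection."""

    def __init__(self) -> None:
        self._seen: set[uuid.UUID] = set()
        self.entities: list[Entity] = []

    def register_entity(self, entity: Entity) -> None:
        if entity.id in self._seen:
            return
        self._seen.add(entity.id)
        self.entities.append(entity)
```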
4. CrewStructuredTool → BaseModel migration
The args_schema: Any and func: Any typing loses the previous type safety. Could these be typed more precisely? e.g., args_schema: type[BaseModel] | None and func: Callable[..., Any] | None.
5. LLM serialization change: str → dict
Changing _serialize_llm_ref to return dict instead of str is a breaking change for anyone consuming serialized agent/crew JSON. The _validate_llm_ref handles dict → LLM on deserialization, which is good, but this needs to be called out in release notes.
6. _restore_runtime matches agents by role string
task.agent is re-linked by matching agent.role == task.agent.role. If two agents share the same role (unusual but possible), this could mis-link. Consider matching on agent id (UUID) instead.
7. mypy: disable-error-code="union-attr,arg-type" added to two files
Both crew_agent_executor.py and experimental/agent_executor.py get blanket mypy suppressions. These are large files — would be better to use inline # type: ignore on specific lines rather than file-wide suppression.
🔍 Minor nits
- `StandardPromptResult` and `SystemPromptResult` converted from TypedDict to BaseModel with `get()`/`__getitem__`/`__contains__` methods — these duck-type as dicts for backward compat, which is clever but should be documented.
- `TokenCalcHandler.__hash__ = object.__hash__` — this is needed because BaseModel changes hash behavior, but it's non-obvious. A comment explaining why would help.
- `EventRecord.descendants()` uses `queue.pop(0)` (O(n)) — use `collections.deque` for proper BFS.
- Empty `__init__.py` files for `state/` and `state/provider/` — fine, just noting.
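The deque fix in isolation, shown here on a plain adjacency dict rather than the real `EventRecord` node structure:

```python
from collections import deque


def descendants(graph: dict[str, list[str]], root: str) -> list[str]:
    """BFS over a child-adjacency dict. deque.popleft() is O(1), whereas
    list.pop(0) shifts every remaining element (O(n) per dequeue)."""
    seen: set[str] = {root}
    out: list[str] = []
    queue: deque[str] = deque(graph.get(root, []))
    while queue:
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        out.append(node)
        queue.extend(graph.get(node, []))
    return out
```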
Summary
The core design is strong and the test coverage is good. Main things I'd want addressed before merge:
- Cache handler param count at registration (perf)
- Consider dropping the `aiofiles` dep (use `asyncio.to_thread` instead)
- Match entities by UUID, not role string, in `_restore_runtime`
- Remove file-wide mypy suppressions
None of these are show-stoppers, but #1 and #3 could cause real issues at scale.
- Return `len(tasks)` from `_get_execution_start_index` when all tasks complete, preventing full re-execution of finished checkpoints
- Add `_get_execution_start_index` call to `_aexecute_tasks` so async resume skips completed tasks like the sync path does
- Cache `inspect.signature` results per handler to avoid repeated introspection on every event emission
- Bump uv-pre-commit from 0.9.3 to 0.11.3 to support relative `exclude-newer` values in pyproject.toml
- Use `checkpoint_kickoff_event_id` to detect resume, preventing a second `kickoff()` from skipping tasks or suppressing events
…ewAIInc/crewAI into chore/runtime-state-event-bus
litellm 1.83.0 fixes CVE-2026-35029 (proxy config privilege escalation) and CVE-2026-35030 (proxy JWT auth bypass), and is the first release after the supply chain incident. Bump openai to 2.x to satisfy litellm's dependency.
Extract _prepare_event to set previous_event_id, triggered_by_event_id, emission_sequence, parent/child scoping, and event_record tracking. Both emit and aemit now call it, fixing aemit's missing metadata.
Replay the event record during _restore_runtime to rebuild _event_id_stack with correct event IDs. Remove manual push_event_scope calls from task and crew resume paths that used task UUIDs instead of event IDs.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Reviewed by Cursor Bugbot for commit 167b609. Configure here.
```python
if isinstance(value, dict):
    from crewai.llm import LLM

    return LLM(**value)
```
LLM deserialization always creates LLM, losing provider type
Medium Severity
_validate_llm_ref always reconstructs a litellm-based LLM from a dict, even if the original object was a different BaseLLM subclass (e.g., OpenAICompletion). After a checkpoint/restore cycle, the LLM provider type is silently changed, which alters runtime behavior. The same issue applies to _validate_executor_ref, which always creates a CrewAgentExecutor even if the original was an AgentExecutor.
Additional Locations (1)
```python
return getattr(value, "model", str(value))
return {"model": value}
result: dict[str, Any] = value.model_dump()
return result
```
Checkpoint files may contain plaintext API credentials
Medium Severity
_serialize_llm_ref calls value.model_dump() which serializes all LLM fields, potentially including api_key, api_base, and other credentials. Checkpoint JSON files written by RuntimeState.checkpoint() could contain sensitive secrets in plaintext on the filesystem.
```python
set_last_event_id(event.event_id)

if self._runtime_state is not None:
    self._runtime_state.event_record.add(event)
```
Concurrent emit calls race on shared event_record
Low Severity
_prepare_event is called from the emitting thread without any locking, and self._runtime_state.event_record.add(event) mutates the shared nodes dict and modifies neighboring nodes' edge lists. When multiple threads call emit() concurrently, these compound mutations can race, potentially producing an inconsistent event record in the checkpoint.


Summary
- `RuntimeState` as optional third arg to event bus handlers
- `RuntimeState.checkpoint(dir)` writes timestamped JSON snapshots
- `Crew.from_checkpoint(path)` restores and resumes via `kickoff()`
- `_get_execution_start_index` skips tasks with existing output
- `CrewStructuredTool`, `StandardPromptResult`, `SystemPromptResult`, `TokenCalcHandler` converted to BaseModel
- `CrewAgentExecutorMixin` uses `Field(exclude=True)` for back-references
Test plan
Note
Medium Risk
Medium risk because it changes core execution plumbing (event emission, executor serialization, and resume logic) and alters JSON serialization shapes for LLM/tool/executor objects, which could affect backward compatibility and runtime behavior.
Overview
Adds first-class checkpoint/resume support by introducing
crewai.state.runtime.RuntimeState(with pluggableBaseProvider+ default filesystemJsonProvider) and anEventRecordthat captures event relationships during execution.Integrates runtime state with the event system: the event bus now tracks/records emitted events into the active
RuntimeState, can auto-register entities, and supports handlers that optionally accept a thirdstateargument while remaining compatible with 2-arg handlers.Enables restoring and resuming runs via
Crew.from_checkpoint()/Flow.from_checkpoint()/BaseAgent.from_checkpoint(), including rebuilding event scope, rehydrating agent executors/message history, and skipping already-completed tasks when resuming.Refactors several runtime objects to be Pydantic models (
BaseAgentExecutor,CrewAgentExecutor,CrewStructuredTool, prompt result types, token tracking/callbacks) and adjusts LLM/tool serialization to structured dicts; also updates CI/pre-commituvversions and bumps dependencies (e.g.,litellm,openai) plus addsaiofilesfor async checkpoint IO.Reviewed by Cursor Bugbot for commit 167b609. Bugbot is set up for automated code reviews on this repo. Configure here.