[BREAKING] Python: Add bulk executor registration method to WorkflowBuilder#3565
[BREAKING] Python: Add bulk executor registration method to WorkflowBuilder#3565holtvogt wants to merge 17 commits intomicrosoft:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds a bulk executor registration method to the Python WorkflowBuilder class to reduce boilerplate when registering multiple executors. The change addresses issue #3551 by providing a dict-based API as an alternative to multiple individual register_executor() calls.
Changes:
- Added
register_executors()method that accepts adict[str, Callable[[], Executor]]for bulk registration - Added unit test
test_register_executors_bulk()to verify the new method works correctly and supports method chaining
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
python/packages/core/agent_framework/_workflows/_workflow_builder.py |
Added register_executors() method with proper validation, documentation, and examples following existing patterns |
python/packages/core/tests/workflow/test_workflow_builder.py |
Added basic test for bulk registration functionality and chaining behavior |
|
Hi, thank you for your contribution! This is actually something we wanted to improve too! |
Introduces build_async() to support async executor factories while minimizing code duplication. The sync and async build paths differ only in how they instantiate executors (sync raises on awaitables, async awaits them). All other logic (validation, edge resolution, workflow creation) is extracted into shared helpers to ensure maintainability and avoid duplicating the logic.
|
@moonbox3 The async support involved some heavier lifting, mainly to avoid duplicating existing logic and to cleanly separate reusable helpers. Does this approach look reasonable, or would you prefer handling the duplication differently? Let me know what you think! |
moonbox3
left a comment
There was a problem hiding this comment.
Thanks for working on this. Some comments/questions.
| if inspect.isawaitable(instance): | ||
| # Close un-awaited coroutines to avoid runtime warnings or memory leaks | ||
| if hasattr(instance, "close"): | ||
| instance.close() # type: ignore[reportGeneralTypeIssues] |
There was a problem hiding this comment.
if instance.close() ever throws, the ValueError is never raised. Should we wrap it in a try/except?
There was a problem hiding this comment.
Can we intentionally overwrite the error from close() using a finally block instead?
There was a problem hiding this comment.
It might be worth logging close() failures at WARNING level in an except block to explicitly flag possible resource leaks. This seems safer than silently swallowing the error. Thoughts?
| if hasattr(instance, "close"): | ||
| instance.close() # type: ignore[reportGeneralTypeIssues] | ||
| raise ValueError("Async executor factories were detected. Use build_async() instead.") | ||
| factory_name_to_instance[name] = instance |
There was a problem hiding this comment.
Should we add a isinstance(instance, Executor) check here like:
if not isinstance(instance, Executor):
raise TypeError(
f"Factory '{name}' returned {type(instance).__name__} instead of an Executor."
)Same for the new async path?
Also, when a factory in a large dict raises an exception during build(), the error has no indication of which factory name caused it. A try/except that adds the factory name would help improve debuggability, I think.
| start_executor = self._start_executor | ||
|
|
||
| deferred_edge_groups: list[EdgeGroup] = [] | ||
| executor_ids_seen: set[str] = set() |
There was a problem hiding this comment.
When a duplicate ID is found, the error says which ID collided but not which two factory names produced it. What if we use a dict[str, str] (id -> name) so we could say: "ID 'agent' from factory 'B' conflicts with factory 'A'"?
There was a problem hiding this comment.
Do we have tests for:
- mixed sync+async factories with build_async()?
- coroutine
.close()actually called on sync build with async factory? build_async()when async factory raises exception?- multiple
register_executors()calls with non-overlapping names? - whitespace-only names (like " ") failing as rejected?
Python Test Coverage Report •
Python Unit Test Overview
|
||||||||||||||||||||||||||||||
Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>
For duplicate IDs, the new messages specify the conflicting factories, improving debugging. Added tests to verify that conflicts between factory-produced and directly added executors raise appropriate errors.
|
@moonbox3 Thanks for the detailed feedback and the sharp eye. I addressed the points and will happily hand this back to you for another round 🔍 |
| | _FanInEdgeRegistration | ||
| ] = [] | ||
| self._executor_registry: dict[str, Callable[[], Executor]] = {} | ||
| self._executor_registry: dict[str, Callable[[], Executor | Awaitable[Executor]]] = {} |
There was a problem hiding this comment.
Should we allow async factories? This will force us to have an async build method.
I intentionally left out async factories when adding the factory pattern because I'd much prefer a single build method.
AzureAIAgent requires async methods to create or retrieve an agent, which makes this very tricky.
The AIProjectClient does support none-async agent creation. Should we add support for that too? @eavanvalkenburg
There was a problem hiding this comment.
So it's nuanced,if you want the agent with Foundry to be created them you need the async 'create_agent' method, but you can always use the non-async 'get_agent' method, and then we create on the portal at first run, so I wouldn't let that determine this design because I agree, having non asyn build is preferred. @dmytrostruk can share additional insights, he did most of the work
There was a problem hiding this comment.
The reason I suggested an asynchronous factory is because of the need for AzureAIClient. We need this to be an easy experience - why are we forcing users to first have to define an agent in the portal, AND then be able to use it in code? How is that a good dev experience?
There was a problem hiding this comment.
The AIProjectClient does support none-async agent creation. Should we add support for that too?
I don't think we can do this, using sync methods will block the event loop, can impact performance etc. I also don't think we need to force users to first have to define an agent in the portal. As @eavanvalkenburg mentioned, we have this ability in AzureAIClient to create an agent on first await agent.run() instead of doing it before the ChatAgent instance is created, which could be useful here, but from AzureAIClient perspective - it's unexpected design, which already created some confusion around it, so we should definitely revisit.
But I also don't see a problem to have async factory. There are local agents as well as remote agents, and there should be a mechanism to create both, so you can work with them.
ead3f1c to
6648453
Compare
|
Hi @holtvogt, we appreciate all the work and time you put into this PR. We're going to go a different direction by removing the factory methods. We will do this in a different PR. I will close this PR right now. |
Motivation and Context
Registering many executors currently needs repetitive
register_executor()calls. This PR replaces the existing API with a dict-based bulk registration method (register_executors()) to reduce boilerplate and verbosity. This is most useful in complex workflows with many executors.Description
Replaced the existing executor registration API with a bulk registration API on
WorkflowBuilderthat accepts adict[str, Callable[[], Executor | Awaitable[Executor]]], reducing boilerplate when registering multiple executors. This PR also adds async factory support to workflows. Executor factories may now return either anExecutoror anAwaitable[Executor], which is resolved viabuild_async().Contribution Checklist
Reference