-
Notifications
You must be signed in to change notification settings - Fork 38
Open
Description
Issue #349 has resurfaced.
The fix from PR #352 was reverted by commit baadd20.
A temporary workaround that I came up with is this:
"""
Conductor SDK extensions and workarounds.
This module is the canonical location for workarounds and extensions
to the conductor-python SDK. Keeping them separate from workflow
definitions avoids circular imports and makes the workarounds explicit.
"""
from copy import deepcopy
from conductor.client.workflow.task.join_task import JoinTask
from conductor.client.workflow.task.task import TaskInterface, WorkflowTask
from conductor.client.workflow.task.task_type import TaskType
class FixedDynamicForkTask(TaskInterface):
    """
    Drop-in replacement for the SDK's DynamicForkTask.

    Writes the tasks parameter to dynamicForkTasksParam rather than
    dynamicForkJoinTasksParam; the upstream SDK has a bug where it sets the
    wrong field.
    See: https://github.com/conductor-oss/python-sdk/pull/352
    """

    def __init__(
        self,
        task_ref_name: str,
        tasks_param: str = "dynamicTasks",
        tasks_input_param_name: str = "dynamicTasksInput",
        join_task: JoinTask = None,
    ):
        """
        Create a FORK_JOIN_DYNAMIC task.

        Args:
            task_ref_name: Reference name passed to the base task.
            tasks_param: Value stored on the workflow task's
                ``dynamic_fork_tasks_param`` field.
            tasks_input_param_name: Value stored on the workflow task's
                ``dynamic_fork_tasks_input_param_name`` field.
            join_task: Optional join task emitted right after the fork.
                A deep copy is kept, so later changes to the caller's
                object do not affect this task.
        """
        super().__init__(
            task_type=TaskType.FORK_JOIN_DYNAMIC,
            task_reference_name=task_ref_name,
        )
        self.tasks_param = tasks_param
        self.tasks_input_param_name = tasks_input_param_name
        self._join_task = deepcopy(join_task)

    def to_workflow_task(self) -> list[WorkflowTask]:
        """
        Build the workflow task(s) for this dynamic fork.

        Returns:
            A list holding the fork task, followed by the join task's
            workflow task when a join task was supplied.
        """
        fork = super().to_workflow_task()
        # FIX: populate dynamic_fork_tasks_param, not
        # dynamic_fork_join_tasks_param.
        fork.dynamic_fork_tasks_param = self.tasks_param
        fork.dynamic_fork_tasks_input_param_name = self.tasks_input_param_name

        if self._join_task is None:
            return [fork]
        return [fork, self._join_task.to_workflow_task()]
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels