Skip to content

DynamicForkTask sets incompatible parameters causing validation error #377

@grafke

Description

@grafke

#349 issue is back up.

This PR #352 was undone by this commit: baadd20

A temporary workaround that I came up with is this:

"""
Conductor SDK extensions and workarounds.

This module is the canonical location for workarounds and extensions
to the conductor-python SDK. Keeping them separate from workflow
definitions avoids circular imports and makes the workarounds explicit.
"""

from copy import deepcopy

from conductor.client.workflow.task.join_task import JoinTask
from conductor.client.workflow.task.task import TaskInterface, WorkflowTask
from conductor.client.workflow.task.task_type import TaskType


class FixedDynamicForkTask(TaskInterface):
    """
    Fixed DynamicForkTask that correctly uses dynamicForkTasksParam instead of
    dynamicForkJoinTasksParam. The upstream SDK has a bug where it sets the wrong field.

    See: https://github.com/conductor-oss/python-sdk/pull/352
    """

    def __init__(
        self,
        task_ref_name: str,
        tasks_param: str = "dynamicTasks",
        tasks_input_param_name: str = "dynamicTasksInput",
        join_task: JoinTask = None,
    ):
        super().__init__(
            task_reference_name=task_ref_name, task_type=TaskType.FORK_JOIN_DYNAMIC
        )
        self.tasks_param = tasks_param
        self.tasks_input_param_name = tasks_input_param_name
        self._join_task = deepcopy(join_task)

    def to_workflow_task(self) -> list[WorkflowTask]:
        wf_task = super().to_workflow_task()
        # FIX: Use dynamic_fork_tasks_param (not dynamic_fork_join_tasks_param)
        wf_task.dynamic_fork_tasks_param = self.tasks_param
        wf_task.dynamic_fork_tasks_input_param_name = self.tasks_input_param_name
        tasks = [wf_task]
        if self._join_task is not None:
            tasks.append(self._join_task.to_workflow_task())
        return tasks

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions