Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from collections import Counter
from collections.abc import Iterable
from typing import Any

import structlog

Expand All @@ -32,15 +33,22 @@
log = structlog.get_logger(logger_name=__name__)


def _merge_node_dicts(current, new) -> None:
def _merge_node_dicts(current: list[dict[str, Any]], new: list[dict[str, Any]] | None) -> None:
"""Merge node dictionaries from different DAG versions, handling structure changes."""
# Handle None case - can occur when merging old DAG versions
# where a TaskGroup was converted to a task or vice versa
if new is None:
return

current_nodes_by_id = {node["id"]: node for node in current}
for node in new:
node_id = node["id"]
current_node = current_nodes_by_id.get(node_id)
if current_node is not None:
# if we have children, merge those as well
if current_node.get("children"):
_merge_node_dicts(current_node["children"], node.get("children", []))
# Only merge children if current node already has children
# This preserves the structure of the latest DAG version
if current_node.get("children") is not None:
_merge_node_dicts(current_node["children"], node.get("children"))
else:
current.append(node)
current_nodes_by_id[node_id] = node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.models.dagbag import DBDagBag
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import task_group
from airflow.sdk.definitions.taskgroup import TaskGroup
from airflow.utils.session import provide_session
Expand Down Expand Up @@ -874,6 +875,72 @@ def test_structure_includes_historical_removed_task_with_proper_shape(self, sess
assert "is_mapped" not in t4
assert "children" not in t4

def test_task_converted_to_task_group_doesnt_crash(self, session, dag_maker, test_client):
    """Test that converting a Task to a TaskGroup with same name doesn't crash grid view.

    Two versions of the same DAG id are created, each with its own DagRun, and the
    ``/grid/structure`` endpoint is queried after each one.

    Regression test for https://github.com/apache/airflow/issues/61208
    """

    dag_id = "test_task_to_group_conversion"

    # Version 1: task_a is a simple task
    with dag_maker(
        dag_id=dag_id,
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=None,
    ):
        PythonOperator(task_id="task_a", python_callable=lambda: True)
        PythonOperator(task_id="task_b", python_callable=lambda: True)

    # Create a DagRun for version 1
    dag_maker.create_dagrun(
        run_id="test_run_1",
        run_type=DagRunType.MANUAL,
        logical_date=pendulum.datetime(2024, 1, 3, tz="UTC"),
    )

    # Verify v1 structure is a flat list of the two tasks
    response_v1 = test_client.get(f"/grid/structure/{dag_id}")
    assert response_v1.status_code == 200
    nodes_v1 = response_v1.json()
    assert nodes_v1 == [
        {"id": "task_a", "label": "task_a"},
        {"id": "task_b", "label": "task_b"},
    ]

    # Version 2: task_a is a TaskGroup with subtasks
    with dag_maker(
        dag_id=dag_id,
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=None,
        serialized=True,
    ):
        with TaskGroup(group_id="task_a"):
            PythonOperator(task_id="task_a1", python_callable=lambda: True)
            PythonOperator(task_id="task_a2", python_callable=lambda: True)
        PythonOperator(task_id="task_b", python_callable=lambda: True)

    # Create a DagRun for version 2
    dag_maker.create_dagrun(
        run_id="test_run_2",
        run_type=DagRunType.MANUAL,
        logical_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    )

    # Verify v2 structure shows TaskGroup with children
    response_v2 = test_client.get(f"/grid/structure/{dag_id}")
    assert response_v2.status_code == 200
    nodes_v2 = response_v2.json()
    assert nodes_v2 == [
        {
            "id": "task_a",
            "label": "task_a",
            "children": [
                {"id": "task_a.task_a1", "label": "task_a1"},
                {"id": "task_a.task_a2", "label": "task_a2"},
            ],
        },
        {"id": "task_b", "label": "task_b"},
    ]

# Tests for root, include_upstream, and include_downstream parameters
@pytest.mark.parametrize(
("params", "expected_task_ids", "description"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,72 @@
from airflow.api_fastapi.core_api.services.ui.grid import _merge_node_dicts


def test_merge_node_dicts_with_none_new_list():
    """Merging ``None`` into an existing node list must be a harmless no-op.

    Regression test for https://github.com/apache/airflow/issues/61208
    When a TaskGroup is converted to a task, new can be None for some runs.
    """
    existing_nodes = [{"id": "task1", "label": "Task 1"}]

    # Pass None directly as the list to merge in.
    _merge_node_dicts(existing_nodes, None)

    # The single original node is still there and nothing was added.
    assert len(existing_nodes) == 1
    assert existing_nodes[0]["id"] == "task1"


def test_merge_node_dicts_preserves_taskgroup_structure():
    """A group node in ``current`` keeps its children when ``new`` has the same id with ``children=None``."""
    group_node = {
        "id": "task_a",
        "label": "Task A",
        "children": [{"id": "task_a.subtask1", "label": "Subtask 1"}],
    }
    current = [group_node]
    # Same id, but shaped like a plain task (no children).
    incoming = [{"id": "task_a", "label": "Task A", "children": None}]

    _merge_node_dicts(current, incoming)

    # Current structure (TaskGroup) is preserved
    assert len(current) == 1
    merged = current[0]
    assert merged["id"] == "task_a"
    assert merged["children"] is not None
    assert len(merged["children"]) == 1


def test_merge_node_dicts_merges_nested_children():
    """Children of a matching group are merged: an unseen child id is added to the group."""
    first_child = {"id": "group1.task1", "label": "Task 1"}
    second_child = {"id": "group1.task2", "label": "Task 2"}

    current = [{"id": "group1", "label": "Group 1", "children": [dict(first_child)]}]
    # The incoming group repeats the known child and introduces one more.
    incoming = [
        {
            "id": "group1",
            "label": "Group 1",
            "children": [dict(first_child), dict(second_child)],
        }
    ]

    _merge_node_dicts(current, incoming)

    assert len(current) == 1
    merged_group = current[0]
    assert merged_group["id"] == "group1"
    assert len(merged_group["children"]) == 2


def test_merge_node_dicts_merges_children_and_appends_new_nodes():
current = [
{
Expand Down
Loading