From 1e6f5182f44cd2400dc7f7bd3aaf33b406101068 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 07:15:21 -0700 Subject: [PATCH 01/22] Introduce parent task spans and nest worker and trigger spans under them This lets us tie together the worker and trigger phases of task execution. Also lets us see the delta between task queued time and task start time. --- .../execution_api/routes/task_instances.py | 30 +++++++++++++++++++ airflow-core/src/airflow/models/dagrun.py | 6 +++- .../src/airflow/models/taskinstance.py | 28 ++++++++++++++++- .../airflow/sdk/execution_time/task_runner.py | 2 +- 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 60dd868c2e335..31283015bd694 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -29,6 +29,9 @@ import structlog from cadwyn import VersionedAPIRouter from fastapi import Body, HTTPException, Query, Security, status +from opentelemetry import trace +from opentelemetry.trace import StatusCode +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from pydantic import JsonValue from sqlalchemy import and_, func, or_, tuple_, update from sqlalchemy.engine import CursorResult @@ -87,6 +90,7 @@ log = structlog.get_logger(__name__) +tracer = trace.get_tracer(__name__) @ti_id_router.patch( @@ -431,6 +435,31 @@ def ti_update_state( ) +def _emit_task_span(ti, state): + log.info("making task span", ti=ti) + ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) + if not ti.start_date: + log.warning("ti has no start date", ti=ti) + span = tracer.start_span( + name=f"task_run.{ti.dag_id}", + start_time=int((ti.start_date or 
timezone.utcnow()).timestamp() * 1e9), + context=ctx, + ) + span.set_attributes( + { + "airflow.dag_id": ti.dag_id, + "airflow.task_id": ti.task_id, + "airflow.dag_run.run_id": ti.run_id, + "airflow.task_instance.try_number": ti.try_number, + "airflow.task_instance.map_index": ti.map_index if ti.map_index is not None else -1, + "airflow.task_instance.state": state, + } + ) + status_code = StatusCode.OK if state == TaskInstanceState.SUCCESS else StatusCode.ERROR + span.set_status(status_code) + span.end() + + def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: DagBagDep) -> None: dr = ti.dag_run @@ -479,6 +508,7 @@ def _create_ti_state_update_query_and_update_state( ti_patch_payload.outlet_events, session, ) + _emit_task_span(ti, state=updated_state) elif isinstance(ti_patch_payload, TIDeferredStatePayload): # Calculate timeout if it was passed timeout = None diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 23fdfb7256453..603d3e785dd67 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1763,7 +1763,11 @@ def create_ti_mapping(task: Operator, indexes: Iterable[int]) -> Iterator[dict[s created_counts[task.task_type] += 1 for map_index in indexes: yield TI.insert_mapping( - self.run_id, task, map_index=map_index, dag_version_id=dag_version_id + self.run_id, + task, + map_index=map_index, + dag_version_id=dag_version_id, + dag_run=self, ) creator = create_ti_mapping diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index c8a9c05bd9fd3..e0d3e444fa89a 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -32,6 +32,9 @@ import attrs import dill import uuid6 +from opentelemetry import trace +from opentelemetry.trace import SpanContext, TraceFlags +from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator from sqlalchemy import ( JSON, Float, @@ -486,6 +489,26 @@ def uuid7() -> UUID: return uuid6.uuid7() +def _make_task_carrier(dag_run_context_carrier): + parent_otel_ctx = ( + TraceContextTextMapPropagator().extract(dag_run_context_carrier) if dag_run_context_carrier else None + ) + parent_span = trace.get_current_span(parent_otel_ctx) if parent_otel_ctx is not None else None + parent_span_ctx = parent_span.get_span_context() if parent_span is not None else None + ctx = None + if parent_span_ctx is not None and parent_span_ctx.is_valid: + span_ctx = SpanContext( + trace_id=parent_span_ctx.trace_id, + span_id=parent_span_ctx.span_id, + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + ctx = trace.set_span_in_context(trace.NonRecordingSpan(span_ctx)) + carrier: dict[str, str] = {} + TraceContextTextMapPropagator().inject(carrier, context=ctx) + return carrier + + class TaskInstance(Base, LoggingMixin, BaseWorkload): """ Task instances store the state of a task instance. @@ -679,7 +702,7 @@ def stats_tags(self) -> dict[str, str]: @staticmethod def insert_mapping( - run_id: str, task: Operator, map_index: int, dag_version_id: UUID | None + run_id: str, task: Operator, map_index: int, *, dag_version_id: UUID | None, dag_run: DagRun ) -> dict[str, Any]: """ Insert mapping. 
@@ -689,6 +712,7 @@ def insert_mapping( priority_weight = task.weight_rule.get_weight( TaskInstance(task=task, run_id=run_id, map_index=map_index, dag_version_id=dag_version_id) ) + context_carrier = _make_task_carrier(dag_run.context_carrier) return { "dag_id": task.dag_id, @@ -710,6 +734,8 @@ def insert_mapping( "map_index": map_index, "_task_display_property_value": task.task_display_name, "dag_version_id": dag_version_id, + "parent_context_carrier": dag_run.context_carrier, + "context_carrier": context_carrier, } @reconstructor diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 611c4fc28ec19..76b7724dff03f 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -146,7 +146,7 @@ def _make_task_span(msg: StartupDetails): TraceContextTextMapPropagator().extract(msg.ti.context_carrier) if msg.ti.context_carrier else None ) ti = msg.ti - span_name = f"task_run.{ti.task_id}" + span_name = f"task_worker_run.{ti.task_id}" if ti.map_index is not None and ti.map_index >= 0: span_name += f"_{ti.map_index}" with tracer.start_as_current_span(span_name, context=parent_context) as span: From 1546540094f8ec7b923898772b831c0163b39c09 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 09:58:20 -0700 Subject: [PATCH 02/22] curren tstate --- .../api_fastapi/common/http_access_log.py | 2 +- .../execution_api/routes/task_instances.py | 45 ++++++++++--------- .../src/airflow/executors/workloads/task.py | 2 +- .../src/airflow/models/taskinstance.py | 33 +++++++------- 4 files changed, 44 insertions(+), 38 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/http_access_log.py b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py index 581c359379011..55e0c73ea2fb6 100644 --- a/airflow-core/src/airflow/api_fastapi/common/http_access_log.py 
+++ b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py @@ -95,7 +95,7 @@ async def capture_send(message: Message) -> None: client = scope.get("client") client_addr = f"{client[0]}:{client[1]}" if client else None - logger.info( + logger.debug( "request finished", method=method, path=path, diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 31283015bd694..9e47d5c40cea7 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -40,6 +40,7 @@ from sqlalchemy.sql import select from structlog.contextvars import bind_contextvars +from airflow._shared.observability.traces import override_ids from airflow._shared.timezones import timezone from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag from airflow.api_fastapi.common.db.common import SessionDep @@ -437,27 +438,31 @@ def ti_update_state( def _emit_task_span(ti, state): log.info("making task span", ti=ti) + ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) - if not ti.start_date: - log.warning("ti has no start date", ti=ti) - span = tracer.start_span( - name=f"task_run.{ti.dag_id}", - start_time=int((ti.start_date or timezone.utcnow()).timestamp() * 1e9), - context=ctx, - ) - span.set_attributes( - { - "airflow.dag_id": ti.dag_id, - "airflow.task_id": ti.task_id, - "airflow.dag_run.run_id": ti.run_id, - "airflow.task_instance.try_number": ti.try_number, - "airflow.task_instance.map_index": ti.map_index if ti.map_index is not None else -1, - "airflow.task_instance.state": state, - } - ) - status_code = StatusCode.OK if state == TaskInstanceState.SUCCESS else StatusCode.ERROR - span.set_status(status_code) - span.end() + span = trace.get_current_span(context=ctx) + span_context = span.get_span_context() + with 
override_ids(span_context.trace_id, span_context.span_id): + if not ti.start_date: + log.warning("ti has no start date", ti=ti) + span = tracer.start_span( + name=f"task_run.{ti.dag_id}", + start_time=int((ti.start_date or timezone.utcnow()).timestamp() * 1e9), + context=ctx, + ) + span.set_attributes( + { + "airflow.dag_id": ti.dag_id, + "airflow.task_id": ti.task_id, + "airflow.dag_run.run_id": ti.run_id, + "airflow.task_instance.try_number": ti.try_number, + "airflow.task_instance.map_index": ti.map_index if ti.map_index is not None else -1, + "airflow.task_instance.state": state, + } + ) + status_code = StatusCode.OK if state == TaskInstanceState.SUCCESS else StatusCode.ERROR + span.set_status(status_code) + span.end() def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: DagBagDep) -> None: diff --git a/airflow-core/src/airflow/executors/workloads/task.py b/airflow-core/src/airflow/executors/workloads/task.py index a5939cf424412..b0c7187a385a4 100644 --- a/airflow-core/src/airflow/executors/workloads/task.py +++ b/airflow-core/src/airflow/executors/workloads/task.py @@ -86,7 +86,7 @@ def make( from airflow.utils.helpers import log_filename_template_renderer ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True) - ser_ti.context_carrier = ti.dag_run.context_carrier + # ser_ti.context_carrier = ti.dag_run.context_carrier if not bundle_info: bundle_info = BundleInfo( name=ti.dag_model.bundle_name, diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index e0d3e444fa89a..a19d422f98461 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -20,7 +20,6 @@ import hashlib import itertools import json -import logging import math from collections import defaultdict from collections.abc import Collection, Iterable @@ -31,6 +30,7 @@ import attrs import dill +import structlog import uuid6 from opentelemetry import 
trace from opentelemetry.trace import SpanContext, TraceFlags @@ -104,8 +104,8 @@ TR = TaskReschedule -log = logging.getLogger(__name__) - +log = structlog.get_logger(__name__) +tracer = trace.get_tracer(__name__) if TYPE_CHECKING: from datetime import datetime @@ -490,22 +490,23 @@ def uuid7() -> UUID: def _make_task_carrier(dag_run_context_carrier): - parent_otel_ctx = ( + parent_context = ( TraceContextTextMapPropagator().extract(dag_run_context_carrier) if dag_run_context_carrier else None ) - parent_span = trace.get_current_span(parent_otel_ctx) if parent_otel_ctx is not None else None - parent_span_ctx = parent_span.get_span_context() if parent_span is not None else None - ctx = None - if parent_span_ctx is not None and parent_span_ctx.is_valid: - span_ctx = SpanContext( - trace_id=parent_span_ctx.trace_id, - span_id=parent_span_ctx.span_id, - is_remote=True, - trace_flags=TraceFlags(TraceFlags.SAMPLED), - ) - ctx = trace.set_span_in_context(trace.NonRecordingSpan(span_ctx)) + + parent_span = trace.get_current_span(parent_context) + parent_span_ctx = parent_span.get_span_context() + span_ctx = SpanContext( + trace_id=parent_span_ctx.trace_id, + span_id=parent_span_ctx.span_id, + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + span = tracer.start_span("notused", context=parent_context) # intentionally never closed + new_ctx = trace.set_span_in_context(span) carrier: dict[str, str] = {} - TraceContextTextMapPropagator().inject(carrier, context=ctx) + TraceContextTextMapPropagator().inject(carrier, context=new_ctx) + log.warning("making ti carrier", dag_run_carrier=dag_run_context_carrier, ti_carrier=carrier) return carrier From edc298e692707fc3d21bca235235728097b1a549 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 09:58:47 -0700 Subject: [PATCH 03/22] current state --- airflow-core/src/airflow/models/taskinstance.py | 1 - 1 file changed, 1 deletion(-) diff --git 
a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index a19d422f98461..20a93a91b7f8e 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -735,7 +735,6 @@ def insert_mapping( "map_index": map_index, "_task_display_property_value": task.task_display_name, "dag_version_id": dag_version_id, - "parent_context_carrier": dag_run.context_carrier, "context_carrier": context_carrier, } From 065882bd080354e37d5621572dba71b3b96ec6d9 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 10:25:52 -0700 Subject: [PATCH 04/22] --wip-- [skip ci] --- .../execution_api/routes/task_instances.py | 6 +- .../src/airflow/jobs/triggerer_job_runner.py | 84 +++++++++++++------ .../airflow/sdk/execution_time/task_runner.py | 2 +- 3 files changed, 64 insertions(+), 28 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 9e47d5c40cea7..d083058324a22 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -438,17 +438,17 @@ def ti_update_state( def _emit_task_span(ti, state): log.info("making task span", ti=ti) - + log.info("ti.context_carrier", context_carrier=ti.context_carrier) ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) span = trace.get_current_span(context=ctx) span_context = span.get_span_context() with override_ids(span_context.trace_id, span_context.span_id): + log.info("overriding ids", trace_id=span_context.trace_id, span_id=span_context.span_id) if not ti.start_date: log.warning("ti has no start date", ti=ti) span = tracer.start_span( - name=f"task_run.{ti.dag_id}", + name=f"task_run.{ti.task_id}", start_time=int((ti.start_date or 
timezone.utcnow()).timestamp() * 1e9), - context=ctx, ) span.set_attributes( { diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 1406283c05cb3..edb9354883f73 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -35,6 +35,10 @@ import anyio import attrs import structlog +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.util._decorator import _AgnosticContextManager from pydantic import BaseModel, Field, TypeAdapter from sqlalchemy import func, select from structlog.contextvars import bind_contextvars as bind_log_contextvars @@ -96,6 +100,33 @@ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + + +def _prepare_span( + ti: TaskInstanceDTO | None, trigger_id: int, name: str +) -> _AgnosticContextManager[trace.Span]: + parent_context = ( + TraceContextTextMapPropagator().extract(ti.context_carrier) if ti and ti.context_carrier else None + ) + span_name = f"trigger_run.{ti.task_id}" if ti else f"trigger_run.{trigger_id}" + if ti and ti.map_index >= 0: + span_name += f"_{ti.map_index}" + attributes: dict[str, str | int] = { + "airflow.trigger.name": name, + } + if ti: + attributes = { + **attributes, + "airflow.dag_id": ti.dag_id, + "airflow.task_id": ti.task_id, + "airflow.dag_run.run_id": ti.run_id, + "airflow.task_instance.try_number": ti.try_number, + "airflow.task_instance.map_index": ti.map_index, + } + + return tracer.start_as_current_span(span_name, attributes=attributes, context=parent_context) + __all__ = [ "TriggerRunner", @@ -1179,30 +1210,35 @@ async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after name = self.triggers[trigger_id]["name"] 
self.log.info("trigger %s starting", name) - try: - async for event in trigger.run(): - await self.log.ainfo( - "Trigger fired event", name=self.triggers[trigger_id]["name"], result=event - ) - self.triggers[trigger_id]["events"] += 1 - self.events.append((trigger_id, event)) - except asyncio.CancelledError: - # We get cancelled by the scheduler changing the task state. But if we do lets give a nice error - # message about it - if timeout := timeout_after: - timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout - if timeout < timezone.utcnow(): - await self.log.aerror("Trigger cancelled due to timeout") - raise - finally: - # CancelledError will get injected when we're stopped - which is - # fine, the cleanup process will understand that, but we want to - # allow triggers a chance to cleanup, either in that case or if - # they exit cleanly. Exception from cleanup methods are ignored. - with suppress(Exception): - await trigger.cleanup() - - await self.log.ainfo("trigger completed", name=name) + with _prepare_span(ti=trigger.task_instance, trigger_id=trigger_id, name=name) as span: + try: + async for event in trigger.run(): + await self.log.ainfo( + "Trigger fired event", name=self.triggers[trigger_id]["name"], result=event + ) + self.triggers[trigger_id]["events"] += 1 + self.events.append((trigger_id, event)) + span.set_status(Status(StatusCode.OK)) + except asyncio.CancelledError as e: + # We get cancelled by the scheduler changing the task state. 
But if we do lets give a nice error + # message about it + if timeout := timeout_after: + timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout + if timeout < timezone.utcnow(): + await self.log.aerror("Trigger cancelled due to timeout") + span.set_status(Status(StatusCode.ERROR), description=str(e)) + raise + except Exception as e: + span.set_status(Status(StatusCode.ERROR), description=str(e)) + finally: + # CancelledError will get injected when we're stopped - which is + # fine, the cleanup process will understand that, but we want to + # allow triggers a chance to cleanup, either in that case or if + # they exit cleanly. Exception from cleanup methods are ignored. + with suppress(Exception): + await trigger.cleanup() + + await self.log.ainfo("trigger completed", name=name) def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]: """ diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 76b7724dff03f..9513177f364d0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -146,7 +146,7 @@ def _make_task_span(msg: StartupDetails): TraceContextTextMapPropagator().extract(msg.ti.context_carrier) if msg.ti.context_carrier else None ) ti = msg.ti - span_name = f"task_worker_run.{ti.task_id}" + span_name = f"worker_run.{ti.task_id}" if ti.map_index is not None and ti.map_index >= 0: span_name += f"_{ti.map_index}" with tracer.start_as_current_span(span_name, context=parent_context) as span: From 18590df9a2fb56f7e4ca663200be11e927f6c1e8 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 11:03:05 -0700 Subject: [PATCH 05/22] support clearing --- airflow-core/src/airflow/models/taskinstance.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git 
a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 20a93a91b7f8e..43424f0542d6f 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -33,7 +33,6 @@ import structlog import uuid6 from opentelemetry import trace -from opentelemetry.trace import SpanContext, TraceFlags from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from sqlalchemy import ( JSON, @@ -101,6 +100,7 @@ from airflow.utils.span_status import SpanStatus from airflow.utils.sqlalchemy import ExecutorConfigType, ExtendedJSON, UtcDateTime from airflow.utils.state import DagRunState, State, TaskInstanceState +from airflow_shared.observability.traces import new_dagrun_trace_carrier TR = TaskReschedule @@ -385,7 +385,7 @@ def clear_task_instances( for instance in tis: run_ids_by_dag_id[instance.dag_id].add(instance.run_id) - drs = session.scalars( + drs: Iterable[DagRun] = session.scalars( select(DagRun).where( or_( *( @@ -400,6 +400,7 @@ def clear_task_instances( # Always update clear_number and queued_at when clearing tasks, regardless of state dr.clear_number += 1 dr.queued_at = timezone.utcnow() + dr.context_carrier = new_dagrun_trace_carrier() _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session) @@ -428,6 +429,8 @@ def clear_task_instances( if dag_run_state == DagRunState.QUEUED: dr.last_scheduling_decision = None dr.start_date = None + for ti in tis: + ti.context_carrier = _make_task_carrier(ti.dag_run.context_carrier) session.flush() @@ -493,15 +496,6 @@ def _make_task_carrier(dag_run_context_carrier): parent_context = ( TraceContextTextMapPropagator().extract(dag_run_context_carrier) if dag_run_context_carrier else None ) - - parent_span = trace.get_current_span(parent_context) - parent_span_ctx = parent_span.get_span_context() - span_ctx = SpanContext( - trace_id=parent_span_ctx.trace_id, - span_id=parent_span_ctx.span_id, - 
is_remote=True, - trace_flags=TraceFlags(TraceFlags.SAMPLED), - ) span = tracer.start_span("notused", context=parent_context) # intentionally never closed new_ctx = trace.set_span_in_context(span) carrier: dict[str, str] = {} From 714102daba83a20de2fb7f1c958aa1cf5c82f033 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 11:08:18 -0700 Subject: [PATCH 06/22] use queued dttm for span start --- .../airflow/api_fastapi/execution_api/routes/task_instances.py | 2 +- airflow-core/src/airflow/models/taskinstance.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index d083058324a22..d0c4220183e1f 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -448,7 +448,7 @@ def _emit_task_span(ti, state): log.warning("ti has no start date", ti=ti) span = tracer.start_span( name=f"task_run.{ti.task_id}", - start_time=int((ti.start_date or timezone.utcnow()).timestamp() * 1e9), + start_time=int((ti.queued_dttm or timezone.utcnow()).timestamp() * 1e9), ) span.set_attributes( { diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 43424f0542d6f..dfbeef5e665ac 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -69,6 +69,7 @@ from airflow import settings from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager from airflow._shared.observability.metrics.stats import Stats +from airflow._shared.observability.traces import new_dagrun_trace_carrier from airflow._shared.timezones import timezone from airflow.assets.manager import asset_manager from airflow.configuration import conf 
@@ -100,7 +101,6 @@ from airflow.utils.span_status import SpanStatus from airflow.utils.sqlalchemy import ExecutorConfigType, ExtendedJSON, UtcDateTime from airflow.utils.state import DagRunState, State, TaskInstanceState -from airflow_shared.observability.traces import new_dagrun_trace_carrier TR = TaskReschedule From 6709732f2528d129b83ade977c916f754c751a4f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 14:07:15 -0700 Subject: [PATCH 07/22] fix typing --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index edb9354883f73..de28d7579ff10 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -38,7 +38,6 @@ from opentelemetry import trace from opentelemetry.trace import Status, StatusCode from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry.util._decorator import _AgnosticContextManager from pydantic import BaseModel, Field, TypeAdapter from sqlalchemy import func, select from structlog.contextvars import bind_contextvars as bind_log_contextvars @@ -91,6 +90,7 @@ from airflow.utils.session import provide_session if TYPE_CHECKING: + from opentelemetry.util._decorator import _AgnosticContextManager from sqlalchemy.orm import Session from structlog.typing import FilteringBoundLogger, WrappedLogger From c1cce2648338daca1c1c76bdc62893837954a91c Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 14:46:04 -0700 Subject: [PATCH 08/22] improve dag run span attrs --- airflow-core/src/airflow/models/dagrun.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py index 603d3e785dd67..4c5f7c7ccb882 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1026,14 +1026,18 @@ def _emit_dagrun_span(self, state: DagRunState): attributes = { "airflow.dag_id": str(self.dag_id), "airflow.dag_run.run_id": self.run_id, + "airflow.dag_run.start_date": self.start_date and str(self.start_date) or None, + "airflow.dag_run.end_date": self.end_date and str(self.end_date) or None, + "airflow.dag_run.queued_at": self.queued_at and str(self.queued_at) or None, + "airflow.dag_run.created_at": self.created_at and str(self.created_at) or None, } if self.logical_date: - attributes["airflow.dag_run.logical_date"] = str(self.logical_date) + attributes["airflow.dag_run.logical_date"] = self.logical_date if self.partition_key: - attributes["airflow.dag_run.partition_key"] = str(self.partition_key) + attributes["airflow.dag_run.partition_key"] = self.partition_key span = tracer.start_span( name=f"dag_run.{self.dag_id}", - start_time=int((self.start_date or timezone.utcnow()).timestamp() * 1e9), + start_time=int((self.queued_at or self.start_date or timezone.utcnow()).timestamp() * 1e9), attributes=attributes, context=context.Context(), ) From 1745c7b5b4a7d9004d98193399cc28a55209248a Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 14:56:24 -0700 Subject: [PATCH 09/22] fix tests, and a bug --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 1 + airflow-core/tests/unit/jobs/test_triggerer_job.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index de28d7579ff10..99cf6228bb5f6 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1230,6 +1230,7 @@ async def run_trigger(self, 
trigger_id: int, trigger: BaseTrigger, timeout_after raise except Exception as e: span.set_status(Status(StatusCode.ERROR), description=str(e)) + raise finally: # CancelledError will get injected when we're stopped - which is # fine, the cleanup process will understand that, but we want to diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 3d77880427929..88454418a20d8 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -349,11 +349,12 @@ def test_run_inline_trigger_canceled(self, session) -> None: mock_trigger = MagicMock(spec=BaseTrigger) mock_trigger.timeout_after = None mock_trigger.run.side_effect = asyncio.CancelledError() + mock_trigger.task_instance = MagicMock() + mock_trigger.task_instance.map_index = -1 with pytest.raises(asyncio.CancelledError): asyncio.run(trigger_runner.run_trigger(1, mock_trigger)) - # @pytest.mark.asyncio def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None: trigger_runner = TriggerRunner() trigger_runner.triggers = { @@ -361,6 +362,8 @@ def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None: } mock_trigger = MagicMock(spec=BaseTrigger) mock_trigger.run.side_effect = asyncio.CancelledError() + mock_trigger.task_instance = MagicMock() + mock_trigger.task_instance.map_index = -1 with pytest.raises(asyncio.CancelledError): asyncio.run( From f62463235779164e98009c92065feeb65711b501 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 15:01:30 -0700 Subject: [PATCH 10/22] small reverts --- .../src/airflow/api_fastapi/common/http_access_log.py | 2 +- airflow-core/src/airflow/executors/workloads/task.py | 1 - airflow-core/src/airflow/models/dagrun.py | 4 ++-- airflow-core/src/airflow/models/taskinstance.py | 4 ++-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git 
a/airflow-core/src/airflow/api_fastapi/common/http_access_log.py b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py index 55e0c73ea2fb6..581c359379011 100644 --- a/airflow-core/src/airflow/api_fastapi/common/http_access_log.py +++ b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py @@ -95,7 +95,7 @@ async def capture_send(message: Message) -> None: client = scope.get("client") client_addr = f"{client[0]}:{client[1]}" if client else None - logger.debug( + logger.info( "request finished", method=method, path=path, diff --git a/airflow-core/src/airflow/executors/workloads/task.py b/airflow-core/src/airflow/executors/workloads/task.py index b0c7187a385a4..4ca8c310fb5c2 100644 --- a/airflow-core/src/airflow/executors/workloads/task.py +++ b/airflow-core/src/airflow/executors/workloads/task.py @@ -86,7 +86,6 @@ def make( from airflow.utils.helpers import log_filename_template_renderer ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True) - # ser_ti.context_carrier = ti.dag_run.context_carrier if not bundle_info: bundle_info = BundleInfo( name=ti.dag_model.bundle_name, diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 4c5f7c7ccb882..4c05747417979 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1032,9 +1032,9 @@ def _emit_dagrun_span(self, state: DagRunState): "airflow.dag_run.created_at": self.created_at and str(self.created_at) or None, } if self.logical_date: - attributes["airflow.dag_run.logical_date"] = self.logical_date + attributes["airflow.dag_run.logical_date"] = str(self.logical_date) if self.partition_key: - attributes["airflow.dag_run.partition_key"] = self.partition_key + attributes["airflow.dag_run.partition_key"] = str(self.partition_key) span = tracer.start_span( name=f"dag_run.{self.dag_id}", start_time=int((self.queued_at or self.start_date or timezone.utcnow()).timestamp() * 1e9), diff --git 
a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index dfbeef5e665ac..6ee3ef1eb44b1 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -20,6 +20,7 @@ import hashlib import itertools import json +import logging import math from collections import defaultdict from collections.abc import Collection, Iterable @@ -30,7 +31,6 @@ import attrs import dill -import structlog import uuid6 from opentelemetry import trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator @@ -104,7 +104,7 @@ TR = TaskReschedule -log = structlog.get_logger(__name__) +log = logging.getLogger(__name__) tracer = trace.get_tracer(__name__) if TYPE_CHECKING: From 0d3fe473a0ac722f65ff335e68352cd07904df59 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 17 Mar 2026 15:04:23 -0700 Subject: [PATCH 11/22] small fix --- airflow-core/src/airflow/models/taskinstance.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 6ee3ef1eb44b1..2e6e44eb3c7a0 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -500,7 +500,6 @@ def _make_task_carrier(dag_run_context_carrier): new_ctx = trace.set_span_in_context(span) carrier: dict[str, str] = {} TraceContextTextMapPropagator().inject(carrier, context=new_ctx) - log.warning("making ti carrier", dag_run_carrier=dag_run_context_carrier, ti_carrier=carrier) return carrier From 43c991311808b82d92081b89de5a57f1e67a4775 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 07:28:55 -0700 Subject: [PATCH 12/22] try fix parent span --- .../api_fastapi/execution_api/routes/task_instances.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) 
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index d0c4220183e1f..2d68e8ff2ef8e 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -440,7 +440,9 @@ def _emit_task_span(ti, state): log.info("making task span", ti=ti) log.info("ti.context_carrier", context_carrier=ti.context_carrier) ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) - span = trace.get_current_span(context=ctx) + log.info("dag_run.context_carrier", context_carrier=ti.dag_run.context_carrier) + parent_ctx = TraceContextTextMapPropagator().extract(ti.dag_run.context_carrier) + span = trace.get_current_span(context=parent_ctx) span_context = span.get_span_context() with override_ids(span_context.trace_id, span_context.span_id): log.info("overriding ids", trace_id=span_context.trace_id, span_id=span_context.span_id) @@ -449,7 +451,9 @@ def _emit_task_span(ti, state): span = tracer.start_span( name=f"task_run.{ti.task_id}", start_time=int((ti.queued_dttm or timezone.utcnow()).timestamp() * 1e9), + context=parent_ctx, ) + span.set_attributes( { "airflow.dag_id": ti.dag_id, From 747724b94be07186857d49a8cdf011dec9de153f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 07:29:04 -0700 Subject: [PATCH 13/22] Revert "try fix parent span" This reverts commit 4cfc2ce7ee63552fd50b517052eea06dab323396. 
--- .../api_fastapi/execution_api/routes/task_instances.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 2d68e8ff2ef8e..d0c4220183e1f 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -440,9 +440,7 @@ def _emit_task_span(ti, state): log.info("making task span", ti=ti) log.info("ti.context_carrier", context_carrier=ti.context_carrier) ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) - log.info("dag_run.context_carrier", context_carrier=ti.dag_run.context_carrier) - parent_ctx = TraceContextTextMapPropagator().extract(ti.dag_run.context_carrier) - span = trace.get_current_span(context=parent_ctx) + span = trace.get_current_span(context=ctx) span_context = span.get_span_context() with override_ids(span_context.trace_id, span_context.span_id): log.info("overriding ids", trace_id=span_context.trace_id, span_id=span_context.span_id) @@ -451,9 +449,7 @@ def _emit_task_span(ti, state): span = tracer.start_span( name=f"task_run.{ti.task_id}", start_time=int((ti.queued_dttm or timezone.utcnow()).timestamp() * 1e9), - context=parent_ctx, ) - span.set_attributes( { "airflow.dag_id": ti.dag_id, From 3ed122bfa3b81f10df90160e836daf67752e5564 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 07:41:18 -0700 Subject: [PATCH 14/22] fix spans parent! 
--- .../execution_api/routes/task_instances.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index d0c4220183e1f..347e751e71fc2 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -437,19 +437,19 @@ def ti_update_state( def _emit_task_span(ti, state): - log.info("making task span", ti=ti) - log.info("ti.context_carrier", context_carrier=ti.context_carrier) - ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) - span = trace.get_current_span(context=ctx) - span_context = span.get_span_context() + dr_ctx = TraceContextTextMapPropagator().extract(ti.dag_run.context_carrier) + + ti_ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) + ti_span = trace.get_current_span(context=ti_ctx) + span_context = ti_span.get_span_context() + with override_ids(span_context.trace_id, span_context.span_id): - log.info("overriding ids", trace_id=span_context.trace_id, span_id=span_context.span_id) - if not ti.start_date: - log.warning("ti has no start date", ti=ti) span = tracer.start_span( name=f"task_run.{ti.task_id}", start_time=int((ti.queued_dttm or timezone.utcnow()).timestamp() * 1e9), + context=dr_ctx, ) + span.set_attributes( { "airflow.dag_id": ti.dag_id, From 8bd301b59bb39c4cbc4aaaacc02fc3af58abb28a Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 07:53:52 -0700 Subject: [PATCH 15/22] fix start date --- .../api_fastapi/execution_api/routes/task_instances.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 347e751e71fc2..aaa7cef5d81cd 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -442,11 +442,11 @@ def _emit_task_span(ti, state): ti_ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) ti_span = trace.get_current_span(context=ti_ctx) span_context = ti_span.get_span_context() - + start_time_candidates = (x for x in (ti.queued_dttm, ti.start_date, timezone.utcnow()) if x) with override_ids(span_context.trace_id, span_context.span_id): span = tracer.start_span( name=f"task_run.{ti.task_id}", - start_time=int((ti.queued_dttm or timezone.utcnow()).timestamp() * 1e9), + start_time=int(min(start_time_candidates).timestamp() * 1e9), context=dr_ctx, ) From 9c4172434aa392cd4184230dd60195fc93306cd4 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:39:15 -0700 Subject: [PATCH 16/22] fix runtime-expanded tasks --- .../execution_api/routes/task_instances.py | 5 ++++- airflow-core/src/airflow/models/taskmap.py | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index aaa7cef5d81cd..4cb1e7db16944 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -443,9 +443,12 @@ def _emit_task_span(ti, state): ti_span = trace.get_current_span(context=ti_ctx) span_context = ti_span.get_span_context() start_time_candidates = (x for x in (ti.queued_dttm, ti.start_date, timezone.utcnow()) if x) + name = f"task_run.{ti.task_id}" + if ti.map_index >= 0: + name += f"_{ti.map_index}" with 
override_ids(span_context.trace_id, span_context.span_id): span = tracer.start_span( - name=f"task_run.{ti.task_id}", + name=name, start_time=int(min(start_time_candidates).timestamp() * 1e9), context=dr_ctx, ) diff --git a/airflow-core/src/airflow/models/taskmap.py b/airflow-core/src/airflow/models/taskmap.py index 18d09d6aa564f..1ff12655eaf57 100644 --- a/airflow-core/src/airflow/models/taskmap.py +++ b/airflow-core/src/airflow/models/taskmap.py @@ -24,6 +24,8 @@ from collections.abc import Collection, Iterable, Sequence from typing import TYPE_CHECKING, Any +from opentelemetry import trace +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from sqlalchemy import CheckConstraint, ForeignKeyConstraint, Integer, String, func, or_, select from sqlalchemy.orm import Mapped, mapped_column @@ -38,6 +40,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.serialization.definitions.mappedoperator import Operator +tracer = trace.get_tracer(__name__) class TaskMapVariant(enum.Enum): @@ -52,6 +55,17 @@ class TaskMapVariant(enum.Enum): LIST = "list" +def _make_task_carrier(dag_run_context_carrier): + parent_context = ( + TraceContextTextMapPropagator().extract(dag_run_context_carrier) if dag_run_context_carrier else None + ) + span = tracer.start_span("notused", context=parent_context) # intentionally never closed + new_ctx = trace.set_span_in_context(span) + carrier: dict[str, str] = {} + TraceContextTextMapPropagator().inject(carrier, context=new_ctx) + return carrier + + class TaskMap(TaskInstanceDependencies): """ Model to track dynamic task-mapping information. @@ -254,6 +268,7 @@ def expand_mapped_task( task.log.debug("Expanding TIs upserted %s", ti) task_instance_mutation_hook(ti) ti = session.merge(ti) + ti.context_carrier = _make_task_carrier(unmapped_ti.dag_run.context_carrier) ti.refresh_from_task(task) # session.merge() loses task information. 
all_expanded_tis.append(ti) From 943ff3163f792f55689cadab835c8375d1c314d5 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:53:06 -0700 Subject: [PATCH 17/22] add some safety guards --- .../api_fastapi/execution_api/routes/task_instances.py | 4 ++++ airflow-core/src/airflow/jobs/triggerer_job_runner.py | 4 ++-- airflow-core/src/airflow/models/dagrun.py | 4 ++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 4cb1e7db16944..8467b6f009527 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -437,6 +437,10 @@ def ti_update_state( def _emit_task_span(ti, state): + # just to be safe + if not (ti.dag_run and ti.dag_run.context_carrier and ti.context_carrier): + return + dr_ctx = TraceContextTextMapPropagator().extract(ti.dag_run.context_carrier) ti_ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 99cf6228bb5f6..12985cc2bd6e4 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -103,7 +103,7 @@ tracer = trace.get_tracer(__name__) -def _prepare_span( +def _make_trigger_span( ti: TaskInstanceDTO | None, trigger_id: int, name: str ) -> _AgnosticContextManager[trace.Span]: parent_context = ( @@ -1210,7 +1210,7 @@ async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after name = self.triggers[trigger_id]["name"] self.log.info("trigger %s starting", name) - with _prepare_span(ti=trigger.task_instance, trigger_id=trigger_id, name=name) as span: + with 
_make_trigger_span(ti=trigger.task_instance, trigger_id=trigger_id, name=name) as span: try: async for event in trigger.run(): await self.log.ainfo( diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 4c05747417979..5c7242a2bbd42 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1019,6 +1019,10 @@ def is_effective_leaf(task): return leaf_tis def _emit_dagrun_span(self, state: DagRunState): + # just to be safe + if not self.context_carrier: + return + ctx = TraceContextTextMapPropagator().extract(self.context_carrier) span = trace.get_current_span(context=ctx) span_context = span.get_span_context() From c89c5f809e7eafab87ff63447120a1fc6c310c20 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:07:43 -0700 Subject: [PATCH 18/22] fix tests --- .../api_fastapi/execution_api/routes/task_instances.py | 7 +++++-- airflow-core/src/airflow/models/dagrun.py | 2 +- airflow-core/src/airflow/models/taskmap.py | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 8467b6f009527..5bc9bb748cc56 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -438,9 +438,12 @@ def ti_update_state( def _emit_task_span(ti, state): # just to be safe - if not (ti.dag_run and ti.dag_run.context_carrier and ti.context_carrier): + if not ti.dag_run: + return + if not isinstance(ti.dag_run.context_carrier, dict): + return + if not isinstance(ti.context_carrier, dict): return - dr_ctx = TraceContextTextMapPropagator().extract(ti.dag_run.context_carrier) ti_ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) diff --git 
a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 5c7242a2bbd42..6bbeaef19f227 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1020,7 +1020,7 @@ def is_effective_leaf(task): def _emit_dagrun_span(self, state: DagRunState): # just to be safe - if not self.context_carrier: + if not isinstance(self.context_carrier, dict): return ctx = TraceContextTextMapPropagator().extract(self.context_carrier) diff --git a/airflow-core/src/airflow/models/taskmap.py b/airflow-core/src/airflow/models/taskmap.py index 1ff12655eaf57..2b2d99344b3cf 100644 --- a/airflow-core/src/airflow/models/taskmap.py +++ b/airflow-core/src/airflow/models/taskmap.py @@ -268,7 +268,8 @@ def expand_mapped_task( task.log.debug("Expanding TIs upserted %s", ti) task_instance_mutation_hook(ti) ti = session.merge(ti) - ti.context_carrier = _make_task_carrier(unmapped_ti.dag_run.context_carrier) + if unmapped_ti.dag_run: + ti.context_carrier = _make_task_carrier(unmapped_ti.dag_run.context_carrier) ti.refresh_from_task(task) # session.merge() loses task information. 
all_expanded_tis.append(ti) From 590894540ce924d6c2831c4e66216d5ef7f5977b Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:11:09 -0700 Subject: [PATCH 19/22] fix otel int test --- airflow-core/tests/integration/otel/test_otel.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index c9bd4ad8b789d..0a5ebe1d768e8 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -503,9 +503,10 @@ def get_parent_span_id(span): nested = get_span_hierarchy() assert nested == { - "sub_span1": "task_run.task1", - "task_run.task1": "dag_run.otel_test_dag", "dag_run.otel_test_dag": None, + "sub_span1": "worker_run.task1", + "task_run.task1": "dag_run.otel_test_dag", + "worker_run.task1": "task_run.task1", } def start_scheduler(self, capture_output: bool = False): From 506d1cbaed40a40bbba3adecc6692739b001036b Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:15:09 -0700 Subject: [PATCH 20/22] add tests --- .../versions/head/test_task_instances.py | 142 ++++++++++++++++++ .../tests/unit/jobs/test_triggerer_job.py | 100 ++++++++++++ .../tests/unit/models/test_taskinstance.py | 107 +++++++++++++ 3 files changed, 349 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 835a8c46139f7..44e4f35702b06 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -3153,3 +3153,145 @@ def test_no_scope_defaults_to_execution(self, client, session, create_task_insta payload = {"state": 
"success", "end_date": "2024-10-31T13:00:00Z"} resp = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) assert resp.status_code in [200, 204] + + +class TestEmitTaskSpan: + """Tests for the _emit_task_span function in the execution API task-instance route.""" + + @pytest.fixture(autouse=True) + def sdk_tracer_provider(self): + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from airflow._shared.observability.traces import OverrideableRandomIdGenerator + + self.exporter = InMemorySpanExporter() + provider = TracerProvider(id_generator=OverrideableRandomIdGenerator()) + provider.add_span_processor(SimpleSpanProcessor(self.exporter)) + test_tracer = provider.get_tracer("test") + with mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.tracer", test_tracer): + yield + + def _make_carriers(self): + """Return a (dr_carrier, ti_carrier) pair built with a real SDK provider.""" + from opentelemetry import trace as otel_trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + + p = TracerProvider() + t = p.get_tracer("setup") + dr_span = t.start_span("dr") + dr_ctx = otel_trace.set_span_in_context(dr_span) + dr_carrier: dict = {} + TraceContextTextMapPropagator().inject(dr_carrier, context=dr_ctx) + ti_span = t.start_span("ti", context=dr_ctx) + ti_ctx = otel_trace.set_span_in_context(ti_span) + ti_carrier: dict = {} + TraceContextTextMapPropagator().inject(ti_carrier, context=ti_ctx) + return dr_carrier, ti_carrier + + def _make_ti(self, task_id="my_task", map_index=-1, queued_dttm=None, start_date=None): + from unittest.mock import MagicMock + + dr_carrier, ti_carrier = self._make_carriers() + ti = MagicMock() + ti.dag_id = "test_dag" + ti.task_id = task_id + ti.run_id = "test_run" + 
ti.try_number = 1 + ti.map_index = map_index + ti.queued_dttm = queued_dttm + ti.start_date = start_date or DEFAULT_START_DATE + ti.dag_run.context_carrier = dr_carrier + ti.context_carrier = ti_carrier + return ti + + def test_emit_task_span_success_sets_ok_status(self): + from opentelemetry.trace import StatusCode + + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + _emit_task_span(self._make_ti(), TaskInstanceState.SUCCESS) + + spans = self.exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.OK + + def test_emit_task_span_failed_sets_error_status(self): + from opentelemetry.trace import StatusCode + + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + _emit_task_span(self._make_ti(), TaskInstanceState.FAILED) + + spans = self.exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + + def test_emit_task_span_sets_attributes(self): + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + ti = self._make_ti(task_id="my_task", map_index=2) + _emit_task_span(ti, TaskInstanceState.SUCCESS) + + attrs = self.exporter.get_finished_spans()[0].attributes + assert attrs["airflow.dag_id"] == "test_dag" + assert attrs["airflow.task_id"] == "my_task" + assert attrs["airflow.dag_run.run_id"] == "test_run" + assert attrs["airflow.task_instance.try_number"] == 1 + assert attrs["airflow.task_instance.map_index"] == 2 + assert attrs["airflow.task_instance.state"] == TaskInstanceState.SUCCESS + + def test_emit_task_span_name_unmapped(self): + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + _emit_task_span(self._make_ti(task_id="my_task", map_index=-1), TaskInstanceState.SUCCESS) + assert self.exporter.get_finished_spans()[0].name == "task_run.my_task" + + def test_emit_task_span_name_mapped(self): + from 
airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + _emit_task_span(self._make_ti(task_id="my_task", map_index=3), TaskInstanceState.SUCCESS) + assert self.exporter.get_finished_spans()[0].name == "task_run.my_task_3" + + def test_emit_task_span_start_time_uses_queued_dttm(self): + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + queued_dttm = timezone.parse("2024-01-01T10:00:00Z") + start_date = timezone.parse("2024-01-01T10:05:00Z") + ti = self._make_ti(queued_dttm=queued_dttm, start_date=start_date) + _emit_task_span(ti, TaskInstanceState.SUCCESS) + + assert self.exporter.get_finished_spans()[0].start_time == int(queued_dttm.timestamp() * 1e9) + + def test_emit_task_span_start_time_falls_back_to_start_date(self): + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + start_date = timezone.parse("2024-01-01T10:05:00Z") + ti = self._make_ti(queued_dttm=None, start_date=start_date) + _emit_task_span(ti, TaskInstanceState.SUCCESS) + + assert self.exporter.get_finished_spans()[0].start_time == int(start_date.timestamp() * 1e9) + + def test_emit_task_span_skips_if_no_ti_carrier(self): + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + ti = mock.MagicMock() + ti.dag_run.context_carrier = { + "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" + } + ti.context_carrier = None + + _emit_task_span(ti, TaskInstanceState.SUCCESS) + assert len(self.exporter.get_finished_spans()) == 0 + + def test_emit_task_span_skips_if_no_dagrun_carrier(self): + from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span + + ti = mock.MagicMock() + ti.dag_run.context_carrier = None + ti.context_carrier = {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"} + + _emit_task_span(ti, TaskInstanceState.SUCCESS) + assert len(self.exporter.get_finished_spans()) == 0 diff --git 
a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 88454418a20d8..619cd07193b97 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -27,6 +27,7 @@ from collections.abc import AsyncIterator from socket import socket from typing import TYPE_CHECKING, Any +from unittest import mock from unittest.mock import ANY, AsyncMock, MagicMock, patch import pendulum @@ -45,6 +46,7 @@ TriggerLoggingFactory, TriggerRunner, TriggerRunnerSupervisor, + _make_trigger_span, messages, ) from airflow.models import Connection, DagModel, DagRun, Trigger, Variable @@ -1359,3 +1361,101 @@ def get_type_names(union_type): + "\n".join(f" - {t}" for t in sorted(task_diff)) + "\n\nEither handle these types in ToTriggerRunner or update in_task_but_not_in_trigger_runner list." ) + + +class TestMakeTriggerSpan: + """Tests for the _make_trigger_span helper in the triggerer job runner.""" + + @pytest.fixture(autouse=True) + def sdk_tracer_provider(self): + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + self.exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(self.exporter)) + test_tracer = provider.get_tracer("test") + with mock.patch("airflow.jobs.triggerer_job_runner.tracer", test_tracer): + yield + + def _make_ti_dto(self, task_id="my_task", map_index=-1, context_carrier=None): + import uuid + + from airflow.executors.workloads.task import TaskInstanceDTO + + return TaskInstanceDTO( + id=uuid.uuid4(), + dag_version_id=uuid.uuid4(), + task_id=task_id, + dag_id="test_dag", + run_id="test_run", + try_number=1, + map_index=map_index, + pool_slots=1, + queue="default", + priority_weight=1, + context_carrier=context_carrier, + ) + + def 
test_make_trigger_span_name_with_task_instance(self): + ti = self._make_ti_dto(task_id="sensor_task", map_index=-1) + with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): + pass + assert self.exporter.get_finished_spans()[0].name == "trigger_run.sensor_task" + + def test_make_trigger_span_name_with_mapped_task(self): + ti = self._make_ti_dto(task_id="sensor_task", map_index=2) + with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): + pass + assert self.exporter.get_finished_spans()[0].name == "trigger_run.sensor_task_2" + + def test_make_trigger_span_name_without_task_instance(self): + with _make_trigger_span(ti=None, trigger_id=42, name="MySensor"): + pass + assert self.exporter.get_finished_spans()[0].name == "trigger_run.42" + + def test_make_trigger_span_uses_task_context_carrier(self): + from opentelemetry import trace as otel_trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + + # Build a valid ti carrier from a separate provider so we have a known parent span. 
+ setup_provider = TracerProvider() + setup_tracer = setup_provider.get_tracer("setup") + parent_span = setup_tracer.start_span("ti_parent") + parent_ctx = otel_trace.set_span_in_context(parent_span) + ti_carrier: dict = {} + TraceContextTextMapPropagator().inject(ti_carrier, context=parent_ctx) + expected_parent_span_id = parent_span.get_span_context().span_id + + ti = self._make_ti_dto(context_carrier=ti_carrier) + with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): + pass + + spans = self.exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].parent is not None + assert spans[0].parent.span_id == expected_parent_span_id + + def test_make_trigger_span_sets_attributes_with_ti(self): + ti = self._make_ti_dto(task_id="my_task", map_index=1) + with _make_trigger_span(ti=ti, trigger_id=5, name="MyTrigger"): + pass + + attrs = self.exporter.get_finished_spans()[0].attributes + assert attrs["airflow.trigger.name"] == "MyTrigger" + assert attrs["airflow.dag_id"] == "test_dag" + assert attrs["airflow.task_id"] == "my_task" + assert attrs["airflow.dag_run.run_id"] == "test_run" + assert attrs["airflow.task_instance.try_number"] == 1 + assert attrs["airflow.task_instance.map_index"] == 1 + + def test_make_trigger_span_sets_only_trigger_name_without_ti(self): + with _make_trigger_span(ti=None, trigger_id=99, name="OnlyTrigger"): + pass + + attrs = self.exporter.get_finished_spans()[0].attributes + assert attrs["airflow.trigger.name"] == "OnlyTrigger" + assert "airflow.dag_id" not in attrs + assert "airflow.task_id" not in attrs diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 82b9adc3162ae..eb13db6480a71 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -59,6 +59,7 @@ TaskInstance, TaskInstance as TI, TaskInstanceNote, + _make_task_carrier, clear_task_instances, find_relevant_relatives, ) @@ 
-3298,3 +3299,109 @@ def test_get_dagrun_loaded_but_none_returns_dagrun(dag_maker, session): assert dr_from_ti is not None assert dr_from_ti == dr + + +class TestMakeTaskCarrier: + """Tests for the _make_task_carrier helper.""" + + @pytest.fixture(autouse=True) + def sdk_tracer_provider(self): + from opentelemetry.sdk.trace import TracerProvider + + provider = TracerProvider() + real_tracer = provider.get_tracer("airflow.models.taskinstance") + with mock.patch("airflow.models.taskinstance.tracer", real_tracer): + yield + + def test_make_task_carrier_returns_traceparent(self): + from airflow._shared.observability.traces import new_dagrun_trace_carrier + + carrier = _make_task_carrier(new_dagrun_trace_carrier()) + assert isinstance(carrier, dict) + assert "traceparent" in carrier + + def test_make_task_carrier_child_of_parent(self): + from opentelemetry import trace as otel_trace + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + + from airflow._shared.observability.traces import new_dagrun_trace_carrier + + parent_carrier = new_dagrun_trace_carrier() + child_carrier = _make_task_carrier(parent_carrier) + + propagator = TraceContextTextMapPropagator() + parent_trace_id = ( + otel_trace.get_current_span(context=propagator.extract(parent_carrier)) + .get_span_context() + .trace_id + ) + child_trace_id = ( + otel_trace.get_current_span(context=propagator.extract(child_carrier)).get_span_context().trace_id + ) + assert child_trace_id == parent_trace_id + assert child_trace_id != 0 + + def test_make_task_carrier_with_none_carrier(self): + carrier = _make_task_carrier(None) + assert isinstance(carrier, dict) + assert "traceparent" in carrier + + +@pytest.mark.db_test +def test_insert_mapping_includes_context_carrier(dag_maker, session): + """insert_mapping should include a context_carrier with a traceparent derived from the dag run.""" + from opentelemetry.sdk.trace import TracerProvider + + from airflow._shared.observability.traces 
import new_dagrun_trace_carrier + + provider = TracerProvider() + real_tracer = provider.get_tracer("airflow.models.taskinstance") + with mock.patch("airflow.models.taskinstance.tracer", real_tracer): + with dag_maker("test_insert_mapping_carrier"): + EmptyOperator(task_id="t1") + session.flush() + + # Get the scheduler-side operator (has a proper PriorityWeightStrategy, not the enum weight_rule). + op = create_scheduler_operator(dag_maker.dag.get_task("t1")) + + # Mock the DagRun to avoid inserting into the dag_run table (schema migrations may be pending). + dag_run = mock.MagicMock() + dag_run.context_carrier = new_dagrun_trace_carrier() + + mapping = TaskInstance.insert_mapping( + run_id="test_run", + task=op, + map_index=0, + dag_version_id=None, + dag_run=dag_run, + ) + + assert "context_carrier" in mapping + assert mapping["context_carrier"] is not None + assert "traceparent" in mapping["context_carrier"] + + +@pytest.mark.db_test +def test_clear_task_instances_resets_context_carrier(dag_maker, session): + """clear_task_instances should assign fresh context carriers to both the TI and its dag run.""" + from opentelemetry.sdk.trace import TracerProvider + + provider = TracerProvider() + real_tracer = provider.get_tracer("airflow.models.taskinstance") + with mock.patch("airflow.models.taskinstance.tracer", real_tracer): + with dag_maker("test_clear_carrier"): + EmptyOperator(task_id="t1") + dag_run = dag_maker.create_dagrun() + ti = dag_run.get_task_instance("t1", session=session) + ti.state = TaskInstanceState.SUCCESS + # Set an explicit carrier so we can verify it changes. 
+ ti.context_carrier = {"traceparent": "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaa0001-bbbbbbbbbbbbbbbb-01"} + session.flush() + + original_ti_traceparent = ti.context_carrier["traceparent"] + original_dr_traceparent = dag_run.context_carrier["traceparent"] + + clear_task_instances([ti], session) + + assert ti.context_carrier["traceparent"] != original_ti_traceparent + assert dag_run.context_carrier["traceparent"] != original_dr_traceparent From 069071d284d98ddbf4161e9207a682e15cd4c662 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:20:46 -0700 Subject: [PATCH 21/22] rename worker_run and trigger_run to remove _run --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 2 +- airflow-core/tests/integration/otel/test_otel.py | 4 ++-- airflow-core/tests/unit/jobs/test_triggerer_job.py | 6 +++--- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 12985cc2bd6e4..9be1da1168607 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -109,7 +109,7 @@ def _make_trigger_span( parent_context = ( TraceContextTextMapPropagator().extract(ti.context_carrier) if ti and ti.context_carrier else None ) - span_name = f"trigger_run.{ti.task_id}" if ti else f"trigger_run.{trigger_id}" + span_name = f"trigger.{ti.task_id}" if ti else f"trigger.{trigger_id}" if ti and ti.map_index >= 0: span_name += f"_{ti.map_index}" attributes: dict[str, str | int] = { diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index 0a5ebe1d768e8..d042f5b7927ed 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -504,9 +504,9 @@ def get_parent_span_id(span): 
nested = get_span_hierarchy() assert nested == { "dag_run.otel_test_dag": None, - "sub_span1": "worker_run.task1", + "sub_span1": "worker.task1", "task_run.task1": "dag_run.otel_test_dag", - "worker_run.task1": "task_run.task1", + "worker.task1": "task_run.task1", } def start_scheduler(self, capture_output: bool = False): diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 619cd07193b97..c16d5acbb2a52 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -1402,18 +1402,18 @@ def test_make_trigger_span_name_with_task_instance(self): ti = self._make_ti_dto(task_id="sensor_task", map_index=-1) with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): pass - assert self.exporter.get_finished_spans()[0].name == "trigger_run.sensor_task" + assert self.exporter.get_finished_spans()[0].name == "trigger.sensor_task" def test_make_trigger_span_name_with_mapped_task(self): ti = self._make_ti_dto(task_id="sensor_task", map_index=2) with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): pass - assert self.exporter.get_finished_spans()[0].name == "trigger_run.sensor_task_2" + assert self.exporter.get_finished_spans()[0].name == "trigger.sensor_task_2" def test_make_trigger_span_name_without_task_instance(self): with _make_trigger_span(ti=None, trigger_id=42, name="MySensor"): pass - assert self.exporter.get_finished_spans()[0].name == "trigger_run.42" + assert self.exporter.get_finished_spans()[0].name == "trigger.42" def test_make_trigger_span_uses_task_context_carrier(self): from opentelemetry import trace as otel_trace diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 9513177f364d0..6782a8b992880 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -146,7 +146,7 @@ 
def _make_task_span(msg: StartupDetails): TraceContextTextMapPropagator().extract(msg.ti.context_carrier) if msg.ti.context_carrier else None ) ti = msg.ti - span_name = f"worker_run.{ti.task_id}" + span_name = f"worker.{ti.task_id}" if ti.map_index is not None and ti.map_index >= 0: span_name += f"_{ti.map_index}" with tracer.start_as_current_span(span_name, context=parent_context) as span: From bd46602d50295f36e5c4ad08a30de680b5941003 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:56:43 -0700 Subject: [PATCH 22/22] fix mypy --- airflow-core/src/airflow/models/taskmap.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/taskmap.py b/airflow-core/src/airflow/models/taskmap.py index 2b2d99344b3cf..1b160b381a852 100644 --- a/airflow-core/src/airflow/models/taskmap.py +++ b/airflow-core/src/airflow/models/taskmap.py @@ -29,6 +29,7 @@ from sqlalchemy import CheckConstraint, ForeignKeyConstraint, Integer, String, func, or_, select from sqlalchemy.orm import Mapped, mapped_column +from airflow.models import DagRun from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies from airflow.models.dag_version import DagVersion from airflow.utils.db import exists_query @@ -268,8 +269,16 @@ def expand_mapped_task( task.log.debug("Expanding TIs upserted %s", ti) task_instance_mutation_hook(ti) ti = session.merge(ti) - if unmapped_ti.dag_run: - ti.context_carrier = _make_task_carrier(unmapped_ti.dag_run.context_carrier) + if unmapped_ti: + dr = unmapped_ti.dag_run + else: + dr = session.scalar( + select(DagRun).where( + DagRun.dag_id == task.dag_id, + DagRun.run_id == run_id, + ) + ) + ti.context_carrier = _make_task_carrier(dr.context_carrier if dr else None) ti.refresh_from_task(task) # session.merge() loses task information. all_expanded_tis.append(ti)