Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions azure/durable_functions/models/TaskOrchestrationExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def initialize(self):
self.exception: Optional[Exception] = None
self.orchestrator_returned: bool = False

# History-based replay detection: tracks whether we are currently
# processing old (replayed) events or new events in the current episode.
# This is used as an additional signal for backends that don't set IsPlayed.
self._is_processing_new_events: bool = False

def execute(self, context: DurableOrchestrationContext,
history: List[HistoryEvent], fn) -> str:
"""Execute an orchestration via its history to evaluate Tasks and replay events.
Expand Down Expand Up @@ -80,16 +85,28 @@ def execute(self, context: DurableOrchestrationContext,
+ "https://github.com/Azure/azure-functions-durable-python/issues."
raise Exception(err_message)

# Set initial is_replaing state.
# Pre-scan history to find the start of new (non-replayed) events.
# The last OrchestratorStarted event marks the boundary: events before
# it are old/replayed, events from it onwards are new.
# This provides a reliable replay signal for backends that don't set IsPlayed.
self._new_events_start_index = self._find_new_events_start_index(history)

# Set initial is_replaying state.
# Combine the is_played field with the history-based signal:
# we are replaying if is_played says so OR if we haven't reached new events yet.
execution_started_event = history[1]
self.current_task.is_played = execution_started_event.is_played
history_is_replaying = self._new_events_start_index > 0
self.current_task.is_played = execution_started_event.is_played or history_is_replaying

# If user code is a generator, then it uses `yield` statements (the DF API)
# and so we iterate through the DF history, generating tasks and populating
# them with values when the history provides them
if isinstance(evaluated_user_code, GeneratorType):
self.generator = evaluated_user_code
for event in history:
for index, event in enumerate(history):
# Update whether we've crossed into the new events portion of the history.
if index >= self._new_events_start_index:
self._is_processing_new_events = True
self.process_event(event)
if self.has_execution_completed:
break
Expand Down Expand Up @@ -209,8 +226,12 @@ def parse_history_event(directive_result):
# generate exception
new_value = Exception(f"{event.Reason} \n {event.Details}")

# with a yielded task now evaluated, we can try to resume the user code
task.set_is_played(event._is_played)
# With a yielded task now evaluated, we can try to resume the user code.
# Combine the event's is_played field with the history-based signal:
# a task is considered "played" (replayed) if either is_played is set
# OR we are still processing old events (not yet in the new events section).
is_played = event._is_played or not self._is_processing_new_events
task.set_is_played(is_played)
task.set_value(is_error=not is_success, value=new_value)

def resume_user_code(self):
Expand Down Expand Up @@ -254,6 +275,31 @@ def resume_user_code(self):
# until a new/not-previously-yielded task is encountered
self.resume_user_code()

def _find_new_events_start_index(self, history: List[HistoryEvent]) -> int:
"""Find the index in history where new (non-replayed) events begin.

The history is structured in episodes delimited by OrchestratorStarted
and OrchestratorCompleted events. The last OrchestratorStarted event
(which has no matching OrchestratorCompleted after it) marks the start
of the current episode containing new events.

Parameters
----------
history : List[HistoryEvent]
The orchestration history.

Returns
-------
int
The index of the last OrchestratorStarted event, which is the
boundary between old (replayed) and new events.
"""
last_orchestrator_started_index = -1
for i, event in enumerate(history):
if event.event_type == HistoryEventType.ORCHESTRATOR_STARTED:
last_orchestrator_started_index = i
return last_orchestrator_started_index

def _mark_as_scheduled(self, task: TaskBase):
if isinstance(task, CompoundTask):
for task in task.children:
Expand Down
163 changes: 163 additions & 0 deletions tests/orchestrator/test_is_replaying_without_is_played.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""Tests that the is_replaying flag is correctly determined from event history structure
alone, without relying on the is_played field. This covers Durable backends that never
set IsPlayed on history events.
"""

from azure.durable_functions.models.ReplaySchema import ReplaySchema
from tests.test_utils.ContextBuilder import ContextBuilder
from .orchestrator_test_utils import get_orchestration_property
from azure.durable_functions.models.OrchestratorState import OrchestratorState
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
from datetime import datetime, timedelta, timezone


def generator_function(context):
"""Orchestrator that creates 3 sequential timers."""
timestamp = "2020-07-23T21:56:54.936700Z"
deadline = datetime.strptime(timestamp, DATETIME_STRING_FORMAT)
deadline = deadline.replace(tzinfo=timezone.utc)

for _ in range(0, 3):
deadline = deadline + timedelta(seconds=30)
yield context.create_timer(deadline)


def add_timer_fired_events_without_is_played(context_builder: ContextBuilder, id_: int, timestamp: str):
"""Add a complete timer episode without setting is_played (always False).

Adds: TimerCreated, OrchestratorCompleted, OrchestratorStarted, TimerFired.
This simulates a backend that never sets IsPlayed.
"""
fire_at: str = context_builder.add_timer_created_event(id_, timestamp)
context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()
context_builder.add_timer_fired_event(id_=id_, fire_at=fire_at, is_played=False)


def add_activity_completed_events_without_is_played(
context_builder: ContextBuilder, name: str, id_: int, result: str):
"""Add a complete activity episode without setting is_played (always False).

Adds: TaskScheduled, OrchestratorCompleted, OrchestratorStarted, TaskCompleted.
This simulates a backend that never sets IsPlayed.
"""
context_builder.add_task_scheduled_event(name, id_)
context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()
context_builder.add_task_completed_event(id_=id_, result=result, is_played=False)

# ---------- Tests verifying replaying=True for mid-replay tasks ----------

class IsReplayingTracker:
"""Tracks is_replaying values observed at each yield point during orchestration."""

def __init__(self):
self.values_at_yield = []


def generator_function_tracking_replay(context):
"""Orchestrator that records is_replaying at each yield point."""
tracker = context._tracker

result1 = yield context.call_activity("Hello", "Tokyo")
tracker.values_at_yield.append(context.is_replaying)

result2 = yield context.call_activity("Hello", "Seattle")
tracker.values_at_yield.append(context.is_replaying)

result3 = yield context.call_activity("Hello", "London")
tracker.values_at_yield.append(context.is_replaying)

return [result1, result2, result3]


def test_hello_cities_is_replaying_mid_execution_without_is_played():
"""Verify that is_replaying is True for old events and False for new events,
even when is_played is never set.
"""
tracker = IsReplayingTracker()

context_builder = ContextBuilder("", is_replaying=False)
add_activity_completed_events_without_is_played(context_builder, "Hello", 0, '"Hello Tokyo!"')
add_activity_completed_events_without_is_played(context_builder, "Hello", 1, '"Hello Seattle!"')

context_as_string = context_builder.to_json_string()

from azure.durable_functions.models import DurableOrchestrationContext
from azure.durable_functions.orchestrator import Orchestrator

context = DurableOrchestrationContext.from_json(context_as_string)
context._tracker = tracker # type: ignore

orchestrator = Orchestrator(generator_function_tracking_replay)
orchestrator.handle(context)

# After first activity (old episode): replaying
assert tracker.values_at_yield[0] == True
# After second activity (new episode): not replaying
assert tracker.values_at_yield[1] == False


def test_hello_cities_is_replaying_completed_without_is_played():
"""Verify intermediate is_replaying states when all three activities are completed.
"""
tracker = IsReplayingTracker()

context_builder = ContextBuilder("", is_replaying=False)
add_activity_completed_events_without_is_played(context_builder, "Hello", 0, '"Hello Tokyo!"')
add_activity_completed_events_without_is_played(context_builder, "Hello", 1, '"Hello Seattle!"')
add_activity_completed_events_without_is_played(context_builder, "Hello", 2, '"Hello London!"')

context_as_string = context_builder.to_json_string()

from azure.durable_functions.models import DurableOrchestrationContext
from azure.durable_functions.orchestrator import Orchestrator

context = DurableOrchestrationContext.from_json(context_as_string)
context._tracker = tracker # type: ignore

orchestrator = Orchestrator(generator_function_tracking_replay)
orchestrator.handle(context)

# After first activity (old episode): replaying
assert tracker.values_at_yield[0] == True
# After second activity (old episode): replaying
assert tracker.values_at_yield[1] == True
# After third activity (new episode): not replaying
assert tracker.values_at_yield[2] == False


def test_is_played_does_not_affect_is_replaying_behavior():
"""Verify that history-based detection produces the same is_replaying result
as the is_played-based detection for one replayed timer event.
"""
timestamp = "2020-07-23T21:56:54.9367Z"
fire_at_1 = datetime.strptime(timestamp, DATETIME_STRING_FORMAT) + timedelta(seconds=30)
fire_at_str_1 = fire_at_1.strftime(DATETIME_STRING_FORMAT)
fire_at_2 = datetime.strptime(timestamp, DATETIME_STRING_FORMAT) + timedelta(seconds=60)
fire_at_str_2 = fire_at_2.strftime(DATETIME_STRING_FORMAT)

# Traditional backend: sets is_played=True on old events, False on new events
traditional_backend_ctx = ContextBuilder("")
scheduled_fire_at_1: str = traditional_backend_ctx.add_timer_created_event(0, fire_at_str_1)
traditional_backend_ctx.add_orchestrator_completed_event()
traditional_backend_ctx.add_orchestrator_started_event()
traditional_backend_ctx.add_timer_fired_event(id_=0, fire_at=scheduled_fire_at_1, is_played=True)
scheduled_fire_at_2: str = traditional_backend_ctx.add_timer_created_event(1, fire_at_str_2)
traditional_backend_ctx.add_orchestrator_completed_event()
traditional_backend_ctx.add_orchestrator_started_event()
traditional_backend_ctx.add_timer_fired_event(id_=1, fire_at=scheduled_fire_at_2, is_played=False)

result_traditional = get_orchestration_property(
traditional_backend_ctx, generator_function, "durable_context")

# Backend that never sets is_played (always False), relies on history structure
history_based_ctx = ContextBuilder("", is_replaying=False)
add_timer_fired_events_without_is_played(history_based_ctx, 0, fire_at_str_1)
add_timer_fired_events_without_is_played(history_based_ctx, 1, fire_at_str_2)

result_history_based = get_orchestration_property(
history_based_ctx, generator_function, "durable_context")

# Both approaches should agree on the final is_replaying state
assert result_traditional.is_replaying == result_history_based.is_replaying == False