Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
:param dataset_id: The dataset id.
:param group_id: The workspace id.
:param conn_id: Airflow Connection ID that contains the connection information for the Power BI account used for authentication.
:param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits. Used only if ``wait_for_completion`` is True.
:param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits.
:param check_interval: Number of seconds to wait before rechecking the
refresh status.
:param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body.
:param wait_for_completion: If True, wait for the dataset refresh to complete. If False, trigger the refresh and return immediately without waiting.
:param deferrable: This parameter is deprecated and no longer has any effect. The operator now always uses deferrable execution when ``wait_for_completion=True``.
:param deferrable: This parameter is deprecated and no longer has any effect. The operator now always
uses deferrable execution.
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -99,9 +100,7 @@ def __init__(
) -> None:
super().__init__(**kwargs)
if "deferrable" in kwargs or deferrable is not True:
self.log.warning(
"The PowerBIDatasetRefreshOperator now always uses deferrable execution when wait_for_completion=True."
)
self.log.warning("The PowerBIDatasetRefreshOperator now always uses deferrable execution.")
self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
self.dataset_id = dataset_id
self.group_id = group_id
Expand All @@ -121,40 +120,56 @@ def api_version(self) -> str | None:

def execute(self, context: Context):
"""Refresh the Power BI Dataset."""
if not self.wait_for_completion:
# Fire and forget - synchronous execution, no deferral
hook = PowerBIHook(
conn_id=self.conn_id, proxies=self.proxies, api_version=self.api_version, timeout=self.timeout
)

dataset_refresh_id = hook.trigger_dataset_refresh(
dataset_id=self.dataset_id,
self.defer(
trigger=PowerBITrigger(
conn_id=self.conn_id,
group_id=self.group_id,
dataset_id=self.dataset_id,
timeout=self.timeout,
proxies=self.proxies,
api_version=self.api_version,
check_interval=self.check_interval,
wait_for_termination=False,
request_body=self.request_body,
),
method_name=self.handle_refresh.__name__,
)

def handle_refresh(self, context: Context, event: dict[str, str] | None) -> None:
"""
Handle refresh-trigger event and optionally defer again to wait for refresh completion.

:param context: Airflow context dictionary
:param event: Event dict from trigger with status and dataset_refresh_id
"""
if not event:
return

dataset_refresh_id = event.get("dataset_refresh_id")
if dataset_refresh_id:
context["ti"].xcom_push(
key=f"{self.task_id}.powerbi_dataset_refresh_id",
value=dataset_refresh_id,
)

if dataset_refresh_id:
self.log.info("Triggered dataset refresh %s (fire-and-forget)", dataset_refresh_id)
context["ti"].xcom_push(
key=f"{self.task_id}.powerbi_dataset_refresh_id",
value=dataset_refresh_id,
)
else:
raise AirflowException("Failed to trigger dataset refresh")
if event["status"] == "error":
raise AirflowException(event["message"])

if not self.wait_for_completion:
self.log.info("Triggered dataset refresh %s (fire-and-forget)", dataset_refresh_id)
return

# Wait for termination - use deferrable trigger
self.defer(
trigger=PowerBITrigger(
conn_id=self.conn_id,
group_id=self.group_id,
dataset_id=self.dataset_id,
dataset_refresh_id=dataset_refresh_id,
timeout=self.timeout,
proxies=self.proxies,
api_version=self.api_version,
check_interval=self.check_interval,
wait_for_termination=self.wait_for_completion,
request_body=self.request_body,
wait_for_termination=True,
),
method_name=self.execute_complete.__name__,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@
from airflow.providers.microsoft.azure.hooks.powerbi import (
PowerBIDatasetRefreshFields,
PowerBIDatasetRefreshStatus,
PowerBIHook,
)
from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger

from tests_common.test_utils.mock_context import mock_context
from tests_common.test_utils.operators.run_deferrable import execute_operator
from unit.microsoft.azure.test_utils import get_airflow_connection

try:
from airflow.sdk import timezone
except ImportError:
from airflow.utils import timezone # type: ignore[no-redef]
from airflow.utils import timezone # type: ignore[attr-defined, no-redef]


DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
Expand Down Expand Up @@ -109,10 +111,12 @@ def test_execute_wait_for_completion_with_deferrable(self, connection):

assert isinstance(exc.value.trigger, PowerBITrigger)
assert exc.value.trigger.dataset_refresh_id is None
assert exc.value.trigger.wait_for_termination is False
assert exc.value.method_name == "handle_refresh"

@mock.patch.object(BaseHook, "get_connection", side_effect=get_airflow_connection)
def test_powerbi_operator_async_get_refresh_status_success(self, connection):
"""Test that execute defers once when wait_for_completion=True"""
"""Test that execute first defers to trigger refresh when wait_for_completion=True"""
operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=True, # Explicitly set to True
Expand All @@ -128,14 +132,119 @@ def test_powerbi_operator_async_get_refresh_status_success(self, connection):
# Verify trigger has correct parameters
assert exc.value.trigger.dataset_id == DATASET_ID
assert exc.value.trigger.group_id == GROUP_ID
assert exc.value.trigger.wait_for_termination is True
assert exc.value.trigger.wait_for_termination is False

# Verify callback method name
assert exc.value.method_name == "execute_complete"
assert exc.value.method_name == "handle_refresh"

# Verify dataset_refresh_id is None (trigger will create it)
assert exc.value.trigger.dataset_refresh_id is None

@mock.patch.object(BaseHook, "get_connection", side_effect=get_airflow_connection)
def test_handle_refresh_wait_for_completion(self, connection):
"""Test that handle_refresh defers to execute_complete when wait_for_completion=True."""
operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=True,
)
context = {"ti": MagicMock()}

with pytest.raises(TaskDeferred) as exc:
operator.handle_refresh(
context=context,
event=SUCCESS_TRIGGER_EVENT,
)

assert context["ti"].xcom_push.call_count == 1
assert isinstance(exc.value.trigger, PowerBITrigger)
assert exc.value.trigger.dataset_refresh_id == NEW_REFRESH_REQUEST_ID
assert exc.value.trigger.wait_for_termination is True
assert exc.value.method_name == "execute_complete"

@mock.patch.object(BaseHook, "get_connection", side_effect=get_airflow_connection)
def test_handle_refresh_fire_and_forget(self, connection):
"""Test that handle_refresh finishes immediately for fire-and-forget mode."""
operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=False,
)
context = {"ti": MagicMock()}

operator.handle_refresh(
context=context,
event=SUCCESS_TRIGGER_EVENT,
)

assert context["ti"].xcom_push.call_count == 1

@mock.patch.object(BaseHook, "get_connection", side_effect=get_airflow_connection)
def test_handle_refresh_failure(self, connection):
"""Test that handle_refresh raises exception on trigger error."""
operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=False,
)
context = {"ti": MagicMock()}

with pytest.raises(AirflowException, match="Failed to trigger the dataset refresh."):
operator.handle_refresh(
context=context,
event={
"status": "error",
"dataset_refresh_status": None,
"message": "Failed to trigger the dataset refresh.",
"dataset_refresh_id": None,
},
)

assert context["ti"].xcom_push.call_count == 0

@mock.patch.object(PowerBIHook, "get_refresh_details_by_refresh_id")
@mock.patch.object(PowerBIHook, "trigger_dataset_refresh")
def test_execute_operator_wait_for_completion_full_lifecycle(
self, mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id
):
"""Assert the full deferrable lifecycle completes successfully."""
mock_trigger_dataset_refresh.return_value = NEW_REFRESH_REQUEST_ID
mock_get_refresh_details_by_refresh_id.side_effect = [
{
PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.IN_PROGRESS,
PowerBIDatasetRefreshFields.ERROR.value: None,
},
{
PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED,
PowerBIDatasetRefreshFields.ERROR.value: None,
},
]

operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=True,
)

result, events = execute_operator(operator)

assert result is None
mock_trigger_dataset_refresh.assert_called_once_with(
dataset_id=DATASET_ID,
group_id=GROUP_ID,
request_body=REQUEST_BODY,
)
assert mock_get_refresh_details_by_refresh_id.call_count == 2
assert len(events) == 2
assert events[0].payload == {
"status": "success",
"dataset_refresh_status": None,
"message": f"The dataset refresh {NEW_REFRESH_REQUEST_ID} has been triggered.",
"dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
}
assert events[1].payload == {
"status": "success",
"dataset_refresh_status": PowerBIDatasetRefreshStatus.COMPLETED,
"message": f"The dataset refresh {NEW_REFRESH_REQUEST_ID} has {PowerBIDatasetRefreshStatus.COMPLETED}.",
"dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
}

def test_powerbi_operator_async_execute_complete_success(self):
"""Assert that execute_complete processes success event correctly"""
operator = PowerBIDatasetRefreshOperator(**CONFIG)
Expand Down Expand Up @@ -232,63 +341,25 @@ def test_powerbi_link(self, dag_maker, create_task_instance_of_operator):

assert url == EXPECTED_ITEM_RUN_OP_EXTRA_LINK

@mock.patch("airflow.providers.microsoft.azure.operators.powerbi.PowerBIHook")
@mock.patch.object(BaseHook, "get_connection", side_effect=get_airflow_connection)
def test_execute_fire_and_forget_mode(self, mock_connection, mock_hook_class):
"""Test fire-and-forget mode (wait_for_completion=False)"""
mock_hook_instance = mock_hook_class.return_value
mock_hook_instance.trigger_dataset_refresh.return_value = NEW_REFRESH_REQUEST_ID

operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=False,
)
context = {"ti": MagicMock()}
context["ti"].task_id = TASK_ID

# Should not raise TaskDeferred
result = operator.execute(context)

# Verify hook was called correctly
mock_hook_instance.trigger_dataset_refresh.assert_called_once_with(
dataset_id=DATASET_ID,
group_id=GROUP_ID,
request_body=REQUEST_BODY,
)

# Verify XCom push
assert context["ti"].xcom_push.call_count == 1
call_args = context["ti"].xcom_push.call_args
assert call_args[1]["key"] == f"{TASK_ID}.powerbi_dataset_refresh_id"
assert call_args[1]["value"] == NEW_REFRESH_REQUEST_ID

# Should return None (completes synchronously)
assert result is None

@mock.patch("airflow.providers.microsoft.azure.operators.powerbi.PowerBIHook")
@mock.patch.object(BaseHook, "get_connection", side_effect=get_airflow_connection)
def test_execute_fire_and_forget_mode_failure(self, mock_connection, mock_hook_class):
"""Test fire-and-forget mode raises exception when trigger fails"""
mock_hook_instance = mock_hook_class.return_value
mock_hook_instance.trigger_dataset_refresh.return_value = None

def test_execute_fire_and_forget_mode(self, mock_connection):
"""Test fire-and-forget mode defers to trigger refresh."""
operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=False,
)
context = {"ti": MagicMock()}
context["ti"].task_id = TASK_ID
context = mock_context(task=operator)

# Should raise AirflowException
with pytest.raises(AirflowException, match="Failed to trigger dataset refresh"):
with pytest.raises(TaskDeferred) as exc:
operator.execute(context)

# Should not push to XCom on failure
assert context["ti"].xcom_push.call_count == 0
assert isinstance(exc.value.trigger, PowerBITrigger)
assert exc.value.trigger.wait_for_termination is False
assert exc.value.method_name == "handle_refresh"

@mock.patch.object(BaseHook, "get_connection", side_effect=get_airflow_connection)
def test_execute_default_behavior_waits_for_completion(self, mock_connection):
"""Test that default behavior (wait_for_completion=True) defers and waits"""
"""Test that default behavior (wait_for_completion=True) first defers to trigger refresh."""
config_without_wait = {
"task_id": TASK_ID,
"conn_id": DEFAULT_CONNECTION_CLIENT_SECRET,
Expand All @@ -303,11 +374,11 @@ def test_execute_default_behavior_waits_for_completion(self, mock_connection):
operator = PowerBIDatasetRefreshOperator(**config_without_wait)
context = mock_context(task=operator)

# Should defer (because default is wait_for_completion=True)
# Should defer for initial trigger step (because default is wait_for_completion=True)
with pytest.raises(TaskDeferred) as exc:
operator.execute(context)

# Verify it deferred with correct trigger
assert isinstance(exc.value.trigger, PowerBITrigger)
assert exc.value.trigger.wait_for_termination is True
assert exc.value.method_name == "execute_complete"
assert exc.value.trigger.wait_for_termination is False
assert exc.value.method_name == "handle_refresh"
Loading