Replies: 5 comments
-

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.
-

Could you please provide a full DAG example?
-

Hi @shahar1, here's a DAG example that used to work with Airflow 2.11:

```python
from __future__ import annotations

import pendulum
from airflow import DAG

from connectors.cloud.target.gcs_target_connector import GCSTargetConnector
from connectors.jdbc.source.oracle_source_connector import OracleSourceConnector
from operators.extraction import ExtractionOperator

with DAG(
    dag_id='ar_collectors',
    start_date=pendulum.datetime(2025, 10, 1, tz="America/Toronto"),
    schedule="0 10 * * *",
    catchup=False,
    tags=['jdbc', 'oracleebs', 'ar_collectors'],
) as dag:
    # 1. Define the Source (Oracle)
    oracle_source = OracleSourceConnector(
        conn_id='oracleebs',
        sql="SELECT * FROM AR.AR_COLLECTORS",
        query_mode='delta',
        delta_column='LAST_UPDATE_DATE',
        xcom_key='ar_collectors_delta',
    )

    # 2. Define the Target (GCS)
    gcs_target = GCSTargetConnector(
        conn_id='gcs-bucket-project',
        table_name='AR_COLLECTORS',
        gcs_path='raw/OracleEBS',
    )

    # 3. Use the Orchestrator (ExtractionOperator)
    ExtractionOperator(
        task_id='select_ar_collectors',
        source=oracle_source,
        targets=[gcs_target],
    )
```

FYI, the OracleSourceConnector does retrieve its connection and can retrieve the data. It retrieves its connection using this parent class:

```python
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook as AirflowBaseHook


# The import alias keeps the class below from shadowing its own parent name.
class BaseHook(AirflowBaseHook):
    """Base class for all internal hooks to standardize logging and error handling."""

    def __init__(self, conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id

    def get_connection_metadata(self):
        """Standardized connection retrieval with clear error messaging."""
        try:
            return self.get_connection(self.conn_id)
        except Exception:
            self.log.error(f"Failed to find Airflow Connection: {self.conn_id}")
            raise AirflowException(f"Connection {self.conn_id} is missing or inaccessible.")
```
-

OK, some more insights: I was able to get it working. See gcs_target_connector.py below; the main change was to move the hook initialisation out of the __enter__ method.

```python
from google.cloud import storage

from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

from connectors.base_connector import TargetConnector


class GCSTargetConnector(TargetConnector):
    """
    Target connector for writing data to Google Cloud Storage (GCS).
    Handles file naming, bucket/project resolution, and client setup.
    """

    def __init__(self, conn_id="google_cloud_default", bucket_name=None,
                 project_id=None, gcs_path=None, table_name=None, file_prefix="stream"):
        """
        Initialize the GCS target connector.

        conn_id: Airflow connection ID for GCS.
        bucket_name: Name of the GCS bucket to write to.
        project_id: GCP project ID.
        gcs_path: Path in the bucket to write files.
        table_name: Optional table name for metadata.
        file_prefix: Prefix for output files.
        """
        super().__init__(conn_id)
        self.bucket_name = bucket_name
        self.project_id = project_id
        self.gcs_path = gcs_path
        self.table_name = table_name
        self.file_prefix = file_prefix
        self._part_num = 0
        self._hook = None
        self.client = None  # initialized lazily on the first save()

    @property
    def hook(self):
        if self._hook is None:
            # The Task SDK will resolve this only when first accessed
            self.log.info(f"Resolving connection: {self.connection_id}")
            self._hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
        return self._hook

    def __enter__(self):
        # The eager initialization below is what used to run here and failed
        # under Airflow 3; it has been moved into save() via the lazy hook.
        #
        # 1. Initialize the hook.
        #    Note: 'gcp_conn_id' is the standard param for GoogleBaseHook.
        # hook = GoogleBaseHook(gcp_conn_id=self.connection_id)
        #
        # 2. Explicitly fetch credentials. This is where ADC is triggered.
        # credentials = hook.get_credentials()
        #
        # 3. Resolve the project ID. For ADC, setting 'Project Id' in the
        #    Airflow Connection UI is still highly recommended.
        # self._effective_project_id = self.project_id or hook.project_id
        # if not self._effective_project_id:
        #     raise AirflowException("GCP Project ID could not be determined from Connection or ADC.")
        # self._effective_bucket_name = self.bucket_name or self._effective_project_id
        #
        # 4. Initialize the storage client.
        # self.client = storage.Client(
        #     project=self._effective_project_id,
        #     credentials=credentials,
        # )
        self.log.info("Entering GCS context...")
        return self

    def save(self, log, data: bytes, context: dict):
        if not data:
            return None

        # Use the property 'self.hook' to ensure _hook is initialized
        hook_obj = self.hook

        # Initialize the client once, on the first batch
        if self.client is None:
            self.log.info("Initializing GCS Storage Client for the first batch...")
            credentials = hook_obj.get_credentials()
            self._effective_project_id = self.project_id or hook_obj.project_id
            if not self._effective_project_id:
                raise AirflowException("GCP Project ID could not be determined.")
            self._effective_bucket_name = self.bucket_name or self._effective_project_id
            self.client = storage.Client(
                project=self._effective_project_id,
                credentials=credentials,
            )

        logical_date = context['logical_date']
        formatted_date = logical_date.strftime("%Y%m%d%H%M%S%f")[:-3]
        file_name = f"{self.gcs_path}/{self.table_name}/{self.file_prefix}_{self.table_name}_{formatted_date}_{self._part_num}.json"
        bucket = self.client.bucket(self._effective_bucket_name)
        blob = bucket.blob(file_name)
        log.info(f"Uploading to gs://{self._effective_bucket_name}/{file_name}")
        blob.upload_from_string(data, content_type="application/jsonl")
        self._part_num += 1
        return f"gs://{self._effective_bucket_name}/{file_name}"
```

According to Gemini:
- The eager hook problem: the GoogleBaseHook source code shows that it attempts to fetch connection details immediately upon instantiation, to populate its "extras".
- Initialization timing: when the hook was instantiated in __enter__, the Task SDK hadn't yet fully "hydrated" the task environment. The worker tried to reach the API before the proxy was ready, resulting in the AirflowNotFoundException.
- The lazy solution: moving the hook behind a @property defers the connection lookup until save() is called. At that point the task is in its "active" phase and the Task SDK is fully synchronized with the internal API, so the connection resolves instantly.

Not sure if that makes sense or if there's an issue with the Task SDK.
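For reference, here is the deferred pattern in isolation — a minimal sketch, with illustrative names, not the real connector:

```python
from functools import cached_property

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook


class LazyGCSTarget:
    """Minimal sketch of the lazy-hook pattern (illustrative class)."""

    def __init__(self, gcp_conn_id: str = "google_cloud_default"):
        self.gcp_conn_id = gcp_conn_id

    @cached_property
    def hook(self) -> GoogleBaseHook:
        # No connection lookup happens until the first access, which should
        # occur inside execute-time code (e.g. save()), not in __init__/__enter__.
        return GoogleBaseHook(gcp_conn_id=self.gcp_conn_id)

    def save(self, data: bytes) -> None:
        credentials = self.hook.get_credentials()  # connection resolved here
        ...
```

functools.cached_property gives the same one-time initialisation as the explicit _hook/@property pair above, with less boilerplate.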
-

It does make sense to me that this is related to expected behavior of the Task SDK in Airflow 3 - please read the AIP for more details.
-
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
19.3.0
Apache Airflow version
3.1.6
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Docker-Compose
Deployment details
Deployment was made using the base docker-compose file.
A custom image has been built with additional requirements.
Dockerfile
What happened
I am not able to retrieve a Google Cloud connection anymore.
It produces the following error:
The connection can successfully be retrieved from the CLI using the following command:
What you think should happen instead
I should be able to retrieve the connection as I previously could in Airflow 2.11.0. I haven't seen any documentation that would prove otherwise, but I may not have stumbled upon it yet.
How to reproduce
Try to instantiate a hook with a Google Cloud Platform connection using Application Default Credentials (ADC), as in the sketch below.
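A minimal reproduction sketch (connection ID illustrative; assumes a GCP connection configured for ADC):

```python
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

# Instantiating the hook is enough to trigger the connection lookup
# (see the discussion above about eager fetching of connection extras).
hook = GoogleBaseHook(gcp_conn_id="google_cloud_default")
credentials = hook.get_credentials()  # ADC is triggered here
```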
Anything else
This may or may not be an issue related to changes in Airflow 3, but my JDBC connections are not suffering from the same problem using airflow.hooks.base.BaseHook. I've also tried using BaseHook to retrieve the connection information, without success.
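For contrast, a sketch of the plain lookup that still works for the JDBC connections (connection ID illustrative):

```python
from airflow.hooks.base import BaseHook

# Direct connection retrieval, as used by the JDBC connectors.
conn = BaseHook.get_connection("oracleebs")
print(conn.host, conn.port)
```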
Are you willing to submit PR?
Code of Conduct