diff --git a/google/auth/_constants.py b/google/auth/_constants.py index 28e47025f..45c5b785c 100644 --- a/google/auth/_constants.py +++ b/google/auth/_constants.py @@ -1,5 +1,5 @@ """Shared constants.""" -_SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations" -_WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations" -_WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations" +_SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations" +_WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/{pool_id}/allowedLocations" +_WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations" diff --git a/google/auth/_helpers.py b/google/auth/_helpers.py index 750631aa5..5a7e71623 100644 --- a/google/auth/_helpers.py +++ b/google/auth/_helpers.py @@ -314,8 +314,7 @@ def get_bool_from_env(variable_name, default=False): The environment variable is interpreted as a boolean with the following (case-insensitive) rules: - "true", "1" are considered true. - - "false", "0" are considered false. - Any other values will raise an exception. + - Any other value (or unset) is considered false. Args: variable_name (str): The name of the environment variable. @@ -324,10 +323,6 @@ def get_bool_from_env(variable_name, default=False): Returns: bool: The boolean value of the environment variable. - - Raises: - google.auth.exceptions.InvalidValue: If the environment variable is - set to a value that can not be interpreted as a boolean. """ value = os.environ.get(variable_name) @@ -338,14 +333,8 @@ def get_bool_from_env(variable_name, default=False): if value in ("true", "1"): return True - elif value in ("false", "0"): - return False else: - raise exceptions.InvalidValue( - 'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format( - variable_name - ) - ) + return False def is_python_3(): diff --git a/google/auth/_regional_access_boundary_utils.py b/google/auth/_regional_access_boundary_utils.py new file mode 100644 index 000000000..303a5e021 --- /dev/null +++ b/google/auth/_regional_access_boundary_utils.py @@ -0,0 +1,104 @@ +"""Utilities for Regional Access Boundary management.""" + +import datetime +import threading + +from google.auth import _helpers +from google.auth._default import _LOGGER + + +# The default lifetime for a cached Regional Access Boundary. +DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6) + +# The initial cooldown period for a failed Regional Access Boundary lookup. +DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15) + +# The maximum cooldown period for a failed Regional Access Boundary lookup. +MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6) + + +class _RegionalAccessBoundaryRefreshThread(threading.Thread): + """Thread for background refreshing of the Regional Access Boundary.""" + + def __init__(self, credentials, request): + super(_RegionalAccessBoundaryRefreshThread, self).__init__() + self._credentials = credentials + self._request = request + + def run(self): + """ + Performs the Regional Access Boundary lookup and updates the credential's state. + + This method is run in a separate thread. It delegates the actual lookup + to the credentials object's `_lookup_regional_access_boundary` method. + Based on the lookup's outcome (success or complete failure after retries), + it updates the credential's cached Regional Access Boundary information, + its expiry, its cooldown expiry, and its exponential cooldown duration. + """ + regional_access_boundary_info = ( + self._credentials._lookup_regional_access_boundary(self._request) + ) + + with self._credentials._stale_boundary_lock: # Acquire the lock + if regional_access_boundary_info: + # On success, update the boundary and its expiry, and clear any cooldown. + self._credentials._regional_access_boundary = ( + regional_access_boundary_info + ) + self._credentials._regional_access_boundary_expiry = ( + _helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL + ) + self._credentials._regional_access_boundary_cooldown_expiry = None + # Reset the cooldown duration on success. + self._credentials._current_rab_cooldown_duration = ( + DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN + ) + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.debug( + "Asynchronous Regional Access Boundary lookup successful." + ) + else: + # On complete failure, calculate the next exponential cooldown duration and set the cooldown expiry. + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.warning( + "Asynchronous Regional Access Boundary lookup failed. Entering cooldown." + ) + new_cooldown_duration = ( + self._credentials._current_rab_cooldown_duration * 2 + ) + self._credentials._current_rab_cooldown_duration = min( + new_cooldown_duration, MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN + ) + self._credentials._regional_access_boundary_cooldown_expiry = ( + _helpers.utcnow() + self._credentials._current_rab_cooldown_duration + ) + # If the proactive refresh failed, clear any existing expired RAB data. + # This ensures we don't continue using stale data. + self._credentials._regional_access_boundary = None + self._credentials._regional_access_boundary_expiry = None + + +class _RegionalAccessBoundaryRefreshManager(object): + """Manages a thread for background refreshing of the Regional Access Boundary.""" + + def __init__(self): + self._lock = threading.Lock() + self._worker = None + + def start_refresh(self, credentials, request): + """ + Starts a background thread to refresh the Regional Access Boundary if one is not already running. + + Args: + credentials (CredentialsWithRegionalAccessBoundary): The credentials + to refresh. + request (google.auth.transport.Request): The object used to make + HTTP requests. + """ + with self._lock: + if self._worker and self._worker.is_alive(): + # A refresh is already in progress. + return + + self._worker = _RegionalAccessBoundaryRefreshThread(credentials, request) + self._worker.start() diff --git a/google/auth/compute_engine/credentials.py b/google/auth/compute_engine/credentials.py index 9507e837f..1071f5798 100644 --- a/google/auth/compute_engine/credentials.py +++ b/google/auth/compute_engine/credentials.py @@ -20,7 +20,11 @@ """ import datetime +import logging +_LOGGER = logging.getLogger(__name__) + +from google.auth import _constants from google.auth import _helpers from google.auth import credentials from google.auth import exceptions @@ -30,16 +34,12 @@ from google.auth.compute_engine import _metadata from google.oauth2 import _client -_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( - "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" -) - class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithUniverseDomain, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Compute Engine Credentials. @@ -66,7 +66,6 @@ def __init__( scopes=None, default_scopes=None, universe_domain=None, - trust_boundary=None, ): """ Args: @@ -82,7 +81,6 @@ def __init__( provided or None, credential will attempt to fetch the value from metadata server. If metadata server doesn't have universe domain endpoint, then the default googleapis.com will be used. - trust_boundary (Mapping[str,str]): A credential trust boundary. """ super(Credentials, self).__init__() self._service_account_email = service_account_email @@ -93,7 +91,6 @@ def __init__( if universe_domain: self._universe_domain = universe_domain self._universe_domain_cached = True - self._trust_boundary = trust_boundary def _retrieve_info(self, request): """Retrieve information about the service account. @@ -146,8 +143,8 @@ def _perform_refresh_token(self, request): new_exc = exceptions.RefreshError(caught_exc) raise new_exc from caught_exc - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API for GCE.""" + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the regional access boundary lookup API for GCE.""" # If the service account email is 'default', we need to get the # actual email address from the metadata server. if self._service_account_email == "default": @@ -157,24 +154,26 @@ def _build_trust_boundary_lookup_url(self): try: info = _metadata.get_service_account_info(request, "default") if not info or "email" not in info: - raise exceptions.RefreshError( + _LOGGER.error( "Unexpected response from metadata server: " - "service account info is missing 'email' field." + "service account info is missing 'email' field. Cannot build Regional Access Boundary lookup URL." ) + return None self._service_account_email = info["email"] except exceptions.TransportError as e: # If fetching the service account email fails due to a transport error, - # it means we cannot build the trust boundary lookup URL. - # Wrap this in a RefreshError so it's caught by _refresh_trust_boundary. - raise exceptions.RefreshError( - "Failed to get service account email for trust boundary lookup: {}".format( - e - ) - ) from e + # it means we cannot build the regional access boundary lookup URL. + _LOGGER.error( + "Failed to get service account email to build Regional Access Boundary lookup URL: %s", + e, + ) + return None - return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - self.universe_domain, self.service_account_email + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + self.service_account_email + ) ) @property @@ -211,17 +210,22 @@ def get_cred_info(self): "principal": self.service_account_email, } - @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): + def _make_copy(self): creds = self.__class__( service_account_email=self._service_account_email, - quota_project_id=quota_project_id, + quota_project_id=self._quota_project_id, scopes=self._scopes, default_scopes=self._default_scopes, universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, ) creds._universe_domain_cached = self._universe_domain_cached + self._copy_regional_access_boundary_state(creds) + return creds + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + creds = self._make_copy() + creds._quota_project_id = quota_project_id return creds @_helpers.copy_docstring(credentials.Scoped) @@ -229,39 +233,15 @@ def with_scopes(self, scopes, default_scopes=None): # Compute Engine credentials can not be scoped (the metadata service # ignores the scopes parameter). App Engine, Cloud Run and Flex support # requesting scopes. - creds = self.__class__( - scopes=scopes, - default_scopes=default_scopes, - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, - ) - creds._universe_domain_cached = self._universe_domain_cached + creds = self._make_copy() + creds._scopes = scopes + creds._default_scopes = default_scopes return creds @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) def with_universe_domain(self, universe_domain): - return self.__class__( - scopes=self._scopes, - default_scopes=self._default_scopes, - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - trust_boundary=self._trust_boundary, - universe_domain=universe_domain, - ) - - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - creds = self.__class__( - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - scopes=self._scopes, - default_scopes=self._default_scopes, - universe_domain=self._universe_domain, - trust_boundary=trust_boundary, - ) - creds._universe_domain_cached = self._universe_domain_cached + creds = self._make_copy() + creds._universe_domain = universe_domain return creds diff --git a/google/auth/credentials.py b/google/auth/credentials.py index cdb206532..40db38cda 100644 --- a/google/auth/credentials.py +++ b/google/auth/credentials.py @@ -16,20 +16,23 @@ """Interfaces for credentials.""" import abc +import datetime from enum import Enum import logging import os -from typing import List +import threading +from urllib.parse import urlparse +import warnings +from google.auth import _exponential_backoff from google.auth import _helpers, environment_vars from google.auth import exceptions from google.auth import metrics from google.auth._credentials_base import _BaseCredentials from google.auth._refresh_worker import RefreshThreadManager +from google.auth import _regional_access_boundary_utils DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" -NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = [] -NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" _LOGGER = logging.getLogger("google.auth._default") @@ -290,8 +293,21 @@ def with_universe_domain(self, universe_domain): ) -class CredentialsWithTrustBoundary(Credentials): - """Abstract base for credentials supporting ``with_trust_boundary`` factory""" +class CredentialsWithRegionalAccessBoundary(Credentials): + """Abstract base for credentials supporting ``with_regional_access_boundary`` factory""" + + def __init__(self, *args, **kwargs): + super(CredentialsWithRegionalAccessBoundary, self).__init__(*args, **kwargs) + self._regional_access_boundary = None + self._regional_access_boundary_expiry = None + self._regional_access_boundary_cooldown_expiry = None + self._regional_access_boundary_refresh_manager = ( + _regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager() + ) + self._current_rab_cooldown_duration = ( + _regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN + ) + self._stale_boundary_lock = threading.Lock() @abc.abstractmethod def _perform_refresh_token(self, request): @@ -307,29 +323,137 @@ def _perform_refresh_token(self, request): """ raise NotImplementedError("_perform_refresh_token must be implemented") - def with_trust_boundary(self, trust_boundary): - """Returns a copy of these credentials with a modified trust boundary. + def with_regional_access_boundary(self, regional_access_boundary): + """Returns a copy of these credentials with a modified Regional Access Boundary. + + This method allows for manually providing the Regional Access Boundary + information, which will be cached with a 6-hour lifetime. This bypasses + the initial asynchronous lookup. After the cache expires, the library + will trigger a background refresh on the next request. Args: - trust_boundary Mapping[str, str]: The trust boundary to use for the - credential. This should be a map with a "locations" key that maps to - a list of GCP regions, and a "encodedLocations" key that maps to a - hex string. + regional_access_boundary (Mapping[str, str]): The Regional Access Boundary + to use for the credential. This should be a map with an + "encodedLocations" key that maps to a hex string. Optionally, + it can also contain a "locations" key with a list of GCP regions. + Example: `{"locations": ["us-central1"], "encodedLocations": "0xA30"}` Returns: - google.auth.credentials.Credentials: A new credentials instance. + google.auth.credentials.Credentials: A new credentials instance + with the specified Regional Access Boundary. + + Raises: + google.auth.exceptions.InvalidValue: If `regional_access_boundary` + is not a dictionary or does not contain the "encodedLocations" key. + """ + if ( + not isinstance(regional_access_boundary, dict) + or "encodedLocations" not in regional_access_boundary + ): + raise exceptions.InvalidValue( + "regional_access_boundary must be a dictionary with an 'encodedLocations' key." + ) + + new_creds = self._make_copy() + new_creds._regional_access_boundary = regional_access_boundary + + new_creds._regional_access_boundary_expiry = ( + _helpers.utcnow() + + _regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL + ) + + new_creds._regional_access_boundary_cooldown_expiry = None + + return new_creds + + def _copy_regional_access_boundary_state(self, target): + """Copies the regional access boundary state to another instance.""" + target._regional_access_boundary = self._regional_access_boundary + target._regional_access_boundary_expiry = self._regional_access_boundary_expiry + target._regional_access_boundary_cooldown_expiry = ( + self._regional_access_boundary_cooldown_expiry + ) + target._current_rab_cooldown_duration = self._current_rab_cooldown_duration + # Create a new lock for the target instance to ensure independent thread-safety. + target._stale_boundary_lock = threading.Lock() + + def handle_stale_regional_access_boundary(self, request): + """Handles a stale regional access boundary error. + This method is thread-safe and will only initiate a single refresh + even if called concurrently. + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + """ + with self._stale_boundary_lock: + # Another thread might have already handled the stale boundary. + if self._regional_access_boundary is None: + return + + _LOGGER.info("Stale regional access boundary detected. Refreshing.") + + # Clear the cached boundary. + self._regional_access_boundary = None + self._regional_access_boundary_expiry = None + + # Start the background refresh. + self._regional_access_boundary_refresh_manager.start_refresh(self, request) + + def _maybe_start_regional_access_boundary_refresh(self, request, url): + """ + Starts a background thread to refresh the Regional Access Boundary if needed. + + This method checks if a refresh is necessary and if one is not already + in progress or in a cooldown period. If so, it starts a background + thread to perform the lookup. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + url (str): The URL of the request. """ - raise NotImplementedError("This credential does not support trust boundaries.") + try: + # Do not perform a lookup if the request is for a regional endpoint. + hostname = urlparse(url).hostname + if hostname and ( + hostname.endswith(".rep.googleapis.com") + or hostname.endswith(".rep.sandbox.googleapis.com") + ): + return + except (ValueError, TypeError): + # If the URL is malformed, proceed with the default lookup behavior. + pass + + # A refresh is only needed if the feature is enabled. + if not self._is_regional_access_boundary_lookup_required(): + return + + # Don't start a new refresh if the Regional Access Boundary info is still valid. + if ( + self._regional_access_boundary + and self._regional_access_boundary_expiry + and _helpers.utcnow() < self._regional_access_boundary_expiry + ): + return + + # Don't start a new refresh if the cooldown is still in effect. + if ( + self._regional_access_boundary_cooldown_expiry + and _helpers.utcnow() < self._regional_access_boundary_cooldown_expiry + ): + return - def _is_trust_boundary_lookup_required(self): - """Checks if a trust boundary lookup is required. + # If all checks pass, start the background refresh. + self._regional_access_boundary_refresh_manager.start_refresh(self, request) + + def _is_regional_access_boundary_lookup_required(self): + """Checks if a Regional Access Boundary lookup is required. A lookup is required if the feature is enabled via an environment - variable, the universe domain is supported, and a no-op boundary - is not already cached. + variable and the universe domain is supported. Returns: - bool: True if a trust boundary lookup is required, False otherwise. + bool: True if a Regional Access Boundary lookup is required, False otherwise. """ # 1. Check if the feature is enabled via environment variable. if not _helpers.get_bool_from_env( @@ -337,112 +461,92 @@ def _is_trust_boundary_lookup_required(self): ): return False - # 2. Skip trust boundary flow for non-default universe domains. + # 2. Skip for non-default universe domains. if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: return False - # 3. Do not trigger refresh if credential has a cached no-op trust boundary. - return not self._has_no_op_trust_boundary() + return True - def _get_trust_boundary_header(self): - if self._trust_boundary is not None: - if self._has_no_op_trust_boundary(): - # STS expects an empty string if the trust boundary value is no-op. - return {"x-allowed-locations": ""} - else: - return {"x-allowed-locations": self._trust_boundary["encodedLocations"]} + def _get_regional_access_boundary_header(self): + if self._regional_access_boundary is not None: + return { + "x-allowed-locations": self._regional_access_boundary[ + "encodedLocations" + ] + } return {} def apply(self, headers, token=None): """Apply the token to the authentication header.""" super().apply(headers, token) - headers.update(self._get_trust_boundary_header()) - def refresh(self, request): - """Refreshes the access token and the trust boundary. + boundary_header = self._get_regional_access_boundary_header() + if boundary_header: + headers.update(boundary_header) + else: + # If we have no boundary to add, ensure the header is not present + # from a previous, stale state. We use pop() with a default to + # avoid a KeyError if the header was never there. + headers.pop("x-allowed-locations", None) - This method calls the subclass's token refresh logic and then - refreshes the trust boundary if applicable. + def before_request(self, request, method, url, headers): + """Refreshes the access token and triggers the Regional Access Boundary + lookup if necessary. """ - self._perform_refresh_token(request) - self._refresh_trust_boundary(request) - - def _refresh_trust_boundary(self, request): - """Triggers a refresh of the trust boundary and updates the cache if necessary. + super(CredentialsWithRegionalAccessBoundary, self).before_request( + request, method, url, headers + ) + self._maybe_start_regional_access_boundary_refresh(request, url) - Args: - request (google.auth.transport.Request): The object used to make - HTTP requests. + def refresh(self, request): + """Refreshes the access token. - Raises: - google.auth.exceptions.RefreshError: If the trust boundary could - not be refreshed and no cached value is available. + This method calls the subclass's token refresh logic. The Regional + Access Boundary is refreshed separately in a non-blocking way. """ - if not self._is_trust_boundary_lookup_required(): - return - try: - self._trust_boundary = self._lookup_trust_boundary(request) - except exceptions.RefreshError as error: - # If the call to the lookup API failed, check if there is a trust boundary - # already cached. If there is, do nothing. If not, then throw the error. - if self._trust_boundary is None: - raise error - if _helpers.is_logging_enabled(_LOGGER): - _LOGGER.debug( - "Using cached trust boundary due to refresh error: %s", error - ) - return + self._perform_refresh_token(request) - def _lookup_trust_boundary(self, request): - """Calls the trust boundary lookup API to refresh the trust boundary cache. + def _lookup_regional_access_boundary(self, request): + """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information. Args: request (google.auth.transport.Request): The object used to make HTTP requests. Returns: - trust_boundary (dict): The trust boundary object returned by the lookup API. - - Raises: - google.auth.exceptions.RefreshError: If the trust boundary could not be - retrieved. + Optional[dict]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed. """ from google.oauth2 import _client - url = self._build_trust_boundary_lookup_url() + url = self._build_regional_access_boundary_lookup_url() if not url: - raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.") + _LOGGER.error("Failed to build Regional Access Boundary lookup URL.") + return None headers = {} self._apply(headers) - headers.update(self._get_trust_boundary_header()) - return _client._lookup_trust_boundary(request, url, headers=headers) + headers.update(self._get_regional_access_boundary_header()) + return _client._lookup_regional_access_boundary(request, url, headers=headers) @abc.abstractmethod - def _build_trust_boundary_lookup_url(self): + def _build_regional_access_boundary_lookup_url(self): """ - Builds and returns the URL for the trust boundary lookup API. + Builds and returns the URL for the Regional Access Boundary lookup API. This method should be implemented by subclasses to provide the specific URL based on the credential type and its properties. Returns: - str: The URL for the trust boundary lookup endpoint, or None + str: The URL for the Regional Access Boundary lookup endpoint, or None if lookup should be skipped (e.g., for non-applicable universe domains). """ raise NotImplementedError( - "_build_trust_boundary_lookup_url must be implemented" + "_build_regional_access_boundary_lookup_url must be implemented" ) - def _has_no_op_trust_boundary(self): - # A no-op trust boundary is indicated by encodedLocations being "0x0". - # The "locations" list may or may not be present as an empty list. - if self._trust_boundary is None: - return False - return ( - self._trust_boundary.get("encodedLocations") - == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS - ) + +# For backward compatibility. +CredentialsWithTrustBoundary = CredentialsWithRegionalAccessBoundary class AnonymousCredentials(Credentials): diff --git a/google/auth/external_account.py b/google/auth/external_account.py index 05874eda7..664fe551b 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -35,6 +35,11 @@ import io import json import re +import warnings + +import logging + +_LOGGER = logging.getLogger(__name__) from google.auth import _constants from google.auth import _helpers @@ -82,7 +87,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, metaclass=abc.ABCMeta, ): """Base class for all external account credentials. @@ -117,7 +122,6 @@ def __init__( default_scopes=None, workforce_pool_user_project=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): """Instantiates an external account credentials object. @@ -150,7 +154,6 @@ def __init__( billing/quota. universe_domain (str): The universe domain. The default universe domain is googleapis.com. - trust_boundary (str): String representation of trust boundary meta. Raises: google.auth.exceptions.RefreshError: If the generateAccessToken endpoint returned an error. @@ -176,7 +179,6 @@ def __init__( self._scopes = scopes self._default_scopes = default_scopes self._workforce_pool_user_project = workforce_pool_user_project - self._trust_boundary = trust_boundary if self._client_id: self._client_auth = utils.ClientAuthentication( @@ -242,7 +244,6 @@ def _constructor_args(self): "scopes": self._scopes, "default_scopes": self._default_scopes, "universe_domain": self._universe_domain, - "trust_boundary": self._trust_boundary, } if not self.is_workforce_pool: args.pop("workforce_pool_user_project") @@ -417,20 +418,9 @@ def refresh(self, request): """Refreshes the access token. For impersonated credentials, this method will refresh the underlying - source credentials and the impersonated credentials. For non-impersonated - credentials, it will refresh the access token and the trust boundary. + source credentials and the impersonated credentials. """ self._perform_refresh_token(request) - self._handle_trust_boundary(request) - - def _handle_trust_boundary(self, request): - # If we are impersonating, the trust boundary is handled by the - # impersonated credentials object. We need to get it from there. - if self._service_account_impersonation_url: - self._trust_boundary = self._impersonated_credentials._trust_boundary - else: - # Otherwise, refresh the trust boundary for the external account. - self._refresh_trust_boundary(request) def _perform_refresh_token(self, request, cert_fingerprint=None): scopes = self._scopes if self._scopes is not None else self._default_scopes @@ -486,8 +476,8 @@ def _perform_refresh_token(self, request, cert_fingerprint=None): self.expiry = now + lifetime - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API.""" + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API.""" url = None # Try to parse as a workload identity pool. # Audience format: //iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID @@ -497,8 +487,7 @@ def _build_trust_boundary_lookup_url(self): ) if workload_match: project_number, pool_id = workload_match.groups() - url = _constants._WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, + url = _constants._WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( project_number=project_number, pool_id=pool_id, ) @@ -510,8 +499,8 @@ def _build_trust_boundary_lookup_url(self): ) if workforce_match: pool_id = workforce_match.groups()[0] - url = _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, pool_id=pool_id + url = _constants._WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + pool_id=pool_id ) if url: @@ -525,6 +514,7 @@ def _make_copy(self): new_cred = self.__class__(**kwargs) new_cred._cred_file_path = self._cred_file_path new_cred._metrics_options = self._metrics_options + self._copy_regional_access_boundary_state(new_cred) return new_cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -546,12 +536,6 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - def _should_initialize_impersonated_credentials(self): return ( self._service_account_impersonation_url is not None @@ -600,7 +584,6 @@ def _initialize_impersonated_credentials(self): lifetime=self._service_account_impersonation_options.get( "token_lifetime_seconds" ), - trust_boundary=self._trust_boundary, ) def _create_default_metrics_options(self): @@ -667,7 +650,7 @@ def from_info(cls, info, **kwargs): Raises: InvalidValue: For invalid parameters. """ - return cls( + initial_creds = cls( audience=info.get("audience"), subject_token_type=info.get("subject_token_type"), token_url=info.get("token_url"), @@ -687,10 +670,17 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds.with_regional_access_boundary( + regional_access_boundary + ) + + return initial_creds + @classmethod def from_file(cls, filename, **kwargs): """Creates a Credentials instance from an external account json file. diff --git a/google/auth/external_account_authorized_user.py b/google/auth/external_account_authorized_user.py index 680fce628..c77da8813 100644 --- a/google/auth/external_account_authorized_user.py +++ b/google/auth/external_account_authorized_user.py @@ -37,6 +37,11 @@ import io import json import re +import warnings + +import logging + +_LOGGER = logging.getLogger(__name__) from google.auth import _constants from google.auth import _helpers @@ -52,7 +57,7 @@ class Credentials( credentials.CredentialsWithQuotaProject, credentials.ReadOnlyScoped, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Credentials for External Account Authorized Users. @@ -87,7 +92,6 @@ def __init__( scopes=None, quota_project_id=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): """Instantiates a external account authorized user credentials object. @@ -113,7 +117,6 @@ def __init__( create the credentials. universe_domain (Optional[str]): The universe domain. The default value is googleapis.com. - trust_boundary (Mapping[str,str]): A credential trust boundary. Returns: google.auth.external_account_authorized_user.Credentials: The @@ -134,7 +137,6 @@ def __init__( self._scopes = scopes self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN self._cred_file_path = None - self._trust_boundary = trust_boundary if not self.valid and not self.can_refresh: raise exceptions.InvalidOperation( @@ -182,7 +184,6 @@ def constructor_args(self): "scopes": self._scopes, "quota_project_id": self._quota_project_id, "universe_domain": self._universe_domain, - "trust_boundary": self._trust_boundary, } @property @@ -308,18 +309,29 @@ def _perform_refresh_token(self, request): if "refresh_token" in response_data: self._refresh_token = response_data["refresh_token"] - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API.""" + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API. + + Returns: + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the URL cannot be built due to an invalid workforce pool audience format. + """ # Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience) if not match: - raise exceptions.InvalidValue("Invalid workforce pool audience format.") + _LOGGER.error( + "Invalid workforce pool audience format for Regional Access Boundary lookup: %s", + self._audience, + ) + return None pool_id = match.groups()[0] - return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, pool_id=pool_id + return ( + _constants._WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + pool_id=pool_id + ) ) def revoke(self, request): @@ -359,6 +371,7 @@ def _make_copy(self): kwargs = self.constructor_args() cred = self.__class__(**kwargs) cred._cred_file_path = self._cred_file_path + self._copy_regional_access_boundary_state(cred) return cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -379,12 +392,6 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - @classmethod def from_info(cls, info, **kwargs): """Creates a Credentials instance from parsed external account info. @@ -414,7 +421,7 @@ def from_info(cls, info, **kwargs): expiry = datetime.datetime.strptime( expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S" ) - return cls( + initial_creds = cls( audience=info.get("audience"), refresh_token=info.get("refresh_token"), token_url=info.get("token_url"), @@ -429,10 +436,17 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds.with_regional_access_boundary( + regional_access_boundary + ) + + return initial_creds + @classmethod def from_file(cls, filename, **kwargs): """Creates a Credentials instance from an external account json file. diff --git a/google/auth/identity_pool.py b/google/auth/identity_pool.py index 50b2a83e4..30819ef04 100644 --- a/google/auth/identity_pool.py +++ b/google/auth/identity_pool.py @@ -572,4 +572,3 @@ def refresh(self, request): ) self._perform_refresh_token(request, cert_fingerprint=cert_fingerprint) - self._handle_trust_boundary(request) diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index 304f0606e..000bdf948 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -30,6 +30,7 @@ from datetime import datetime import http.client as http_client import json +import warnings from google.auth import _exponential_backoff from google.auth import _helpers @@ -40,13 +41,16 @@ from google.auth import metrics from google.oauth2 import _client +import logging + +_LOGGER = logging.getLogger(__name__) _REFRESH_ERROR = "Unable to acquire impersonated credentials" _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" -_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( +_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = ( "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" ) @@ -123,7 +127,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """This module defines impersonated credentials which are essentially impersonated identities. @@ -204,7 +208,6 @@ def __init__( lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, quota_project_id=None, iam_endpoint_override=None, - trust_boundary=None, ): """ Args: @@ -235,7 +238,6 @@ def __init__( subject (Optional[str]): sub field of a JWT. This field should only be set if you wish to impersonate as a user. This feature is useful when using domain wide delegation. - trust_boundary (Mapping[str,str]): A credential trust boundary. """ super(Credentials, self).__init__() @@ -267,7 +269,6 @@ def __init__( self._quota_project_id = quota_project_id self._iam_endpoint_override = iam_endpoint_override self._cred_file_path = None - self._trust_boundary = trust_boundary def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_IMPERSONATE @@ -344,25 +345,23 @@ def _perform_refresh_token(self, request): iam_endpoint_override=self._iam_endpoint_override, ) - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API. + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API. This method constructs the specific URL for the IAM Credentials API's `allowedLocations` endpoint, using the credential's universe domain and service account email. - Raises: - ValueError: If `self.service_account_email` is None or an empty - string, as it's required to form the URL. - Returns: - str: The URL for the trust boundary lookup endpoint. + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the service account email is missing. """ if not self.service_account_email: - raise ValueError( - "Service account email is required to build the trust boundary lookup URL." + _LOGGER.error( + "Service account email is required to build the Regional Access Boundary lookup URL for impersonated credentials." ) - return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + return None + return _REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( self.universe_domain, self.service_account_email ) @@ -435,15 +434,9 @@ def _make_copy(self): lifetime=self._lifetime, quota_project_id=self._quota_project_id, iam_endpoint_override=self._iam_endpoint_override, - trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path - return cred - - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary + self._copy_regional_access_boundary_state(cred) return cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -527,17 +520,23 @@ def from_impersonated_service_account_info(cls, info, scopes=None): delegates = info.get("delegates") quota_project_id = info.get("quota_project_id") scopes = scopes or info.get("scopes") - trust_boundary = info.get("trust_boundary") - return cls( + initial_creds = cls( source_credentials, target_principal, scopes, delegates, quota_project_id=quota_project_id, - trust_boundary=trust_boundary, ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds.with_regional_access_boundary( + regional_access_boundary + ) + + return initial_creds + class IDTokenCredentials(credentials.CredentialsWithQuotaProject): """Open ID Connect ID Token-based service account credentials.""" diff --git a/google/auth/transport/requests.py b/google/auth/transport/requests.py index 9735762c4..37946c4f7 100644 --- a/google/auth/transport/requests.py +++ b/google/auth/transport/requests.py @@ -476,6 +476,18 @@ def configure_mtls_channel(self, client_cert_callback=None): new_exc = exceptions.MutualTLSChannelError(caught_exc) raise new_exc from caught_exc + def _is_stale_regional_access_boundary_error(self, response): + """Checks if the response indicates a stale regional access boundary.""" + if response.status_code != 406: + return False + + try: + # The response data is bytes, decode it to a string. + response_text = response.content.decode("utf-8") + return "stale regional access boundary" in response_text.lower() + except (UnicodeDecodeError, AttributeError): + return False + def request( self, method, @@ -519,6 +531,7 @@ def request( # Use a kwarg for this instead of an attribute to maintain # thread-safety. _credential_refresh_attempt = kwargs.pop("_credential_refresh_attempt", 0) + _stale_boundary_retried = kwargs.pop("_stale_boundary_retried", False) # Make a copy of the headers. They will be modified by the credentials # and we want to pass the original headers if we recurse. @@ -621,6 +634,28 @@ def request( **kwargs ) + # If the response indicated a stale regional access boundary, clear the + # cached boundary and re-attempt the request. This is only done once. + if ( + self._is_stale_regional_access_boundary_error(response) + and not _stale_boundary_retried + ): + _LOGGER.info( + "Stale regional access boundary detected, clearing and retrying." + ) + self.credentials.handle_stale_regional_access_boundary(auth_request) + # Recurse, passing in the original headers and marking that we have retried. + return self.request( + method, + url, + data=data, + headers=headers, + max_allowed_time=remaining_time, + timeout=timeout, + _stale_boundary_retried=True, + **kwargs + ) + return response @property diff --git a/google/oauth2/_client.py b/google/oauth2/_client.py index d4db42007..2635d2590 100644 --- a/google/oauth2/_client.py +++ b/google/oauth2/_client.py @@ -28,6 +28,8 @@ import json import urllib +import logging + from google.auth import _exponential_backoff from google.auth import _helpers from google.auth import credentials @@ -36,6 +38,8 @@ from google.auth import metrics from google.auth import transport +_LOGGER = logging.getLogger(__name__) + _URLENCODED_CONTENT_TYPE = "application/x-www-form-urlencoded" _JSON_CONTENT_TYPE = "application/json" _JWT_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer" @@ -514,20 +518,20 @@ def refresh_grant( return _handle_refresh_grant_response(response_data, refresh_token) -def _lookup_trust_boundary(request, url, headers=None): - """Implements the global lookup of a credential trust boundary. +def _lookup_regional_access_boundary(request, url, headers=None): + """Implements the global lookup of a credential Regional Access Boundary. For the lookup, we send a request to the global lookup endpoint and then parse the response. Service account credentials, workload identity - pools and workforce pools implementation may have trust boundaries configured. + pools and workforce pools implementation may have Regional Access Boundaries configured. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. headers (Optional[Mapping[str, str]]): The headers for the request. Returns: - Mapping[str,list|str]: A dictionary containing + Optional[Mapping[str,list|str]]: A dictionary containing "locations" as a list of allowed locations as strings and - "encodedLocations" as a hex string. + "encodedLocations" as a hex string, or None if the lookup failed. e.g: { "locations": [ @@ -535,61 +539,66 @@ def _lookup_trust_boundary(request, url, headers=None): ], "encodedLocations": "0xA30" } - If the credential is not set up with explicit trust boundaries, a trust boundary - of "all" will be returned as a default response. - { - "locations": [], - "encodedLocations": "0x0" - } - Raises: - exceptions.RefreshError: If the response status code is not 200. - exceptions.MalformedError: If the response is not in a valid format. """ - response_data = _lookup_trust_boundary_request(request, url, headers=headers) - # In case of no-op response, the "locations" list may or may not be present as an empty list. + response_data = _lookup_regional_access_boundary_request( + request, url, headers=headers + ) + if response_data is None: + # Error was already logged by _lookup_regional_access_boundary_request + return None + if "encodedLocations" not in response_data: - raise exceptions.MalformedError( - "Invalid trust boundary info: {}".format(response_data) + _LOGGER.error( + "Regional Access Boundary response malformed: missing 'encodedLocations' key in %s", + response_data, ) + return None return response_data -def _lookup_trust_boundary_request(request, url, can_retry=True, headers=None): - """Makes a request to the trust boundary lookup endpoint. +def _lookup_regional_access_boundary_request( + request, url, can_retry=True, headers=None +): + """Makes a request to the Regional Access Boundary lookup endpoint. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. can_retry (bool): Enable or disable request retry behavior. Defaults to true. headers (Optional[Mapping[str, str]]): The headers for the request. Returns: - Mapping[str, str]: The JSON-decoded response data. - - Raises: - google.auth.exceptions.RefreshError: If the token endpoint returned - an error. + Optional[Mapping[str, str]]: The JSON-decoded response data on success, or None on failure. """ ( response_status_ok, response_data, retryable_error, - ) = _lookup_trust_boundary_request_no_throw(request, url, can_retry, headers) + ) = _lookup_regional_access_boundary_request_no_throw( + request, url, can_retry, headers + ) if not response_status_ok: - _handle_error_response(response_data, retryable_error) + _LOGGER.warning( + "Regional Access Boundary HTTP request failed after retries: response_data=%s, retryable_error=%s", + response_data, + retryable_error, + ) + return None return response_data -def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, headers=None): - """Makes a request to the trust boundary lookup endpoint. This +def _lookup_regional_access_boundary_request_no_throw( + request, url, can_retry=True, headers=None +): + """Makes a request to the Regional Access Boundary lookup endpoint. This function doesn't throw on response errors. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. can_retry (bool): Enable or disable request retry behavior. Defaults to true. headers (Optional[Mapping[str, str]]): The headers for the request. @@ -603,7 +612,7 @@ def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, header response_data = {} retryable_error = False - retries = _exponential_backoff.ExponentialBackoff() + retries = _exponential_backoff.ExponentialBackoff(total_attempts=6) for _ in retries: response = request(method="GET", url=url, headers=headers) response_body = ( @@ -624,6 +633,9 @@ def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, header retryable_error = _can_retry( status_code=response.status, response_data=response_data ) + # Add 502 (Bad Gateway) as a retryable error for RAB lookups. + if response.status == http_client.BAD_GATEWAY: + retryable_error = True if not can_retry or not retryable_error: return False, response_data, retryable_error diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py index f897b3b75..a96943f0a 100644 --- a/google/oauth2/service_account.py +++ b/google/oauth2/service_account.py @@ -83,6 +83,10 @@ from google.auth import metrics from google.oauth2 import _client +import logging + +_LOGGER = logging.getLogger(__name__) + _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" @@ -92,7 +96,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Service account credentials @@ -142,7 +146,6 @@ def __init__( additional_claims=None, always_use_jwt_access=False, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): """ Args: @@ -166,7 +169,6 @@ def __init__( universe_domain (str): The universe domain. The default universe domain is googleapis.com. For default value self signed jwt is used for token refresh. - trust_boundary (Mapping[str,str]): A credential trust boundary. .. note:: Typically one of the helper constructors :meth:`from_service_account_file` or @@ -196,7 +198,6 @@ def __init__( self._additional_claims = additional_claims else: self._additional_claims = {} - self._trust_boundary = trust_boundary @classmethod def _from_signer_and_info(cls, signer, info, **kwargs): @@ -214,7 +215,7 @@ def _from_signer_and_info(cls, signer, info, **kwargs): Raises: ValueError: If the info is not in the expected format. """ - return cls( + initial_creds = cls( signer, service_account_email=info["client_email"], token_uri=info["token_uri"], @@ -222,9 +223,14 @@ def _from_signer_and_info(cls, signer, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs, ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds.with_regional_access_boundary( + regional_access_boundary + ) + return initial_creds @classmethod def from_service_account_info(cls, info, **kwargs): @@ -296,9 +302,9 @@ def _make_copy(self): additional_claims=self._additional_claims.copy(), always_use_jwt_access=self._always_use_jwt_access, universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path + self._copy_regional_access_boundary_state(cred) return cred @_helpers.copy_docstring(credentials.Scoped) @@ -384,12 +390,6 @@ def with_token_uri(self, token_uri): cred._token_uri = token_uri return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - def _make_authorization_grant_assertion(self): """Create the OAuth 2.0 assertion. @@ -433,7 +433,7 @@ def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_JWT return metrics.CRED_TYPE_SA_ASSERTION - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + @_helpers.copy_docstring(credentials.CredentialsWithRegionalAccessBoundary) def _perform_refresh_token(self, request): if self._always_use_jwt_access and not self._jwt_credentials: # If self signed jwt should be used but jwt credential is not @@ -499,27 +499,26 @@ def _create_self_signed_jwt(self, audience): self, audience ) - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API. + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API. This method constructs the specific URL for the IAM Credentials API's `allowedLocations` endpoint, using the credential's universe domain and service account email. - Raises: - ValueError: If `self.service_account_email` is None or an empty - string, as it's required to form the URL. - Returns: - str: The URL for the trust boundary lookup endpoint. + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the service account email is missing. """ if not self.service_account_email: - raise ValueError( - "Service account email is required to build the trust boundary lookup URL." + _LOGGER.error( + "Service account email is required to build the Regional Access Boundary lookup URL for service account credentials." + ) + return None + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + service_account_email=self._service_account_email, ) - return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, - service_account_email=self._service_account_email, ) @_helpers.copy_docstring(credentials.Signing)