Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions google/auth/environment_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@
"""Environment variable defining the location of Google API certificate config
file."""

CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE = (
"CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE"
)
"""Environment variable controlling whether to use client certificate or not.
This variable is the fallback of GOOGLE_API_USE_CLIENT_CERTIFICATE."""

CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH = (
"CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH"
)
"""Environment variable defining the location of Google API certificate config
file. This variable is the fallback of GOOGLE_API_CERTIFICATE_CONFIG."""

GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES = (
"GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES"
)
Expand Down
20 changes: 4 additions & 16 deletions google/auth/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import base64
import http.client as http_client
import json
import os

from google.auth import _exponential_backoff
from google.auth import _helpers
from google.auth import credentials
from google.auth import crypt
from google.auth import exceptions
from google.auth.transport import mtls
from google.auth.transport import _mtls_helper

IAM_RETRY_CODES = {
http_client.INTERNAL_SERVER_ERROR,
Expand All @@ -40,20 +39,9 @@

_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

# 1. Determine if we should use mTLS.
# Note: We only support automatic mTLS on the default googleapis.com universe.
if hasattr(mtls, "should_use_client_cert"):
use_client_cert = mtls.should_use_client_cert()
else: # pragma: NO COVER
# if unsupported, fallback to reading from env var
use_client_cert = (
os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false").lower() == "true"
)

# 2. Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant.
# This ensures that the .replace() calls in the classes will work correctly.
if use_client_cert:
# We use the .mtls. prefix only for the default universe template
# Determine if we should use mTLS.
if hasattr(_mtls_helper, "check_use_client_cert") and _mtls_helper.check_use_client_cert():
# Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant.
_IAM_DOMAIN = f"iamcredentials.mtls.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
else:
_IAM_DOMAIN = f"iamcredentials.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
Expand Down
23 changes: 20 additions & 3 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@ def _get_cert_config_path(certificate_config_path=None):
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH
env_path = environ.get(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH,
None,
)
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH

certificate_config_path = path.expanduser(certificate_config_path)
if not path.exists(certificate_config_path):
Expand Down Expand Up @@ -452,13 +459,23 @@ def check_use_client_cert():
Returns:
bool: Whether the client certificate should be used for mTLS connection.
"""
use_client_cert = getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE")
use_client_cert = getenv(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert is None:
use_client_cert = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE
)

# Check if the value of GOOGLE_API_USE_CLIENT_CERTIFICATE is set.
if use_client_cert:
return use_client_cert.lower() == "true"
else:
# Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set.
cert_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG")
cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if cert_path is None:
cert_path = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
)
Comment on lines +473 to +477
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To make the code more concise, you can use the or operator to chain the getenv calls. This will achieve the same fallback logic in a single line.

Suggested change
cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if cert_path is None:
cert_path = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
)
cert_path = getenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG
) or getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
)


if cert_path:
try:
with open(cert_path, "r") as f:
Expand Down
94 changes: 94 additions & 0 deletions tests/transport/test__mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from OpenSSL import crypto
import pytest # type: ignore

from google.auth import environment_vars
from google.auth import exceptions
from google.auth.transport import _mtls_helper

Expand Down Expand Up @@ -681,6 +682,31 @@ def test_env_variable_file_does_not_exist(self, mock_path_exists):
returned_path = _mtls_helper._get_cert_config_path()
assert returned_path is None

def test_cert_config_path_precedence(self):
# GOOGLE_API_CERTIFICATE_CONFIG takes precedence
google_path = "/path/to/google/config"
cloudsdk_path = "/path/to/cloudsdk/config"

with mock.patch.dict(os.environ, {
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG: google_path,
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}):
with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == google_path

def test_cert_config_path_fallback(self):
# Fallback to CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH if GOOGLE_API_CERTIFICATE_CONFIG is unset
cloudsdk_path = "/path/to/cloudsdk/config"

with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}):
if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ:
del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG]

with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == cloudsdk_path
Comment on lines +701 to +708
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To simplify the test setup and avoid modifying os.environ directly, you can use the clear=True argument in mock.patch.dict. This ensures the test runs in a clean environment, making the test more robust and readable.

Suggested change
with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}):
if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ:
del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG]
with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == cloudsdk_path
with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}, clear=True):
with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == cloudsdk_path


@mock.patch.dict(
os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"}
)
Expand Down Expand Up @@ -756,6 +782,22 @@ def test_env_var_explicit_false(self):
def test_env_var_explicit_garbage(self):
assert _mtls_helper.check_use_client_cert() is False

def test_env_var_explicit_empty_string_prevents_fallback(self):
# GOOGLE_API_USE_CLIENT_CERTIFICATE is present but empty.
# It is falsy, but it SHOULD NOT fallback to CLOUDSDK var because
# GOOGLE_API_... is explicitly set (though empty).
with mock.patch.dict(os.environ, {
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "",
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true"
}):
# Current implementation:
# use_client_cert = "" -> if use_client_cert: False.
# It checks if value is None to decide fallback.
# value is "", so it is not None.
# Fallback is skipped.
# Returns False.
assert _mtls_helper.check_use_client_cert() is False

@mock.patch("builtins.open", autospec=True)
@mock.patch.dict(
os.environ,
Expand Down Expand Up @@ -811,6 +853,58 @@ def test_config_file_not_found(self, mock_file):
def test_no_env_vars_set(self):
assert _mtls_helper.check_use_client_cert() is False

def test_use_client_cert_precedence(self):
# GOOGLE_API_USE_CLIENT_CERTIFICATE takes precedence
with mock.patch.dict(os.environ, {
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true",
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false"
}):
assert _mtls_helper.check_use_client_cert() is True

with mock.patch.dict(os.environ, {
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "false",
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true"
}):
assert _mtls_helper.check_use_client_cert() is False

def test_use_client_cert_fallback(self):
# Fallback to CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE if GOOGLE_API_USE_CLIENT_CERTIFICATE is unset
with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true"
}):
# Ensure GOOGLE_API_USE_CLIENT_CERTIFICATE is not set
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
assert _mtls_helper.check_use_client_cert() is True

with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false"
}):
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
assert _mtls_helper.check_use_client_cert() is False
Comment on lines +870 to +885
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To make the test setup cleaner and more reliable, you can use clear=True with mock.patch.dict. This will ensure that os.environ is cleared before applying the mocked values, removing the need to manually delete environment variables within the test.

    def test_use_client_cert_fallback(self):
        # Fallback to CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE if GOOGLE_API_USE_CLIENT_CERTIFICATE is unset
        with mock.patch.dict(os.environ, {
            environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true"
        }, clear=True):
             assert _mtls_helper.check_use_client_cert() is True

        with mock.patch.dict(os.environ, {
            environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false"
        }, clear=True):
             assert _mtls_helper.check_use_client_cert() is False


@mock.patch("builtins.open", autospec=True)
def test_check_use_client_cert_config_fallback(self, mock_file):
# Test fallback for config file when determining if client cert should be used
cloudsdk_path = "/path/to/cloudsdk/config"

mock_file.side_effect = mock.mock_open(
read_data='{"cert_configs": {"workload": "exists"}}'
)

with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}):
if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ:
del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG]
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
if environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE]

assert _mtls_helper.check_use_client_cert() is True
Comment on lines +896 to +906
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To simplify this test, you can use clear=True in mock.patch.dict. This will provide a clean environment for your test, removing the need to manually delete several environment variables and making the test's intent clearer.

        with mock.patch.dict(os.environ, {
            environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
        }, clear=True):
             assert _mtls_helper.check_use_client_cert() is True



class TestMtlsHelper:
@mock.patch.object(_mtls_helper, "call_client_cert_callback")
Expand Down
Loading