Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions google/auth/environment_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@
"""Environment variable defining the location of Google API certificate config
file."""

CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE = (
"CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE"
)
"""Environment variable controlling whether to use client certificate or not.
This variable is the fallback of GOOGLE_API_USE_CLIENT_CERTIFICATE."""

CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH = (
"CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH"
)
"""Environment variable defining the location of Google API certificate config
file. This variable is the fallback of GOOGLE_API_CERTIFICATE_CONFIG."""

GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES = (
"GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES"
)
Expand Down
23 changes: 7 additions & 16 deletions google/auth/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import base64
import http.client as http_client
import json
import os

from google.auth import _exponential_backoff
from google.auth import _helpers
from google.auth import credentials
from google.auth import crypt
from google.auth import exceptions
from google.auth.transport import mtls
from google.auth.transport import _mtls_helper

IAM_RETRY_CODES = {
http_client.INTERNAL_SERVER_ERROR,
Expand All @@ -40,20 +39,12 @@

_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

# 1. Determine if we should use mTLS.
# Note: We only support automatic mTLS on the default googleapis.com universe.
if hasattr(mtls, "should_use_client_cert"):
use_client_cert = mtls.should_use_client_cert()
else: # pragma: NO COVER
# if unsupported, fallback to reading from env var
use_client_cert = (
os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false").lower() == "true"
)

# 2. Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant.
# This ensures that the .replace() calls in the classes will work correctly.
if use_client_cert:
# We use the .mtls. prefix only for the default universe template
# Determine if we should use mTLS.
if (
hasattr(_mtls_helper, "check_use_client_cert")
and _mtls_helper.check_use_client_cert()
):
# Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant.
_IAM_DOMAIN = f"iamcredentials.mtls.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
else:
_IAM_DOMAIN = f"iamcredentials.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
Expand Down
23 changes: 20 additions & 3 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@ def _get_cert_config_path(certificate_config_path=None):
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH
env_path = environ.get(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH,
None,
)
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH

certificate_config_path = path.expanduser(certificate_config_path)
if not path.exists(certificate_config_path):
Expand Down Expand Up @@ -452,13 +459,23 @@ def check_use_client_cert():
Returns:
bool: Whether the client certificate should be used for mTLS connection.
"""
use_client_cert = getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE")
use_client_cert = getenv(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert is None or use_client_cert == "":
use_client_cert = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE
)

# Check if the value of GOOGLE_API_USE_CLIENT_CERTIFICATE is set.
if use_client_cert:
return use_client_cert.lower() == "true"
else:
# Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set.
cert_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG")
cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if cert_path is None:
cert_path = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
)

if cert_path:
try:
with open(cert_path, "r") as f:
Expand Down
101 changes: 100 additions & 1 deletion tests/transport/test__mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from OpenSSL import crypto
import pytest # type: ignore

from google.auth import exceptions
from google.auth import environment_vars, exceptions
from google.auth.transport import _mtls_helper

CERT_MOCK_VAL = b"cert"
Expand Down Expand Up @@ -681,6 +681,37 @@ def test_env_variable_file_does_not_exist(self, mock_path_exists):
returned_path = _mtls_helper._get_cert_config_path()
assert returned_path is None

def test_cert_config_path_precedence(self):
# GOOGLE_API_CERTIFICATE_CONFIG takes precedence
google_path = "/path/to/google/config"
cloudsdk_path = "/path/to/cloudsdk/config"

with mock.patch.dict(
os.environ,
{
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG: google_path,
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path,
},
):
with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == google_path

def test_cert_config_path_fallback(self):
# Fallback to CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH if GOOGLE_API_CERTIFICATE_CONFIG is unset
cloudsdk_path = "/path/to/cloudsdk/config"

with mock.patch.dict(
os.environ,
{
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
},
):
if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ:
del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG]

with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == cloudsdk_path

@mock.patch.dict(
os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"}
)
Expand Down Expand Up @@ -811,6 +842,74 @@ def test_config_file_not_found(self, mock_file):
def test_no_env_vars_set(self):
assert _mtls_helper.check_use_client_cert() is False

def test_use_client_cert_precedence(self):
# GOOGLE_API_USE_CLIENT_CERTIFICATE takes precedence
with mock.patch.dict(
os.environ,
{
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true",
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false",
},
):
assert _mtls_helper.check_use_client_cert() is True

with mock.patch.dict(
os.environ,
{
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "false",
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true",
},
):
assert _mtls_helper.check_use_client_cert() is False

def test_use_client_cert_fallback(self):
# Fallback to CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE if GOOGLE_API_USE_CLIENT_CERTIFICATE is unset
with mock.patch.dict(
os.environ,
{environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true"},
):
# Ensure GOOGLE_API_USE_CLIENT_CERTIFICATE is not set
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
assert _mtls_helper.check_use_client_cert() is True

with mock.patch.dict(
os.environ,
{environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false"},
):
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
assert _mtls_helper.check_use_client_cert() is False

@mock.patch("builtins.open", autospec=True)
def test_check_use_client_cert_config_fallback(self, mock_file):
# Test fallback for config file when determining if client cert should be used
cloudsdk_path = "/path/to/cloudsdk/config"

mock_file.side_effect = mock.mock_open(
read_data='{"cert_configs": {"workload": "exists"}}'
)

with mock.patch.dict(
os.environ,
{
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
},
):
if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ:
del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG]
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
if (
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE
in os.environ
):
del os.environ[
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE
]

assert _mtls_helper.check_use_client_cert() is True


class TestMtlsHelper:
@mock.patch.object(_mtls_helper, "call_client_cert_callback")
Expand Down
Loading