Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions packages/oauth/examples/user_identifier_token_exchange.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Example: User Identifier Token Exchange

This example demonstrates how to use the user_identifier parameter to retrieve
grants on behalf of users without requiring the user to be present, after
a one-time consent.
"""


from keycardai.oauth import AsyncClient, TokenType


async def main():
"""Exchange using user identifier for offline delegated grant access."""

# Example 1: Using AsyncClient with user_identifier parameter
async with AsyncClient(
base_url="https://zone1234.keycard.cloud",
client_id="agent-app-id",
client_secret="agent-app-secret"
) as client:

# Exchange token using user identifier
# The SDK automatically builds the unsigned JWT and sets the correct token type
response = await client.exchange_token(
user_identifier="user@example.com",
resource="https://graph.microsoft.com"
)

print(f"Access token: {response.access_token[:20]}...")
print(f"Token type: {response.token_type}")
print(f"Expires in: {response.expires_in}s")

# Example 2: Building the unsigned JWT manually (advanced usage)
from keycardai.oauth.utils.jwt import build_user_identifier_token

user_identifier_jwt = build_user_identifier_token("user@example.com")
print(f"\nManually built JWT: {user_identifier_jwt[:50]}...")

# This JWT can be used with TokenExchangeRequest
from keycardai.oauth import TokenExchangeRequest

request = TokenExchangeRequest(
subject_token=user_identifier_jwt,
subject_token_type=TokenType.USER_IDENTIFIER,
resource="https://graph.microsoft.com"
)
print(f"Subject token type: {request.subject_token_type}")


if __name__ == "__main__":
# Note: This example won't run without a real Keycard zone
# It demonstrates the API usage pattern
print("Example usage pattern for user identifier token exchange\n")
print("To run this example, configure a real Keycard zone and credentials.\n")

# Uncomment to run:
# asyncio.run(main())
2 changes: 2 additions & 0 deletions packages/oauth/src/keycardai/oauth/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ async def exchange_token(
client_id: str | None = None,
client_assertion_type: str | None = None,
client_assertion: str | None = None,
user_identifier: str | None = None,
) -> TokenResponse: ...

async def exchange_token(self, request: TokenExchangeRequest | None = None, /, **token_exchange_args) -> TokenResponse:
Expand Down Expand Up @@ -1027,6 +1028,7 @@ def exchange_token(
actor_token_type: str | None = None,
timeout: float | None = None,
client_id: str | None = None,
user_identifier: str | None = None,
) -> TokenResponse: ...

def exchange_token(self, request: TokenExchangeRequest | None = None, /, **token_exchange_args) -> TokenResponse:
Expand Down
26 changes: 22 additions & 4 deletions packages/oauth/src/keycardai/oauth/operations/_token_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from ..http._context import HTTPContext
from ..http._wire import HttpRequest, HttpResponse
from ..types.models import TokenExchangeRequest, TokenResponse
from ..types.oauth import TokenType
from ..utils.jwt import build_user_identifier_token


def build_token_exchange_http_request(
Expand All @@ -29,7 +31,7 @@ def build_token_exchange_http_request(
payload = request.model_dump(
mode="json",
exclude_none=True,
exclude={"timeout"}
exclude={"timeout", "user_identifier"}
)

headers = {
Expand Down Expand Up @@ -144,13 +146,22 @@ def exchange_token(
TokenResponse with the exchanged token and metadata

Raises:
ValueError: If required parameters are missing
ValueError: If required parameters are missing or both user_identifier and subject_token are provided
OAuthHttpError: If token endpoint is unreachable or returns non-200
OAuthProtocolError: If response format is invalid or contains OAuth errors
NetworkError: If network request fails

Reference: https://datatracker.ietf.org/doc/html/rfc8693#section-2.1
"""
# Handle user_identifier parameter
if request.user_identifier is not None:
if request.subject_token is not None:
raise ValueError("Cannot provide both user_identifier and subject_token")

# Build unsigned JWT from user identifier
request.subject_token = build_user_identifier_token(request.user_identifier)
request.subject_token_type = TokenType.USER_IDENTIFIER

http_req = build_token_exchange_http_request(request, context)

# Execute HTTP request using transport
Expand All @@ -177,14 +188,21 @@ async def exchange_token_async(
TokenResponse with the exchanged token and metadata

Raises:
ValueError: If required parameters are missing
ValueError: If required parameters are missing or both user_identifier and subject_token are provided
OAuthHttpError: If token endpoint is unreachable or returns non-200
OAuthProtocolError: If response format is invalid or contains OAuth errors
NetworkError: If network request fails

Reference: https://datatracker.ietf.org/doc/html/rfc8693#section-2.1
"""
# Build HTTP request
# Handle user_identifier parameter
if request.user_identifier is not None:
if request.subject_token is not None:
raise ValueError("Cannot provide both user_identifier and subject_token")

# Build unsigned JWT from user identifier
request.subject_token = build_user_identifier_token(request.user_identifier)
request.subject_token_type = TokenType.USER_IDENTIFIER

http_req = build_token_exchange_http_request(request, context)

Expand Down
2 changes: 2 additions & 0 deletions packages/oauth/src/keycardai/oauth/types/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class TokenExchangeRequest(BaseModel):
client_assertion_type: str | None = None
client_assertion: str | None = None

user_identifier: str | None = Field(default=None, description="User identifier for user identifier delegated grant access. Mutually exclusive with subject_token.")


@dataclass
class TokenResponse:
Expand Down
3 changes: 3 additions & 0 deletions packages/oauth/src/keycardai/oauth/types/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class TokenType(str, Enum):
# RFC 9068 - JWT Profile for Access Tokens
JWT = "urn:ietf:params:oauth:token-type:jwt"

# Keycard extension - User identifier for user identifier delegated grant access
USER_IDENTIFIER = "urn:keycard:params:oauth:token-type:user-identifier"


class TokenTypeHint(str, Enum):
"""Token type hints for introspection and revocation as defined in RFCs.
Expand Down
44 changes: 44 additions & 0 deletions packages/oauth/src/keycardai/oauth/utils/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,50 @@
from ..types.models import ClientConfig


def build_user_identifier_token(identifier: str) -> str:
"""Build an unsigned JWT for user identifier token exchange.

Creates a JWT with header {"typ": "vnd.kc.user+jwt", "alg": "none"}
and payload {"sub": identifier}, with no signature.

The token is intentionally unsigned. It carries no security guarantees on
its own. Security relies on two other layers:

1. Client authentication: the calling application must authenticate via
client credentials (e.g. client_secret_basic), proving it is authorized
to request grants on behalf of users.
2. Prior user consent: the user must have previously authenticated
interactively and established a delegated grant for the requested
resource.

The unsigned JWT is a structured container for the user identifier, keeping
a consistent format across custom subject token types (see also
vnd.kc.process+jwt).

Args:
identifier: User identifier string (e.g. email, sub, oid value)

Returns:
Base64url-encoded JWT string in format: header.payload.

Example:
>>> token = build_user_identifier_token("user@example.com")
>>> print(token) # eyJ0eXAiOiJ2bmQua2MudXNlcitqd3QiLCJhbGciOiJub25lIn0.eyJzdWIiOiJ1c2VyQGV4YW1wbGUuY29tIn0.
"""
header = {"typ": "vnd.kc.user+jwt", "alg": "none"}
payload = {"sub": identifier}

# Encode header and payload as base64url (no padding)
header_json = json.dumps(header, separators=(",", ":"))
payload_json = json.dumps(payload, separators=(",", ":"))

header_b64 = base64.urlsafe_b64encode(header_json.encode()).decode().rstrip("=")
payload_b64 = base64.urlsafe_b64encode(payload_json.encode()).decode().rstrip("=")

# Return header.payload. (trailing dot, empty signature)
return f"{header_b64}.{payload_b64}."


def _split_jwt_token(jwt_token: str) -> tuple[str, str, str]:
"""Split JWT token into its three parts.

Expand Down
31 changes: 23 additions & 8 deletions packages/oauth/src/keycardai/oauth/utils/pkce.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
- Enables secure OAuth flows for mobile and SPA applications
"""

import base64
import hashlib
import secrets

from pydantic import BaseModel


Expand Down Expand Up @@ -86,8 +90,9 @@ def generate_code_verifier(length: int = 128) -> str:

Reference: https://datatracker.ietf.org/doc/html/rfc7636#section-4.1
"""
# Implementation placeholder
raise NotImplementedError("PKCE code verifier generation not yet implemented")
if length < 43 or length > 128:
raise ValueError("Code verifier length must be between 43 and 128 characters")
return secrets.token_urlsafe(96)[:length]

@staticmethod
def generate_code_challenge(verifier: str, method: str = "S256") -> str:
Expand All @@ -107,8 +112,13 @@ def generate_code_challenge(verifier: str, method: str = "S256") -> str:

Reference: https://datatracker.ietf.org/doc/html/rfc7636#section-4.2
"""
# Implementation placeholder
raise NotImplementedError("PKCE code challenge generation not yet implemented")
if method == PKCEMethods.S256:
digest = hashlib.sha256(verifier.encode("ascii")).digest()
return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
elif method == PKCEMethods.PLAIN:
return verifier
else:
raise ValueError(f"Unsupported PKCE method: {method}")

def generate_pkce_pair(
self, method: str = "S256", verifier_length: int = 128
Expand All @@ -127,8 +137,13 @@ def generate_pkce_pair(

Reference: https://datatracker.ietf.org/doc/html/rfc7636#section-4
"""
# Implementation placeholder
raise NotImplementedError("PKCE pair generation not yet implemented")
verifier = self.generate_code_verifier(verifier_length)
challenge = self.generate_code_challenge(verifier, method)
return PKCEChallenge(
code_verifier=verifier,
code_challenge=challenge,
code_challenge_method=method,
)

@staticmethod
def validate_pkce_pair(
Expand All @@ -148,5 +163,5 @@ def validate_pkce_pair(

Reference: https://datatracker.ietf.org/doc/html/rfc7636#section-4.6
"""
# Implementation placeholder
raise NotImplementedError("PKCE validation not yet implemented")
expected = PKCEGenerator.generate_code_challenge(code_verifier, method)
return secrets.compare_digest(expected, code_challenge)
Loading
Loading