This repository was archived by the owner on Feb 5, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathapp.py
More file actions
122 lines (102 loc) · 4.15 KB
/
app.py
File metadata and controls
122 lines (102 loc) · 4.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from requests_oauthlib import OAuth2Session
from flask import Flask, request, redirect, render_template, session, url_for
import base64
import hashlib
import json
import jwt
import os
import pprint
# Flask application object. Its secret key is 24 fresh random bytes every
# time this module is loaded.
app = Flask(__name__)
app.secret_key = os.urandom(24)

# OAuth client credentials are required: a missing variable raises KeyError
# while the module is being imported.
client_id = os.environ["OAUTH_APP_ID"]
client_secret = os.environ["OAUTH_SECRET"]

# Optional settings; the second argument is used when the variable is unset.
domain = os.getenv("DOMAIN", "http://localhost:5000")
api_url = os.getenv("API_URL", "https://api.planningcenteronline.com")

# Token endpoint, derived from the API base URL.
token_url = api_url + "/oauth/token"
def token_updater(token):
    """Store *token* in the Flask session under the ``oauth_token`` key.

    This function is handed to the module-level ``OAuth2Session`` as its
    ``token_updater`` callback.
    """
    session.update({"oauth_token": token})
# Module-level OAuth2 session used by the routes in this file. It requests the
# "openid people services" scopes, sends users back to /auth/complete, and is
# configured to refresh tokens against `token_url`, reporting new tokens
# through `token_updater`.
#
# NOTE(review): this object is created once at import time, so whatever token
# it holds after `fetch_token` is presumably shared by every visitor of the
# process -- confirm that is acceptable outside a single-user demo.
pco = OAuth2Session(
    client_id=client_id,
    redirect_uri=f"{domain}/auth/complete",
    scope="openid people services",
    auto_refresh_url=token_url,
    auto_refresh_kwargs=dict(client_id=client_id, client_secret=client_secret),
    token_updater=token_updater,
)
def fetch_user_info():
    """Return the parsed JSON body of ``{api_url}/oauth/userinfo``, or ``None``.

    ``None`` is returned when the token stored in the session has no
    ``id_token`` entry, which means no OpenID scopes were requested.
    """
    stored_token = session.get("oauth_token", {})
    if "id_token" in stored_token:
        return pco.get(f"{api_url}/oauth/userinfo").json()
    return None
def parse_id_token(id_token):
    """Decode *id_token* and return its claims, or ``None`` if decoding fails.

    The JWT signature is NOT verified here -- this is for demo purposes only.

    NOTE: In production, verify the JWT signature using the JWK from the JWKS
    endpoint ({API_URL}/oauth/discovery/keys). Authentication packages such as
    `pyoidc` or `oidc-client` do that verification automatically; otherwise a
    package like `pyjwt` can be used to build the public key from the JWK and
    pass it into `jwt.decode`.
    """
    decode_options = {"verify_signature": False}
    try:
        claims = jwt.decode(id_token, options=decode_options)
    except Exception as e:
        # Any decoding problem is reported on stdout and signalled with None.
        print(f"Failed to decode ID token: {e}")
        return None
    return claims
def gen_code_verifier():
    """Return a fresh random PKCE code verifier string.

    64 bytes from ``os.urandom`` are URL-safe base64 encoded and the trailing
    ``=`` padding is removed.
    """
    random_bytes = os.urandom(64)
    encoded = base64.urlsafe_b64encode(random_bytes)
    return encoded.rstrip(b"=").decode()
def gen_code_challenge(verifier):
    """Return the PKCE code challenge derived from *verifier*.

    The challenge is the SHA-256 digest of the encoded verifier, URL-safe
    base64 encoded with the trailing ``=`` padding removed.
    """
    sha = hashlib.sha256()
    sha.update(verifier.encode())
    challenge = base64.urlsafe_b64encode(sha.digest())
    return challenge.rstrip(b"=").decode()
@app.route("/")
def index():
    """Show the login page, or redirect to /people when a token is in the session."""
    if "oauth_token" not in session:
        return render_template("login.html")
    return redirect("/people")
@app.route("/auth")
def auth():
    """Start the authorization flow and redirect to the authorization URL.

    A new code verifier is generated and its challenge is sent with the
    authorization request. The verifier and the returned state are saved in
    the session under ``code_verifier`` and ``oauth_state``.
    """
    verifier = gen_code_verifier()
    authorize_endpoint = f"{api_url}/oauth/authorize"
    authorization_url, state = pco.authorization_url(
        authorize_endpoint,
        prompt="select_account",
        code_challenge=gen_code_challenge(verifier),
        code_challenge_method="S256",
    )
    session["code_verifier"] = verifier
    session["oauth_state"] = state
    return redirect(authorization_url)
@app.route("/auth/complete", methods=["GET"])
def callback():
    """Complete the authorization flow and store the token in the session.

    The ``state`` query parameter must equal the value that ``/auth`` saved in
    this session under ``oauth_state``; otherwise the request is rejected with
    a 400 response and no token exchange is attempted.

    On success the token is saved as ``session["oauth_token"]``. When the
    token carries an ``id_token``, ``session["current_user"]`` is filled with
    the user's name from the userinfo endpoint and the decoded ID-token claims.

    Returns:
        A redirect to ``/`` on success, or a ``(message, 400)`` tuple when the
        state check fails.
    """
    # /auth stored the issued state in this session. Requiring the callback to
    # echo it ties the response to the session that started the flow. The
    # value is popped so it cannot be reused.
    expected_state = session.pop("oauth_state", None)
    if not expected_state or request.args.get("state") != expected_state:
        # Drop the verifier too: this flow attempt is over.
        session.pop("code_verifier", None)
        return "Invalid or missing OAuth state", 400

    token = pco.fetch_token(
        token_url,
        client_secret=client_secret,
        authorization_response=request.url,
        # Single use: the verifier is removed from the session as it is read.
        code_verifier=session.pop("code_verifier", ""),
    )
    session["oauth_token"] = token

    # An id_token is only present when OpenID scopes were granted.
    if "id_token" in token:
        session["current_user"] = {
            "name": fetch_user_info().get("name", ""),
            "claims": parse_id_token(token["id_token"]),
        }
    return redirect("/")
@app.route("/people")
def people():
    """Render the people list fetched from ``{api_url}/people/v2/people``.

    Visitors without a token in the session are redirected to ``/``. The
    template receives the ``data`` entry of the JSON response plus a
    pretty-printed dump of the whole response.
    """
    if "oauth_token" not in session:
        return redirect("/")

    payload = pco.get(f"{api_url}/people/v2/people").json()
    pretty_payload = pprint.pformat(payload, indent=2)
    return render_template(
        "people.html",
        logged_in=True,
        people=payload["data"],
        formatted_response=pretty_payload,
    )
@app.route("/profile")
def profile():
    """Render the profile page with userinfo claims and stored ID-token claims.

    Visitors without a token in the session are redirected to ``/``.
    """
    if "oauth_token" not in session:
        return redirect("/")

    userinfo = fetch_user_info()
    current_user = session.get("current_user", {})
    return render_template(
        "profile.html",
        logged_in=True,
        userinfo_claims=userinfo,
        id_token_claims=current_user.get("claims", {}),
    )
@app.route("/auth/logout")
def logout():
    """Revoke the current access token and clear the login state.

    When the session holds a token, its ``access_token`` value is posted to
    ``{api_url}/oauth/revoke``. The ``oauth_token`` and ``current_user``
    session entries are then removed, even if the revoke request raises.
    Visitors who are not logged in are simply redirected.

    Returns:
        A redirect to ``/``.
    """
    # .get() rather than indexing: a visitor without a token must not hit a
    # KeyError here.
    token = session.get("oauth_token")
    try:
        # The session stores the whole token mapping. The revoke endpoint
        # needs the access token string, so pull that field out instead of
        # posting the mapping itself as the form value.
        access_token = token.get("access_token") if token else None
        if access_token:
            pco.post(f"{api_url}/oauth/revoke", data={"token": access_token})
    finally:
        # Clear after the request, not before: a token refresh triggered by
        # the request may write ``oauth_token`` back through token_updater.
        session.pop("oauth_token", None)
        session.pop("current_user", None)
    return redirect("/")