feat: Add OAuth2 server and client implementation with PKCE support

- Implemented OAuth2 server with client registration, authorization, and token endpoints.
- Created HTML templates for client authorization, client creation, and client editing.
- Developed an OAuth2 client application using Hono.js and Bun, supporting authorization code grant flow.
- Integrated PKCE (Proof Key for Code Exchange) for enhanced security during authorization.
- Added session management using cookies for user authentication.
- Included detailed README documentation for setup and usage instructions.
This commit is contained in:
암냥 2025-07-13 17:08:55 +09:00
commit 7cd05b5c6a
29 changed files with 1962 additions and 0 deletions

View file

@ -0,0 +1,36 @@
import os
from flask import Flask
from .models import db
from .oauth2 import config_oauth
from .routes import bp
def create_app(config=None):
app = Flask(__name__)
# load default configuration
app.config.from_object('website.settings')
# load environment configuration
if 'WEBSITE_CONF' in os.environ:
app.config.from_envvar('WEBSITE_CONF')
# load app specified configuration
if config is not None:
if isinstance(config, dict):
app.config.update(config)
elif config.endswith('.py'):
app.config.from_pyfile(config)
setup_app(app)
return app
def setup_app(app):
db.init_app(app)
# Create tables if they do not exist already
with app.app_context():
db.create_all()
config_oauth(app)
app.register_blueprint(bp, url_prefix='')

View file

@ -0,0 +1,56 @@
import time
from flask_sqlalchemy import SQLAlchemy
from authlib.integrations.sqla_oauth2 import (
OAuth2ClientMixin,
OAuth2AuthorizationCodeMixin,
OAuth2TokenMixin,
)
db = SQLAlchemy()
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(40), unique=True)
def __str__(self):
return self.username
def get_user_id(self):
return self.id
def check_password(self, password):
return password == 'valid'
class OAuth2Client(db.Model, OAuth2ClientMixin):
__tablename__ = 'oauth2_client'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
user = db.relationship('User')
class OAuth2AuthorizationCode(db.Model, OAuth2AuthorizationCodeMixin):
__tablename__ = 'oauth2_code'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
user = db.relationship('User')
class OAuth2Token(db.Model, OAuth2TokenMixin):
__tablename__ = 'oauth2_token'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
user = db.relationship('User')
def is_refresh_token_active(self):
if self.revoked:
return False
expires_at = self.issued_at + self.expires_in * 2
return expires_at >= time.time()

View file

@ -0,0 +1,101 @@
from authlib.integrations.flask_oauth2 import (
AuthorizationServer,
ResourceProtector,
)
from authlib.integrations.sqla_oauth2 import (
create_query_client_func,
create_save_token_func,
create_revocation_endpoint,
create_bearer_token_validator,
)
from authlib.oauth2.rfc6749 import grants
from authlib.oauth2.rfc7636 import CodeChallenge
from .models import db, User
from .models import OAuth2Client, OAuth2AuthorizationCode, OAuth2Token
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
TOKEN_ENDPOINT_AUTH_METHODS = [
'client_secret_basic',
'client_secret_post',
'none',
]
def save_authorization_code(self, code, request):
code_challenge = request.data.get('code_challenge')
code_challenge_method = request.data.get('code_challenge_method')
auth_code = OAuth2AuthorizationCode(
code=code,
client_id=request.client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user_id=request.user.id,
code_challenge=code_challenge,
code_challenge_method=code_challenge_method,
)
db.session.add(auth_code)
db.session.commit()
return auth_code
def query_authorization_code(self, code, client):
auth_code = OAuth2AuthorizationCode.query.filter_by(
code=code, client_id=client.client_id).first()
if auth_code and not auth_code.is_expired():
return auth_code
def delete_authorization_code(self, authorization_code):
db.session.delete(authorization_code)
db.session.commit()
def authenticate_user(self, authorization_code):
return User.query.get(authorization_code.user_id)
class PasswordGrant(grants.ResourceOwnerPasswordCredentialsGrant):
def authenticate_user(self, username, password):
user = User.query.filter_by(username=username).first()
if user is not None and user.check_password(password):
return user
class RefreshTokenGrant(grants.RefreshTokenGrant):
def authenticate_refresh_token(self, refresh_token):
token = OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
if token and token.is_refresh_token_active():
return token
def authenticate_user(self, credential):
return User.query.get(credential.user_id)
def revoke_old_credential(self, credential):
credential.revoked = True
db.session.add(credential)
db.session.commit()
query_client = create_query_client_func(db.session, OAuth2Client)
save_token = create_save_token_func(db.session, OAuth2Token)
authorization = AuthorizationServer(
query_client=query_client,
save_token=save_token,
)
require_oauth = ResourceProtector()
def config_oauth(app):
authorization.init_app(app)
# support all grants
authorization.register_grant(grants.ImplicitGrant)
authorization.register_grant(grants.ClientCredentialsGrant)
authorization.register_grant(AuthorizationCodeGrant, [CodeChallenge(required=True)])
authorization.register_grant(PasswordGrant)
authorization.register_grant(RefreshTokenGrant)
# support revocation
revocation_cls = create_revocation_endpoint(db.session, OAuth2Token)
authorization.register_endpoint(revocation_cls)
# protect resource
bearer_cls = create_bearer_token_validator(db.session, OAuth2Token)
require_oauth.register_token_validator(bearer_cls())

View file

@ -0,0 +1,180 @@
import time
from flask import Blueprint, request, session, url_for
from flask import render_template, redirect, jsonify
from werkzeug.security import gen_salt
from authlib.integrations.flask_oauth2 import current_token
from authlib.oauth2 import OAuth2Error
from .models import db, User, OAuth2Client
from .oauth2 import authorization, require_oauth
bp = Blueprint('home', __name__)
def current_user():
if 'id' in session:
uid = session['id']
return User.query.get(uid)
return None
def split_by_crlf(s):
return [v for v in s.splitlines() if v]
@bp.route('/', methods=('GET', 'POST'))
def home():
if request.method == 'POST':
username = request.form.get('username')
user = User.query.filter_by(username=username).first()
if not user:
user = User(username=username)
db.session.add(user)
db.session.commit()
session['id'] = user.id
# if user is not just to log in, but need to head back to the auth page, then go for it
next_page = request.args.get('next')
if next_page:
return redirect(next_page)
return redirect('/')
user = current_user()
if user:
clients = OAuth2Client.query.filter_by(user_id=user.id).all()
else:
clients = []
return render_template('home.html', user=user, clients=clients)
@bp.route('/logout')
def logout():
del session['id']
return redirect('/')
@bp.route('/create_client', methods=('GET', 'POST'))
def create_client():
user = current_user()
if not user:
return redirect('/')
if request.method == 'GET':
return render_template('create_client.html')
client_id = gen_salt(24)
client_id_issued_at = int(time.time())
client = OAuth2Client(
client_id=client_id,
client_id_issued_at=client_id_issued_at,
user_id=user.id,
)
form = request.form
client_metadata = {
"client_name": form["client_name"],
"client_uri": form["client_uri"],
"grant_types": split_by_crlf(form["grant_type"]),
"redirect_uris": split_by_crlf(form["redirect_uri"]),
"response_types": split_by_crlf(form["response_type"]),
"scope": form["scope"],
"token_endpoint_auth_method": form["token_endpoint_auth_method"]
}
client.set_client_metadata(client_metadata)
if form['token_endpoint_auth_method'] == 'none':
client.client_secret = ''
else:
client.client_secret = gen_salt(48)
db.session.add(client)
db.session.commit()
return redirect('/')
@bp.route('/edit_client/<int:client_id>', methods=('GET', 'POST'))
def edit_client(client_id):
user = current_user()
if not user:
return redirect('/')
client = OAuth2Client.query.filter_by(id=client_id, user_id=user.id).first()
if not client:
return redirect('/')
if request.method == 'GET':
return render_template('edit_client.html', client=client)
# POST 요청 처리 - 클라이언트 정보 업데이트
form = request.form
client_metadata = {
"client_name": form["client_name"],
"client_uri": form["client_uri"],
"grant_types": split_by_crlf(form["grant_type"]),
"redirect_uris": split_by_crlf(form["redirect_uri"]),
"response_types": split_by_crlf(form["response_type"]),
"scope": form["scope"],
"token_endpoint_auth_method": form["token_endpoint_auth_method"]
}
client.set_client_metadata(client_metadata)
# 클라이언트 시크릿 재생성이 요청된 경우
if 'regenerate_secret' in form and form['regenerate_secret'] == 'on':
if form['token_endpoint_auth_method'] == 'none':
client.client_secret = ''
else:
client.client_secret = gen_salt(48)
db.session.commit()
return redirect('/')
@bp.route('/delete_client/<int:client_id>', methods=['POST'])
def delete_client(client_id):
user = current_user()
if not user:
return redirect('/')
client = OAuth2Client.query.filter_by(id=client_id, user_id=user.id).first()
if client:
db.session.delete(client)
db.session.commit()
return redirect('/')
@bp.route('/oauth/authorize', methods=['GET', 'POST'])
def authorize():
user = current_user()
# if user log status is not true (Auth server), then to log it in
if not user:
return redirect(url_for('home.home', next=request.url))
if request.method == 'GET':
try:
grant = authorization.get_consent_grant(end_user=user)
except OAuth2Error as error:
return error.error
return render_template('authorize.html', user=user, grant=grant)
if not user and 'username' in request.form:
username = request.form.get('username')
user = User.query.filter_by(username=username).first()
if request.form['confirm']:
grant_user = user
else:
grant_user = None
return authorization.create_authorization_response(grant_user=grant_user)
@bp.route('/oauth/token', methods=['POST'])
def issue_token():
return authorization.create_token_response()
@bp.route('/oauth/revoke', methods=['POST'])
def revoke_token():
return authorization.create_endpoint_response('revocation')
@bp.route('/api/me')
@require_oauth('profile')
def api_me():
user = current_token.user
return jsonify(id=user.id, username=user.username)

View file

@ -0,0 +1,22 @@
<p>The application <strong>{{grant.client.client_name}}</strong> is requesting:
<strong>{{ grant.request.scope }}</strong>
</p>
<p>
from You - a.k.a. <strong>{{ user.username }}</strong>
</p>
<form action="" method="post">
<label>
<input type="checkbox" name="confirm">
<span>Consent?</span>
</label>
{% if not user %}
<p>You haven't logged in. Log in with:</p>
<div>
<input type="text" name="username">
</div>
{% endif %}
<br>
<button>Submit</button>
</form>

View file

@ -0,0 +1,42 @@
<style>
label, label > span { display: block; }
label { margin: 15px 0; }
</style>
<a href="/">Home</a>
<form action="" method="post">
<label>
<span>Client Name</span>
<input type="text" name="client_name">
</label>
<label>
<span>Client URI</span>
<input type="url" name="client_uri">
</label>
<label>
<span>Allowed Scope</span>
<input type="text" name="scope">
</label>
<label>
<span>Redirect URIs</span>
<textarea name="redirect_uri" cols="30" rows="10"></textarea>
</label>
<label>
<span>Allowed Grant Types</span>
<textarea name="grant_type" cols="30" rows="10"></textarea>
</label>
<label>
<span>Allowed Response Types</span>
<textarea name="response_type" cols="30" rows="10"></textarea>
</label>
<label>
<span>Token Endpoint Auth Method</span>
<select name="token_endpoint_auth_method">
<option value="client_secret_basic">client_secret_basic</option>
<option value="client_secret_post">client_secret_post</option>
<option value="none">none</option>
</select>
</label>
<button>Submit</button>
</form>

View file

@ -0,0 +1,120 @@
<!DOCTYPE html>
<html>
<head>
<title>OAuth Client 수정</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.form-group { margin-bottom: 15px; }
label { display: block; margin-bottom: 5px; font-weight: bold; }
input[type="text"], input[type="url"], textarea, select {
width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;
}
textarea { height: 60px; resize: vertical; }
button {
background-color: #007bff; color: white; padding: 10px 20px;
border: none; border-radius: 4px; cursor: pointer; margin-right: 10px;
}
button:hover { background-color: #0056b3; }
.danger { background-color: #dc3545; }
.danger:hover { background-color: #c82333; }
.client-info {
background-color: #f8f9fa; padding: 15px; margin-bottom: 20px;
border-radius: 4px; border: 1px solid #dee2e6;
}
.checkbox-group { display: flex; align-items: center; }
.checkbox-group input[type="checkbox"] { width: auto; margin-right: 10px; }
</style>
</head>
<body>
<h1>OAuth Client 수정</h1>
<div class="client-info">
<h3>현재 클라이언트 정보</h3>
<p><strong>Client ID:</strong> {{ client.client_id }}</p>
<p><strong>Client Secret:</strong>
{% if client.client_secret %}
{{ client.client_secret[:8] }}...
{% else %}
(없음 - Public Client)
{% endif %}
</p>
<p><strong>생성일:</strong> {{ client.client_id_issued_at }}</p>
</div>
<form method="post">
<div class="form-group">
<label for="client_name">클라이언트 이름:</label>
<input type="text" id="client_name" name="client_name"
value="{{ client.client_metadata.get('client_name', '') }}" required>
</div>
<div class="form-group">
<label for="client_uri">클라이언트 URI:</label>
<input type="url" id="client_uri" name="client_uri"
value="{{ client.client_metadata.get('client_uri', '') }}">
</div>
<div class="form-group">
<label for="redirect_uri">Redirect URIs (각 줄에 하나씩):</label>
<textarea id="redirect_uri" name="redirect_uri" required>{% for uri in client.client_metadata.get('redirect_uris', []) %}{{ uri }}
{% endfor %}</textarea>
</div>
<div class="form-group">
<label for="scope">Scope:</label>
<input type="text" id="scope" name="scope"
value="{{ client.client_metadata.get('scope', '') }}">
</div>
<div class="form-group">
<label for="grant_type">Grant Types (각 줄에 하나씩):</label>
<textarea id="grant_type" name="grant_type" required>{% for grant in client.client_metadata.get('grant_types', []) %}{{ grant }}
{% endfor %}</textarea>
</div>
<div class="form-group">
<label for="response_type">Response Types (각 줄에 하나씩):</label>
<textarea id="response_type" name="response_type" required>{% for response in client.client_metadata.get('response_types', []) %}{{ response }}
{% endfor %}</textarea>
</div>
<div class="form-group">
<label for="token_endpoint_auth_method">Token Endpoint Auth Method:</label>
<select id="token_endpoint_auth_method" name="token_endpoint_auth_method" required>
<option value="client_secret_basic"
{% if client.client_metadata.get('token_endpoint_auth_method') == 'client_secret_basic' %}selected{% endif %}>
client_secret_basic
</option>
<option value="client_secret_post"
{% if client.client_metadata.get('token_endpoint_auth_method') == 'client_secret_post' %}selected{% endif %}>
client_secret_post
</option>
<option value="none"
{% if client.client_metadata.get('token_endpoint_auth_method') == 'none' %}selected{% endif %}>
none
</option>
</select>
</div>
<div class="form-group">
<div class="checkbox-group">
<input type="checkbox" id="regenerate_secret" name="regenerate_secret">
<label for="regenerate_secret">Client Secret 재생성 (기존 토큰들이 무효화됩니다)</label>
</div>
</div>
<button type="submit">클라이언트 수정</button>
<a href="/" style="text-decoration: none;">
<button type="button">취소</button>
</a>
</form>
<hr style="margin: 30px 0;">
<h3>위험 영역</h3>
<form method="post" action="{{ url_for('.delete_client', client_id=client.id) }}"
onsubmit="return confirm('정말로 이 클라이언트를 삭제하시겠습니까? 이 작업은 되돌릴 수 없습니다.')">
<button type="submit" class="danger">클라이언트 삭제</button>
</form>
</body>
</html>

View file

@ -0,0 +1,82 @@
{% if user %}
<style>
pre{white-space:wrap}
.client-container {
border: 1px solid #ddd;
padding: 15px;
margin-bottom: 20px;
border-radius: 5px;
background-color: #f9f9f9;
}
.client-actions {
margin-top: 10px;
padding-top: 10px;
border-top: 1px solid #ccc;
}
.client-actions a {
margin-right: 10px;
padding: 5px 10px;
text-decoration: none;
border-radius: 3px;
}
.edit-btn {
background-color: #007bff;
color: white;
}
.edit-btn:hover {
background-color: #0056b3;
}
.delete-btn {
background-color: #dc3545;
color: white;
}
.delete-btn:hover {
background-color: #c82333;
}
.create-btn {
background-color: #28a745;
color: white;
padding: 10px 20px;
text-decoration: none;
border-radius: 3px;
display: inline-block;
margin-top: 20px;
}
.create-btn:hover {
background-color: #218838;
}
</style>
<div>Logged in as <strong>{{user}}</strong> (<a href="{{ url_for('.logout') }}">Log Out</a>)</div>
{% for client in clients %}
<div class="client-container">
<pre>
<strong>Client Info</strong>
{%- for key in client.client_info %}
<strong>{{ key }}: </strong>{{ client.client_info[key] }}
{%- endfor %}
<strong>Client Metadata</strong>
{%- for key in client.client_metadata %}
<strong>{{ key }}: </strong>{{ client.client_metadata[key] }}
{%- endfor %}
</pre>
<div class="client-actions">
<a href="{{ url_for('.edit_client', client_id=client.id) }}" class="edit-btn">수정</a>
<form style="display: inline;" method="POST"
action="{{ url_for('.delete_client', client_id=client.id) }}"
onsubmit="return confirm('정말로 이 클라이언트를 삭제하시겠습니까?')">
<button type="submit" class="delete-btn"
style="border: none; cursor: pointer; padding: 5px 10px; border-radius: 3px;">삭제</button>
</form>
</div>
</div>
{% endfor %}
<a href="{{ url_for('.create_client') }}" class="create-btn">새 클라이언트 생성</a>
{% else %}
<form action="" method="post">
<input type="text" name="username" placeholder="username">
<button type="submit">Login / Signup</button>
</form>
{% endif %}