-
jwt-extenedFlask 2024. 3. 5. 17:08
액세스 토큰, 리프레시 토큰이 생긴 것을 볼 수 있다.
구성 app.py
from flask import Flask, render_template from routes.user import user_bp from jwt_utils import configure_jwt # JWT 설정 함수를 임포트합니다. app = Flask(__name__) configure_jwt(app) # JWT 관련 추가 설정을 적용합니다. app.register_blueprint(user_bp, url_prefix='/user') @app.route('/') def index(): return render_template('index.html') if __name__ == '__main__': app.run(debug=True)
blocklist.py
BLOCKLIST = set() def add_to_blocklist(jti): BLOCKLIST.add(jti) def remove_from_blocklist(jti): BLOCKLIST.discard(jti)
jwt_utils.py
# JWT 관련 유틸리티 함수와 설정을 정의 from flask_jwt_extended import JWTManager from blocklist import BLOCKLIST from flask import jsonify jwt = JWTManager() def configure_jwt(app): # JWT에 사용할 시크릿키 설정(JWT를 서명하고 검증하는데 사용) app.config["JWT_SECRET_KEY"] = "your-secret-key" jwt.init_app(app) # token expired time settings # 60분 단위로 설정한 만료시간을 초단위로 변환 freshness_in_minutes = 1 app.config["JWT_ACCESS_TOKEN_EXPIRES"] = freshness_in_minutes * 60 # 1 hour jwt.init_app(app) # claim: 청구하다 # 추가적인 정보를 토큰에 넣고 싶을 때 사용 @jwt.additional_claims_loader # @데코레이터 def add_claims_to_jwt(identity): # ID가 1인 경우는 관리자임을 나타내는 id_admin 클레임 추가 if identity == 1: return {"is_admin": True} return {"is_admin": False} # 토큰이 블록리스트에 있는지 확인하는 함수 # 블록리스트에 있으면 해당 토큰이 유효하지 않다고 판단 @jwt.token_in_blocklist_loader def check_if_token_in_blocklist(jwt_header, jwt_payload): # jti=jwt id return jwt_payload["jti"] in BLOCKLIST # 만료된 토큰이 사용되었을 때 실행되는 함수 @jwt.expired_token_loader def expired_token_callback(jwt_header, jwt_payload): return jsonify({"msg": "Token expired", "error": "token_expired"}), 401 # 유효하지 않은 토큰이 사용되었을 때 실행되는 함수 # 토큰의 서명이나 구조가 유효하지 않을 때 실행됩니다. 주로 토큰 자체의 문제로 발생하는 경우에 해당합니다. @jwt.invalid_token_loader def invalid_token_callback(error): return ( jsonify( {"message": "Invalid token", "error": "invalid_token"} ), 401, ) # 해당 토큰으로 접근 권한이 없는 경우 @jwt.unauthorized_loader def missing_token_callback(error): return ( jsonify( { "description": "Access token required", "error": "access_token_required", } ), 401, ) # fresh한 토큰이 필요한데 fresh하지 않은 토큰이 사용되었을 때 실행되는 함수를 정의합니다. # 해당 응답을 반환하여 fresh한 토큰이 필요하다는 메시지를 전달 # JWT_ACCESS_TOKEN_EXPIRES으로 토큰 만료 시간 조정 @jwt.needs_fresh_token_loader def token_not_fresh_callback(jwt_header, jwt_payload): return ( jsonify( {"description": "Token is not fresh.", "error": "fresh_token_required"} ), 401, ) # 토큰이 폐기되었을 때 실행되는 함수를 @jwt.revoked_token_loader def revoked_token_callback(jwt_header, jwt_payload): return ( jsonify( {"description": "Token has been revoked.", "error": "token_revoked"} ), 401, )
models/user.py
class User: def __init__(self, id, username, password): self.id = id self.username = username self.password = password
routes/users.py
from flask import Blueprint, jsonify, request, render_template from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity from models.user import User # Blueprint 객체 생성 user_bp = Blueprint('user', __name__) # 임시 사용자 데이터 users = { 'user1': User('1', 'user1', 'pw123'), 'user2': User('2', 'user2', 'pw123') } # 로그인 기능 구현 @user_bp.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.json.get('username', None) password = request.json.get('password', None) user = users.get(username) # user 정보가 유효하면 엑세스 토큰, 리프레시 토큰을 생성하고 json 형태로 반환 if user and user.password == password: access_token = create_access_token(identity=username) refresh_token = create_refresh_token(identity=username) return jsonify(access_token=access_token, refresh_token=refresh_token) else: return jsonify({"msg": "Bad username or password"}), 401 else: return render_template('login.html') @user_bp.route('/protected', methods=['GET']) @jwt_required() # 엔드포인트에 대한 접근 권한 확인 -> 먼말임? def protected(): current_user = get_jwt_identity() return jsonify(logged_in_as=current_user), 200 @user_bp.route('/protected_page') # 이 엔드포인트에 대한 get요청을 처리 def protected_page(): return render_template('protected.html') from flask_jwt_extended import get_jwt from blocklist import add_to_blocklist # 블록리스트 관리 모듈 임포트 @user_bp.route('/logout', methods=['POST']) @jwt_required() def logout(): jti = get_jwt()["jti"] add_to_blocklist(jti) # jti를 블록리스트에 추가 return jsonify({"msg": "Successfully logged out"}), 200
templates/index.html
<!DOCTYPE html> <html> <head> <title>Home Page</title> </head> <body> <h1>Welcome to the Home Page</h1> <a href="/user/login">Login</a> | <a href="/user/logout">Logout</a> <a href="/user/protected">Protected Page</a> </body> </html>
templates/login.html
<!DOCTYPE html> <html> <head> <title>Login</title> <script> function handleLogin(event) { event.preventDefault(); // 폼의 기본 제출 동작을 방지 // 폼 데이터를 JSON으로 변환 var username = document.getElementById("username").value; var password = document.getElementById("password").value; var data = { username: username, password: password }; // fetch를 사용하여 서버에 POST 요청 보내기 fetch("/user/login", { method: "POST", headers: { "Content-Type": "application/json", }, body: JSON.stringify(data), }) .then((response) => response.json()) .then((data) => { console.log("Success:", data); // 로그인 성공 후 'protected' 페이지로 리다이렉트 localStorage.setItem("access_token", data.access_token); localStorage.setItem("refresh_token", data.refresh_token); window.location.href = "/user/protected_page"; }) .catch((error) => { console.error("Error:", error); }); } </script> </head> <body> <h1>Login</h1> <form onsubmit="handleLogin(event)"> Username: <input type="text" id="username" name="username" /><br /> Password: <input type="password" id="password" name="password" /><br /> <input type="submit" value="Login" /> </form> </body> </html>
templates/protected.html
<!DOCTYPE html> <html> <head> <title>Protected Page</title> <script> document.addEventListener("DOMContentLoaded", function () { const token = localStorage.getItem("access_token"); console.log("token", token); if (token) { fetch("/user/protected", { headers: { Authorization: `Bearer ${token}`, }, }) .then((response) => { if (response.ok) { return response.json(); } else { throw new Error("Access Denied"); } }) .then((data) => { document.getElementById("content").innerHTML = "Welcome, " + data.logged_in_as; }) .catch((error) => { document.getElementById("content").innerHTML = "Access Denied"; console.error("Error:", error); }); } else { document.getElementById("content").innerHTML = "No token found, please login."; } }); </script> </head> <body> <h1>This is a Protected Page</h1> <div id="content"> <p>Loading...</p> </div> <button onclick="logout()">Logout</button> </body> <script> // 로그아웃 함수 function logout() { // 로컬 스토리지에서 JWT 토큰 제거 localStorage.removeItem("access_token"); localStorage.removeItem("refresh_token"); // 로그인 페이지 또는 홈페이지로 리다이렉트 window.location.href = "/"; } </script> </html>