from typing import Annotated
from fastapi import APIRouter, Depends, status, HTTPException
from pydantic import BaseModel
from database import SessionLocal
from models import Users
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import jwt, JWTError
from datetime import timedelta, datetime, timezone
router = APIRouter(
prefix='/auth',
tags=['auth']
) # APIRouter 클래스로 router 객체 생성
SECRET_KEY = " SECRET_KEY" # JWT 토큰 생성을 위한 시크릿 키
ALGORITHM = "HS256" # JWT 토큰 생성을 위한 알고리즘
# bcrypt 알고리즘을 사용하는 bcrypt_context 객체 생성
bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_bearer = OAuth2PasswordBearer(tokenUrl="auth/token") # OAuth2PasswordBearer 객체 생성
# 사용자 생성 요청을 검증하기 위한 클래스
class CreateUserRequest(BaseModel):
username: str
email: str
first_name: str
last_name: str
password: str
role: str
class Token(BaseModel):
access_token: str
token_type: str
def get_db():
db = (
SessionLocal()
) # 데이터베이스 세션을 생성 - 트랜잭션 단위로 데이터베이스에 접근
try:
yield db # db를 반환
finally:
db.close() # db를 닫음
# 의존성 주입(Dependency Injection) 기능을 활용하여 데이터베이스 세션 객체를 함수에 주입
db_dependency = Annotated[Session, Depends(get_db)]
def authenticate_user(username: str, password: str, db):
user = (
db.query(Users).filter(Users.username == username).first()
) # 사용자 이름으로 사용자 조회
if not user: # 사용자가 존재하지 않으면 False 반환
return False
if not bcrypt_context.verify(
password, user.hashed_password
): # 비밀번호가 일치하지 않으면 False 반환
return False
return user # 사용자 반환
###################################
# access token 생성
###################################
def create_access_token(username: str, user_id: int, role: str, expires_delta: timedelta):
encode = {"sub": username, "id": user_id, 'role': role} # JWT 토큰에 저장할 정보
expires = datetime.now(timezone.utc) + expires_delta # 토큰 만료 시간
encode.update({"exp": expires}) # 토큰에 만료 시간 추가
return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM) # JWT 토큰 생성하여
###################################
# 현재 사용자 정보 가져오기
###################################
async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
try:
payload = jwt.decode(
token, SECRET_KEY, algorithms=[ALGORITHM]
) # JWT 토큰 디코딩
username: str = payload.get("sub") # 사용자 이름 추출
user_id: int = payload.get("id") # 사용자 ID 추출
user_role: str = payload.get("role") # 사용자 역할 추출
if (
username is None or user_id is None
): # 사용자 이름 또는 ID가 없으면 False 반환
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="유효한 사용자가 아닙니다.",
)
return {"username": username, "id": user_id, 'user_role': user_role} # 사용자 이름과 ID 반환
except JWTError: # JWT 토큰이 유효하지 않으면 False 반환
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="유효한 사용자가 아닙니다."
)
#################################
# 엔드 포인트
#################################
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency, create_user_request: CreateUserRequest):
create_user_model = Users(
email=create_user_request.email,
username=create_user_request.username,
first_name=create_user_request.first_name,
last_name=create_user_request.last_name,
role=create_user_request.role,
hashed_password=bcrypt_context.hash(
create_user_request.password
), # 비밀번호를 해시하여 저장
is_active=True,
) # 사용자 생성
db.add(create_user_model)
db.commit()
@router.post("/token", response_model=Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: db_dependency
):
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="유요한 사용자가 아닙니다."
)
token = create_access_token(
user.username, user.id, user.role, timedelta(minutes=20)
) # 20분 동안 유효한 토큰 생성
return {"access_token": token, "token_type": "bearer"} # 토큰 반환