from datetime import timedelta from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from .. import config from ..schemas.auth import Token, User from ..services.security import create_access_token from ..services.user_service import UserService def get_auth_router(user_service: UserService): router = APIRouter(prefix="/auth", tags=["auth"]) verify_token = build_token_verifier(user_service) @router.post("/token", response_model=Token) async def login(form_data: OAuth2PasswordRequestForm = Depends()): user = user_service.authenticate(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = create_access_token( data={"sub": user.username}, expires_delta=timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES), ) return Token(access_token=access_token) @router.get("/me", response_model=User) async def me(current_user: User = Depends(get_current_user_dependency(verify_token))): return current_user @router.post("/change-password", response_model=User) async def change_password( payload: PasswordChangePayload, current_user: User = Depends(get_current_user_dependency(verify_token)), ): if payload.new_password != payload.confirm_password: raise HTTPException(status_code=400, detail="Passwords do not match") return user_service.set_password(current_user.username, payload.new_password) return router from pydantic import BaseModel class PasswordChangePayload(BaseModel): new_password: str confirm_password: str def build_token_verifier(user_service: UserService): from jose import JWTError, jwt def verify(token: str) -> User: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode( token, config.JWT_SECRET_KEY, algorithms=[config.JWT_ALGORITHM] ) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = user_service.get_user(username) if user is None: raise credentials_exception return User(username=user.username) return verify def get_current_user_dependency(token_verifier): from fastapi import Depends from fastapi.security import OAuth2PasswordBearer oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token") async def get_current_user(token: str = Depends(oauth2_scheme)) -> User: return token_verifier(token) return get_current_user