71 lines
1.9 KiB
Python
Raw Normal View History

import os
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from psycopg_pool import AsyncConnectionPool
from neo_neo_todo.models.token import TokenData
from src.neo_neo_todo.models.member import select_member_by_email
from src.neo_neo_todo.utils.database import get_pool
# Router that groups the member endpoints defined in this module under the
# "/members" prefix.
router = APIRouter(
    prefix="/members",
    tags=["members"],
)

# JWT signing configuration is read from the environment when the module is
# imported, so a missing setting surfaces at import rather than per request.
try:
    SECRETKEY = os.environ["TODO_SECRET_KEY"]
    ALGORITHM = os.environ["ALGORITHM"]
except KeyError as err:
    # Report which variable is actually missing and keep the original lookup
    # failure as the explicit cause; the exception type stays KeyError.
    raise KeyError(
        "Can't find secret key or algorithm: "
        f"environment variable {err.args[0]!r} is not set"
    ) from err

# OAuth2 password-flow bearer scheme used as a dependency to pull the token
# from incoming requests; "token" is the (relative) URL declared as the place
# where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_member(
    token: Annotated[str, Depends(oauth2_scheme)],
    db_pool: AsyncConnectionPool = Depends(get_pool),
):
    """
    Resolve a bearer token into the member it identifies.

    Helper dependency for the /me API route below, mainly used to check
    authentication status.

    The token is decoded with SECRETKEY / ALGORITHM, its "sub" claim is taken
    as the member's identification, and the member is looked up through
    select_member_by_email.

    Returns a dict with the member's "email" and "email_verified" values.

    Raises an HTTP 401 "Could not validate credentials" error when the token
    cannot be decoded, carries no "sub" claim, or no member is found.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise JWTError, so it alone sits in the try.
    try:
        claims: dict = jwt.decode(token, SECRETKEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    # JWT subject is where we put the user's identification
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    token_data = TokenData(username=subject)
    found = await select_member_by_email(db_pool, token_data.username)  # type: ignore
    if not found:
        raise unauthorized

    return {"email": found.email, "email_verified": found.email_verified}
@router.get("/me")
async def read_users_me(
    current_user: Annotated[dict, Depends(get_current_member)],
):
    """
    Report the member the request is authenticated as.

    get_current_member is injected as a dependency, so this route can be
    called to check for authentication; whatever that dependency returns is
    sent back unchanged.
    """
    return current_user