Files
incidentops/app/api/deps.py

102 lines
3.1 KiB
Python
Raw Normal View History

2025-12-29 09:55:30 +00:00
"""Shared FastAPI dependencies (auth, RBAC, ownership)."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable
from uuid import UUID
from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.core import exceptions as exc, security
from app.db import db
from app.repositories import OrgRepository, UserRepository
bearer_scheme = HTTPBearer(auto_error=False)
ROLE_RANKS: dict[str, int] = {"viewer": 0, "member": 1, "admin": 2}
@dataclass(slots=True)
class CurrentUser:
"""Authenticated user context derived from the access token."""
user_id: UUID
email: str
org_id: UUID
org_role: str
token: str
async def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
) -> CurrentUser:
"""Extract and validate the current user from the Authorization header."""
if credentials is None or credentials.scheme.lower() != "bearer":
raise exc.UnauthorizedError("Missing bearer token")
try:
payload = security.TokenPayload(security.decode_access_token(credentials.credentials))
except security.JWTError as err: # pragma: no cover - jose error types
raise exc.UnauthorizedError("Invalid access token") from err
async with db.connection() as conn:
user_repo = UserRepository(conn)
user = await user_repo.get_by_id(payload.user_id)
if user is None:
raise exc.UnauthorizedError("User not found")
org_repo = OrgRepository(conn)
membership = await org_repo.get_member(payload.user_id, payload.org_id)
if membership is None:
raise exc.ForbiddenError("Organization access denied")
return CurrentUser(
user_id=payload.user_id,
email=user["email"],
org_id=payload.org_id,
org_role=membership["role"],
token=credentials.credentials,
)
class RoleChecker:
"""Dependency that enforces a minimum organization role."""
def __init__(self, minimum_role: str) -> None:
if minimum_role not in ROLE_RANKS:
raise ValueError(f"Unknown role '{minimum_role}'")
self.minimum_role = minimum_role
def __call__(self, current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
if ROLE_RANKS[current_user.org_role] < ROLE_RANKS[self.minimum_role]:
raise exc.ForbiddenError("Insufficient role for this operation")
return current_user
def require_role(min_role: str) -> Callable[[CurrentUser], CurrentUser]:
"""Factory that returns a dependency enforcing the specified role."""
return RoleChecker(min_role)
def ensure_org_access(resource_org_id: UUID, current_user: CurrentUser) -> None:
"""Verify that the resource belongs to the active org in the token."""
if resource_org_id != current_user.org_id:
raise exc.ForbiddenError("Resource does not belong to the active organization")
__all__ = [
"CurrentUser",
"ROLE_RANKS",
"RoleChecker",
"bearer_scheme",
"ensure_org_access",
"get_current_user",
"require_role",
]