Files
incidentops/tests/api/test_auth.py

214 lines
6.4 KiB
Python
Raw Permalink Normal View History

2025-12-29 09:55:30 +00:00
"""Integration tests for FastAPI auth endpoints."""
from __future__ import annotations
from uuid import UUID
import asyncpg
import pytest
from httpx import AsyncClient
from app.core import security
from tests.api import helpers
pytestmark = pytest.mark.asyncio
API_PREFIX = "/v1/auth"
async def test_register_endpoint_persists_user_and_membership(
api_client: AsyncClient,
db_admin: asyncpg.Connection,
) -> None:
data = await helpers.register_user(
api_client,
email="api-register@example.com",
password="SuperSecret1!",
org_name="API Org",
)
assert "access_token" in data and "refresh_token" in data
token_payload = security.decode_access_token(data["access_token"])
assert token_payload["org_role"] == "admin"
stored_user = await db_admin.fetchrow("SELECT email FROM users WHERE email = $1", "api-register@example.com")
assert stored_user is not None
membership = await db_admin.fetchrow(
"SELECT role FROM org_members WHERE user_id = $1 AND org_id = $2",
UUID(token_payload["sub"]),
UUID(token_payload["org_id"]),
)
assert membership is not None and membership["role"] == "admin"
async def test_login_endpoint_rejects_bad_credentials(
api_client: AsyncClient,
) -> None:
register_payload = {
"email": "api-login@example.com",
"password": "CorrectHorse1!",
"org_name": "Login Org",
}
await helpers.register_user(api_client, **register_payload)
response = await api_client.post(
f"{API_PREFIX}/login",
json={"email": register_payload["email"], "password": "wrong"},
)
assert response.status_code == 401
async def test_refresh_endpoint_rotates_refresh_token(
api_client: AsyncClient,
db_admin: asyncpg.Connection,
) -> None:
register_payload = {
"email": "api-refresh@example.com",
"password": "RefreshPass1!",
"org_name": "Refresh Org",
}
initial = await helpers.register_user(api_client, **register_payload)
response = await api_client.post(
f"{API_PREFIX}/refresh",
json={"refresh_token": initial["refresh_token"]},
)
assert response.status_code == 200
data = response.json()
assert data["refresh_token"] != initial["refresh_token"]
old_hash = security.hash_token(initial["refresh_token"])
old_row = await db_admin.fetchrow(
"SELECT rotated_to FROM refresh_tokens WHERE token_hash = $1",
old_hash,
)
assert old_row is not None and old_row["rotated_to"] is not None
async def test_refresh_endpoint_detects_reuse(
api_client: AsyncClient,
db_admin: asyncpg.Connection,
) -> None:
tokens = await helpers.register_user(
api_client,
email="api-reuse@example.com",
password="ReusePass1!",
org_name="Reuse Org",
)
rotated = await api_client.post(
f"{API_PREFIX}/refresh",
json={"refresh_token": tokens["refresh_token"]},
)
assert rotated.status_code == 200
reuse_response = await api_client.post(
f"{API_PREFIX}/refresh",
json={"refresh_token": tokens["refresh_token"]},
)
assert reuse_response.status_code == 401
old_hash = security.hash_token(tokens["refresh_token"])
old_row = await db_admin.fetchrow(
"SELECT revoked_at FROM refresh_tokens WHERE token_hash = $1",
old_hash,
)
assert old_row is not None and old_row["revoked_at"] is not None
async def test_switch_org_changes_active_org(
api_client: AsyncClient,
db_admin: asyncpg.Connection,
) -> None:
email = "api-switch@example.com"
register_payload = {
"email": email,
"password": "SwitchPass1!",
"org_name": "Primary Org",
}
tokens = await helpers.register_user(api_client, **register_payload)
user_id_row = await db_admin.fetchrow("SELECT id FROM users WHERE email = $1", email)
assert user_id_row is not None
user_id = user_id_row["id"]
target_org_id = await helpers.create_org(db_admin, name="Secondary Org", slug="secondary-org")
await helpers.add_membership(db_admin, user_id=user_id, org_id=target_org_id, role="member")
response = await api_client.post(
f"{API_PREFIX}/switch-org",
json={"org_id": str(target_org_id), "refresh_token": tokens["refresh_token"]},
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert response.status_code == 200
data = response.json()
payload = security.decode_access_token(data["access_token"])
assert payload["org_id"] == str(target_org_id)
assert payload["org_role"] == "member"
new_hash = security.hash_token(data["refresh_token"])
new_row = await db_admin.fetchrow(
"SELECT active_org_id FROM refresh_tokens WHERE token_hash = $1",
new_hash,
)
assert new_row is not None and new_row["active_org_id"] == target_org_id
async def test_switch_org_forbidden_without_membership(
api_client: AsyncClient,
db_admin: asyncpg.Connection,
) -> None:
tokens = await helpers.register_user(
api_client,
email="api-switch-no-access@example.com",
password="SwitchBlock1!",
org_name="Primary",
)
foreign_org = await helpers.create_org(db_admin, name="Foreign Org", slug="foreign-org")
response = await api_client.post(
f"{API_PREFIX}/switch-org",
json={"org_id": str(foreign_org), "refresh_token": tokens["refresh_token"]},
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert response.status_code == 403
# ensure refresh token still valid after failed attempt
retry = await api_client.post(
f"{API_PREFIX}/refresh",
json={"refresh_token": tokens["refresh_token"]},
)
assert retry.status_code == 200
async def test_logout_revokes_refresh_token(
api_client: AsyncClient,
) -> None:
register_payload = {
"email": "api-logout@example.com",
"password": "LogoutPass1!",
"org_name": "Logout Org",
}
tokens = await helpers.register_user(api_client, **register_payload)
logout_response = await api_client.post(
f"{API_PREFIX}/logout",
json={"refresh_token": tokens["refresh_token"]},
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert logout_response.status_code == 204
refresh_response = await api_client.post(
f"{API_PREFIX}/refresh",
json={"refresh_token": tokens["refresh_token"]},
)
assert refresh_response.status_code == 401