220 lines
8.0 KiB
Python
220 lines
8.0 KiB
Python
|
|
"""Unit tests covering OrgService flows."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from contextlib import asynccontextmanager
|
||
|
|
from uuid import UUID, uuid4
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from app.api.deps import CurrentUser
|
||
|
|
from app.core import exceptions as exc, security
|
||
|
|
from app.db import Database
|
||
|
|
from app.repositories import NotificationRepository, OrgRepository, ServiceRepository
|
||
|
|
from app.schemas.org import NotificationTargetCreate, ServiceCreate
|
||
|
|
from app.services.org import OrgService
|
||
|
|
|
||
|
|
|
||
|
|
pytestmark = pytest.mark.asyncio
|
||
|
|
|
||
|
|
|
||
|
|
class _SingleConnectionDatabase(Database):
|
||
|
|
"""Database stub that reuses a single asyncpg connection."""
|
||
|
|
|
||
|
|
def __init__(self, conn) -> None: # type: ignore[override]
|
||
|
|
self._conn = conn
|
||
|
|
|
||
|
|
@asynccontextmanager
|
||
|
|
async def connection(self): # type: ignore[override]
|
||
|
|
yield self._conn
|
||
|
|
|
||
|
|
@asynccontextmanager
|
||
|
|
async def transaction(self): # type: ignore[override]
|
||
|
|
tr = self._conn.transaction()
|
||
|
|
await tr.start()
|
||
|
|
try:
|
||
|
|
yield self._conn
|
||
|
|
except Exception:
|
||
|
|
await tr.rollback()
|
||
|
|
raise
|
||
|
|
else:
|
||
|
|
await tr.commit()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
async def org_service(db_conn):
|
||
|
|
"""OrgService bound to the per-test database connection."""
|
||
|
|
|
||
|
|
return OrgService(database=_SingleConnectionDatabase(db_conn))
|
||
|
|
|
||
|
|
|
||
|
|
async def _create_user(conn, email: str) -> UUID:
|
||
|
|
user_id = uuid4()
|
||
|
|
await conn.execute(
|
||
|
|
"INSERT INTO users (id, email, password_hash) VALUES ($1, $2, $3)",
|
||
|
|
user_id,
|
||
|
|
email,
|
||
|
|
security.hash_password("Password123!"),
|
||
|
|
)
|
||
|
|
return user_id
|
||
|
|
|
||
|
|
|
||
|
|
async def _create_org(conn, name: str, slug: str | None = None) -> UUID:
|
||
|
|
org_id = uuid4()
|
||
|
|
org_repo = OrgRepository(conn)
|
||
|
|
await org_repo.create(org_id, name, slug or name.lower().replace(" ", "-"))
|
||
|
|
return org_id
|
||
|
|
|
||
|
|
|
||
|
|
async def _add_membership(conn, user_id: UUID, org_id: UUID, role: str) -> None:
|
||
|
|
await conn.execute(
|
||
|
|
"INSERT INTO org_members (id, user_id, org_id, role) VALUES ($1, $2, $3, $4)",
|
||
|
|
uuid4(),
|
||
|
|
user_id,
|
||
|
|
org_id,
|
||
|
|
role,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
async def _create_service(conn, org_id: UUID, name: str, slug: str) -> None:
|
||
|
|
repo = ServiceRepository(conn)
|
||
|
|
await repo.create(uuid4(), org_id, name, slug)
|
||
|
|
|
||
|
|
|
||
|
|
async def _create_notification_target(conn, org_id: UUID, name: str) -> None:
|
||
|
|
repo = NotificationRepository(conn)
|
||
|
|
await repo.create_target(uuid4(), org_id, name, "webhook", "https://example.com/hook")
|
||
|
|
|
||
|
|
|
||
|
|
def _make_user(user_id: UUID, email: str, org_id: UUID, role: str) -> CurrentUser:
|
||
|
|
return CurrentUser(user_id=user_id, email=email, org_id=org_id, org_role=role, token="token")
|
||
|
|
|
||
|
|
|
||
|
|
async def test_get_current_org_returns_summary(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Current Org", slug="current-org")
|
||
|
|
user_id = await _create_user(db_conn, "owner@example.com")
|
||
|
|
await _add_membership(db_conn, user_id, org_id, "admin")
|
||
|
|
|
||
|
|
current_user = _make_user(user_id, "owner@example.com", org_id, "admin")
|
||
|
|
|
||
|
|
result = await org_service.get_current_org(current_user)
|
||
|
|
|
||
|
|
assert result.id == org_id
|
||
|
|
assert result.slug == "current-org"
|
||
|
|
|
||
|
|
|
||
|
|
async def test_get_current_org_raises_not_found(org_service, db_conn):
|
||
|
|
user_id = await _create_user(db_conn, "ghost@example.com")
|
||
|
|
missing_org = uuid4()
|
||
|
|
current_user = _make_user(user_id, "ghost@example.com", missing_org, "admin")
|
||
|
|
|
||
|
|
with pytest.raises(exc.NotFoundError):
|
||
|
|
await org_service.get_current_org(current_user)
|
||
|
|
|
||
|
|
|
||
|
|
async def test_get_members_returns_org_members(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Members Org", slug="members-org")
|
||
|
|
admin_id = await _create_user(db_conn, "admin@example.com")
|
||
|
|
member_id = await _create_user(db_conn, "member@example.com")
|
||
|
|
await _add_membership(db_conn, admin_id, org_id, "admin")
|
||
|
|
await _add_membership(db_conn, member_id, org_id, "member")
|
||
|
|
|
||
|
|
current_user = _make_user(admin_id, "admin@example.com", org_id, "admin")
|
||
|
|
members = await org_service.get_members(current_user)
|
||
|
|
|
||
|
|
emails = {m.email for m in members}
|
||
|
|
assert emails == {"admin@example.com", "member@example.com"}
|
||
|
|
|
||
|
|
|
||
|
|
async def test_create_service_rejects_duplicate_slug(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Dup Org", slug="dup-org")
|
||
|
|
user_id = await _create_user(db_conn, "service@example.com")
|
||
|
|
await _add_membership(db_conn, user_id, org_id, "member")
|
||
|
|
await _create_service(db_conn, org_id, "Existing", "duplicate")
|
||
|
|
|
||
|
|
current_user = _make_user(user_id, "service@example.com", org_id, "member")
|
||
|
|
with pytest.raises(exc.ConflictError):
|
||
|
|
await org_service.create_service(current_user, ServiceCreate(name="New", slug="duplicate"))
|
||
|
|
|
||
|
|
|
||
|
|
async def test_create_service_persists_service(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Service Org", slug="service-org")
|
||
|
|
user_id = await _create_user(db_conn, "creator@example.com")
|
||
|
|
await _add_membership(db_conn, user_id, org_id, "member")
|
||
|
|
current_user = _make_user(user_id, "creator@example.com", org_id, "member")
|
||
|
|
|
||
|
|
result = await org_service.create_service(current_user, ServiceCreate(name="API", slug="api"))
|
||
|
|
|
||
|
|
assert result.name == "API"
|
||
|
|
row = await db_conn.fetchrow(
|
||
|
|
"SELECT name, org_id FROM services WHERE id = $1",
|
||
|
|
result.id,
|
||
|
|
)
|
||
|
|
assert row is not None and row["org_id"] == org_id
|
||
|
|
|
||
|
|
|
||
|
|
async def test_get_services_returns_only_org_services(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Own Org", slug="own-org")
|
||
|
|
other_org = await _create_org(db_conn, "Other Org", slug="other-org")
|
||
|
|
user_id = await _create_user(db_conn, "viewer@example.com")
|
||
|
|
await _add_membership(db_conn, user_id, org_id, "viewer")
|
||
|
|
|
||
|
|
await _create_service(db_conn, org_id, "Owned", "owned")
|
||
|
|
await _create_service(db_conn, other_org, "Foreign", "foreign")
|
||
|
|
|
||
|
|
current_user = _make_user(user_id, "viewer@example.com", org_id, "viewer")
|
||
|
|
services = await org_service.get_services(current_user)
|
||
|
|
|
||
|
|
assert len(services) == 1
|
||
|
|
assert services[0].name == "Owned"
|
||
|
|
|
||
|
|
|
||
|
|
async def test_create_notification_target_requires_webhook_url(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Webhook Org", slug="webhook-org")
|
||
|
|
user_id = await _create_user(db_conn, "admin-webhook@example.com")
|
||
|
|
await _add_membership(db_conn, user_id, org_id, "admin")
|
||
|
|
current_user = _make_user(user_id, "admin-webhook@example.com", org_id, "admin")
|
||
|
|
|
||
|
|
with pytest.raises(exc.BadRequestError):
|
||
|
|
await org_service.create_notification_target(
|
||
|
|
current_user,
|
||
|
|
NotificationTargetCreate(name="Hook", target_type="webhook", webhook_url=None),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
async def test_create_notification_target_persists_target(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Notify Org", slug="notify-org")
|
||
|
|
user_id = await _create_user(db_conn, "notify@example.com")
|
||
|
|
await _add_membership(db_conn, user_id, org_id, "admin")
|
||
|
|
current_user = _make_user(user_id, "notify@example.com", org_id, "admin")
|
||
|
|
|
||
|
|
target = await org_service.create_notification_target(
|
||
|
|
current_user,
|
||
|
|
NotificationTargetCreate(
|
||
|
|
name="Pager", target_type="webhook", webhook_url="https://example.com/hook"
|
||
|
|
),
|
||
|
|
)
|
||
|
|
|
||
|
|
assert target.enabled is True
|
||
|
|
row = await db_conn.fetchrow(
|
||
|
|
"SELECT org_id, name FROM notification_targets WHERE id = $1",
|
||
|
|
target.id,
|
||
|
|
)
|
||
|
|
assert row is not None and row["org_id"] == org_id
|
||
|
|
|
||
|
|
|
||
|
|
async def test_get_notification_targets_scopes_to_org(org_service, db_conn):
|
||
|
|
org_id = await _create_org(db_conn, "Scope Org", slug="scope-org")
|
||
|
|
other_org = await _create_org(db_conn, "Scope Other", slug="scope-other")
|
||
|
|
user_id = await _create_user(db_conn, "scope@example.com")
|
||
|
|
await _add_membership(db_conn, user_id, org_id, "admin")
|
||
|
|
|
||
|
|
await _create_notification_target(db_conn, org_id, "Own Target")
|
||
|
|
await _create_notification_target(db_conn, other_org, "Other Target")
|
||
|
|
|
||
|
|
current_user = _make_user(user_id, "scope@example.com", org_id, "admin")
|
||
|
|
targets = await org_service.get_notification_targets(current_user)
|
||
|
|
|
||
|
|
assert len(targets) == 1
|
||
|
|
assert targets[0].name == "Own Target"
|