Files

390 lines
17 KiB
Python
Raw Permalink Normal View History

"""Tests for IncidentRepository."""
from uuid import uuid4
import asyncpg
import pytest
from app.repositories.incident import IncidentRepository
from app.repositories.org import OrgRepository
from app.repositories.service import ServiceRepository
from app.repositories.user import UserRepository
class TestIncidentRepository:
"""Tests for IncidentRepository conforming to SPECS.md."""
async def _create_org(self, conn: asyncpg.Connection, slug: str) -> uuid4:
"""Helper to create an org."""
org_repo = OrgRepository(conn)
org_id = uuid4()
await org_repo.create(org_id, f"Org {slug}", slug)
return org_id
async def _create_service(self, conn: asyncpg.Connection, org_id: uuid4, slug: str) -> uuid4:
"""Helper to create a service."""
service_repo = ServiceRepository(conn)
service_id = uuid4()
await service_repo.create(service_id, org_id, f"Service {slug}", slug)
return service_id
async def _create_user(self, conn: asyncpg.Connection, email: str) -> uuid4:
"""Helper to create a user."""
user_repo = UserRepository(conn)
user_id = uuid4()
await user_repo.create(user_id, email, "hash")
return user_id
async def test_create_incident_returns_incident_data(self, db_conn: asyncpg.Connection) -> None:
"""Creating an incident returns the incident data with triggered status."""
org_id = await self._create_org(db_conn, "incident-org")
service_id = await self._create_service(db_conn, org_id, "incident-service")
repo = IncidentRepository(db_conn)
incident_id = uuid4()
result = await repo.create(
incident_id, org_id, service_id,
title="Server Down",
description="Main API server is not responding",
severity="critical"
)
assert result["id"] == incident_id
assert result["org_id"] == org_id
assert result["service_id"] == service_id
assert result["title"] == "Server Down"
assert result["description"] == "Main API server is not responding"
assert result["status"] == "triggered" # Initial status per SPECS.md
assert result["severity"] == "critical"
assert result["version"] == 1
assert result["created_at"] is not None
assert result["updated_at"] is not None
async def test_create_incident_initial_status_is_triggered(self, db_conn: asyncpg.Connection) -> None:
"""New incidents always start with 'triggered' status per SPECS.md state machine."""
org_id = await self._create_org(db_conn, "triggered-org")
service_id = await self._create_service(db_conn, org_id, "triggered-service")
repo = IncidentRepository(db_conn)
result = await repo.create(uuid4(), org_id, service_id, "Test", None, "low")
assert result["status"] == "triggered"
async def test_create_incident_initial_version_is_one(self, db_conn: asyncpg.Connection) -> None:
"""New incidents start with version 1 for optimistic locking."""
org_id = await self._create_org(db_conn, "version-org")
service_id = await self._create_service(db_conn, org_id, "version-service")
repo = IncidentRepository(db_conn)
result = await repo.create(uuid4(), org_id, service_id, "Test", None, "medium")
assert result["version"] == 1
async def test_create_incident_severity_must_be_valid(self, db_conn: asyncpg.Connection) -> None:
"""Severity must be critical, high, medium, or low per SPECS.md."""
org_id = await self._create_org(db_conn, "severity-org")
service_id = await self._create_service(db_conn, org_id, "severity-service")
repo = IncidentRepository(db_conn)
# Valid severities
for severity in ["critical", "high", "medium", "low"]:
result = await repo.create(uuid4(), org_id, service_id, f"Test {severity}", None, severity)
assert result["severity"] == severity
# Invalid severity
with pytest.raises(asyncpg.CheckViolationError):
await repo.create(uuid4(), org_id, service_id, "Invalid", None, "extreme")
async def test_get_by_id_returns_incident(self, db_conn: asyncpg.Connection) -> None:
"""get_by_id returns the correct incident."""
org_id = await self._create_org(db_conn, "getbyid-org")
service_id = await self._create_service(db_conn, org_id, "getbyid-service")
repo = IncidentRepository(db_conn)
incident_id = uuid4()
await repo.create(incident_id, org_id, service_id, "My Incident", "Details", "high")
result = await repo.get_by_id(incident_id)
assert result is not None
assert result["id"] == incident_id
assert result["title"] == "My Incident"
async def test_get_by_id_returns_none_for_nonexistent(self, db_conn: asyncpg.Connection) -> None:
"""get_by_id returns None for non-existent incident."""
repo = IncidentRepository(db_conn)
result = await repo.get_by_id(uuid4())
assert result is None
async def test_get_by_org_returns_org_incidents(self, db_conn: asyncpg.Connection) -> None:
"""get_by_org returns incidents for the organization."""
org_id = await self._create_org(db_conn, "list-org")
service_id = await self._create_service(db_conn, org_id, "list-service")
repo = IncidentRepository(db_conn)
await repo.create(uuid4(), org_id, service_id, "Incident 1", None, "low")
await repo.create(uuid4(), org_id, service_id, "Incident 2", None, "medium")
await repo.create(uuid4(), org_id, service_id, "Incident 3", None, "high")
result = await repo.get_by_org(org_id)
assert len(result) == 3
async def test_get_by_org_filters_by_status(self, db_conn: asyncpg.Connection) -> None:
"""get_by_org can filter by status."""
org_id = await self._create_org(db_conn, "filter-org")
service_id = await self._create_service(db_conn, org_id, "filter-service")
repo = IncidentRepository(db_conn)
# Create incidents and transition some
inc1 = uuid4()
inc2 = uuid4()
await repo.create(inc1, org_id, service_id, "Triggered", None, "low")
await repo.create(inc2, org_id, service_id, "Will be Acked", None, "low")
await repo.update_status(inc2, "acknowledged", 1)
result = await repo.get_by_org(org_id, status="triggered")
assert len(result) == 1
assert result[0]["title"] == "Triggered"
async def test_get_by_org_pagination_with_cursor(self, db_conn: asyncpg.Connection) -> None:
"""get_by_org supports cursor-based pagination."""
org_id = await self._create_org(db_conn, "pagination-org")
service_id = await self._create_service(db_conn, org_id, "pagination-service")
repo = IncidentRepository(db_conn)
# Create 5 incidents
for i in range(5):
await repo.create(uuid4(), org_id, service_id, f"Incident {i}", None, "low")
# Get first page - should return limit+1 to check for more
page1 = await repo.get_by_org(org_id, limit=2)
assert len(page1) == 3
# Verify total is 5 when we get all
all_incidents = await repo.get_by_org(org_id, limit=10)
assert len(all_incidents) == 5
async def test_get_by_org_orders_by_created_at_desc(self, db_conn: asyncpg.Connection) -> None:
"""get_by_org returns incidents ordered by created_at descending."""
org_id = await self._create_org(db_conn, "order-org")
service_id = await self._create_service(db_conn, org_id, "order-service")
repo = IncidentRepository(db_conn)
await repo.create(uuid4(), org_id, service_id, "First", None, "low")
await repo.create(uuid4(), org_id, service_id, "Second", None, "low")
await repo.create(uuid4(), org_id, service_id, "Third", None, "low")
result = await repo.get_by_org(org_id)
# Verify ordering - newer items should come first (or same time due to fast execution)
assert len(result) == 3
for i in range(len(result) - 1):
assert result[i]["created_at"] >= result[i + 1]["created_at"]
async def test_get_by_org_tenant_isolation(self, db_conn: asyncpg.Connection) -> None:
"""get_by_org only returns incidents for the specified org."""
org1 = await self._create_org(db_conn, "tenant-org-1")
org2 = await self._create_org(db_conn, "tenant-org-2")
service1 = await self._create_service(db_conn, org1, "tenant-service-1")
service2 = await self._create_service(db_conn, org2, "tenant-service-2")
repo = IncidentRepository(db_conn)
await repo.create(uuid4(), org1, service1, "Org1 Incident", None, "low")
await repo.create(uuid4(), org2, service2, "Org2 Incident", None, "low")
result = await repo.get_by_org(org1)
assert len(result) == 1
assert result[0]["title"] == "Org1 Incident"
class TestIncidentStatusTransitions:
"""Tests for incident status transitions per SPECS.md state machine."""
async def _setup_incident(self, conn: asyncpg.Connection) -> tuple[uuid4, IncidentRepository]:
"""Helper to create org, service, and incident."""
org_repo = OrgRepository(conn)
service_repo = ServiceRepository(conn)
incident_repo = IncidentRepository(conn)
org_id = uuid4()
service_id = uuid4()
incident_id = uuid4()
await org_repo.create(org_id, "Test Org", f"test-org-{uuid4().hex[:8]}")
await service_repo.create(service_id, org_id, "Test Service", f"test-service-{uuid4().hex[:8]}")
await incident_repo.create(incident_id, org_id, service_id, "Test Incident", None, "medium")
return incident_id, incident_repo
async def test_update_status_increments_version(self, db_conn: asyncpg.Connection) -> None:
"""update_status increments version for optimistic locking."""
incident_id, repo = await self._setup_incident(db_conn)
result = await repo.update_status(incident_id, "acknowledged", 1)
assert result is not None
assert result["version"] == 2
async def test_update_status_fails_on_version_mismatch(self, db_conn: asyncpg.Connection) -> None:
"""update_status returns None on version mismatch (optimistic locking)."""
incident_id, repo = await self._setup_incident(db_conn)
# Try with wrong version
result = await repo.update_status(incident_id, "acknowledged", 999)
assert result is None
async def test_update_status_updates_updated_at(self, db_conn: asyncpg.Connection) -> None:
"""update_status updates the updated_at timestamp."""
incident_id, repo = await self._setup_incident(db_conn)
before = await repo.get_by_id(incident_id)
result = await repo.update_status(incident_id, "acknowledged", 1)
# updated_at should be at least as recent as before (may be same in fast execution)
assert result["updated_at"] >= before["updated_at"]
# Also verify status was actually updated
assert result["status"] == "acknowledged"
async def test_status_must_be_valid_value(self, db_conn: asyncpg.Connection) -> None:
"""Status must be triggered, acknowledged, mitigated, or resolved per SPECS.md."""
incident_id, repo = await self._setup_incident(db_conn)
with pytest.raises(asyncpg.CheckViolationError):
await repo.update_status(incident_id, "invalid_status", 1)
async def test_valid_status_transitions(self, db_conn: asyncpg.Connection) -> None:
"""Test the valid status values per SPECS.md."""
incident_id, repo = await self._setup_incident(db_conn)
# Triggered -> Acknowledged
result = await repo.update_status(incident_id, "acknowledged", 1)
assert result["status"] == "acknowledged"
# Acknowledged -> Mitigated
result = await repo.update_status(incident_id, "mitigated", 2)
assert result["status"] == "mitigated"
# Mitigated -> Resolved
result = await repo.update_status(incident_id, "resolved", 3)
assert result["status"] == "resolved"
class TestIncidentEvents:
"""Tests for incident events (timeline) per SPECS.md incident_events table."""
async def _setup_incident(self, conn: asyncpg.Connection) -> tuple[uuid4, uuid4, IncidentRepository]:
"""Helper to create org, service, user, and incident."""
org_repo = OrgRepository(conn)
service_repo = ServiceRepository(conn)
user_repo = UserRepository(conn)
incident_repo = IncidentRepository(conn)
org_id = uuid4()
service_id = uuid4()
user_id = uuid4()
incident_id = uuid4()
await org_repo.create(org_id, "Test Org", f"test-org-{uuid4().hex[:8]}")
await service_repo.create(service_id, org_id, "Test Service", f"test-svc-{uuid4().hex[:8]}")
await user_repo.create(user_id, f"user-{uuid4().hex[:8]}@example.com", "hash")
await incident_repo.create(incident_id, org_id, service_id, "Test Incident", None, "medium")
return incident_id, user_id, incident_repo
async def test_add_event_creates_event(self, db_conn: asyncpg.Connection) -> None:
"""add_event creates an event in the timeline."""
incident_id, user_id, repo = await self._setup_incident(db_conn)
event_id = uuid4()
result = await repo.add_event(
event_id, incident_id, "status_changed",
actor_user_id=user_id,
payload={"from": "triggered", "to": "acknowledged"}
)
assert result["id"] == event_id
assert result["incident_id"] == incident_id
assert result["event_type"] == "status_changed"
assert result["actor_user_id"] == user_id
assert result["payload"] == {"from": "triggered", "to": "acknowledged"}
assert result["created_at"] is not None
async def test_add_event_allows_null_actor(self, db_conn: asyncpg.Connection) -> None:
"""add_event allows null actor_user_id (system events)."""
incident_id, _, repo = await self._setup_incident(db_conn)
result = await repo.add_event(
uuid4(), incident_id, "auto_escalated",
actor_user_id=None,
payload={"reason": "Unacknowledged after 30 minutes"}
)
assert result["actor_user_id"] is None
async def test_add_event_allows_null_payload(self, db_conn: asyncpg.Connection) -> None:
"""add_event allows null payload."""
incident_id, user_id, repo = await self._setup_incident(db_conn)
result = await repo.add_event(
uuid4(), incident_id, "viewed",
actor_user_id=user_id,
payload=None
)
assert result["payload"] is None
async def test_get_events_returns_all_incident_events(self, db_conn: asyncpg.Connection) -> None:
"""get_events returns all events for an incident."""
incident_id, user_id, repo = await self._setup_incident(db_conn)
await repo.add_event(uuid4(), incident_id, "created", user_id, {"title": "Test"})
await repo.add_event(uuid4(), incident_id, "status_changed", user_id, {"to": "acked"})
await repo.add_event(uuid4(), incident_id, "comment_added", user_id, {"text": "Working on it"})
result = await repo.get_events(incident_id)
assert len(result) == 3
event_types = [e["event_type"] for e in result]
assert event_types == ["created", "status_changed", "comment_added"]
async def test_get_events_orders_by_created_at(self, db_conn: asyncpg.Connection) -> None:
"""get_events returns events in chronological order."""
incident_id, user_id, repo = await self._setup_incident(db_conn)
await repo.add_event(uuid4(), incident_id, "first", user_id, None)
await repo.add_event(uuid4(), incident_id, "second", user_id, None)
await repo.add_event(uuid4(), incident_id, "third", user_id, None)
result = await repo.get_events(incident_id)
assert result[0]["event_type"] == "first"
assert result[1]["event_type"] == "second"
assert result[2]["event_type"] == "third"
async def test_get_events_returns_empty_for_no_events(self, db_conn: asyncpg.Connection) -> None:
"""get_events returns empty list for incident with no events."""
incident_id, _, repo = await self._setup_incident(db_conn)
result = await repo.get_events(incident_id)
assert result == []
async def test_event_requires_valid_incident_foreign_key(self, db_conn: asyncpg.Connection) -> None:
"""incident_events.incident_id must reference existing incident."""
incident_id, user_id, repo = await self._setup_incident(db_conn)
with pytest.raises(asyncpg.ForeignKeyViolationError):
await repo.add_event(uuid4(), uuid4(), "test", user_id, None)
async def test_event_requires_valid_user_foreign_key(self, db_conn: asyncpg.Connection) -> None:
"""incident_events.actor_user_id must reference existing user if not null."""
incident_id, _, repo = await self._setup_incident(db_conn)
with pytest.raises(asyncpg.ForeignKeyViolationError):
await repo.add_event(uuid4(), incident_id, "test", uuid4(), None)