Files

162 lines
4.8 KiB
Python
Raw Permalink Normal View History

"""Incident repository for database operations."""
import json
from datetime import datetime
from typing import Any
from uuid import UUID

import asyncpg
class IncidentRepository:
    """Data-access layer for incidents and their timeline events.

    Each public method runs one parameterized SQL statement on the
    connection supplied at construction time and returns plain ``dict``
    objects built from the resulting records. No method opens a
    transaction itself.
    """

    def __init__(self, conn: asyncpg.Connection) -> None:
        """Bind the repository to *conn*, which every query is issued on."""
        self.conn = conn

    @staticmethod
    def _event_from_row(row: Any) -> dict:
        """Convert an ``incident_events`` record to a dict with a decoded payload.

        The payload is JSON-decoded only when it is non-empty text. A payload
        that is ``None``, empty, or already a decoded object is left as is,
        so a connection that decodes JSON itself does not break decoding here.
        """
        event = dict(row)
        raw = event["payload"]
        if raw and isinstance(raw, (str, bytes, bytearray)):
            event["payload"] = json.loads(raw)
        return event

    async def create(
        self,
        incident_id: UUID,
        org_id: UUID,
        service_id: UUID,
        title: str,
        description: str | None,
        severity: str,
    ) -> dict:
        """Insert a new incident and return the stored row.

        The incident is always created with status ``'triggered'``. The
        returned dict contains the inserted columns plus ``version``,
        ``created_at`` and ``updated_at`` as reported by the database.
        """
        row = await self.conn.fetchrow(
            """
            INSERT INTO incidents (id, org_id, service_id, title, description, status, severity)
            VALUES ($1, $2, $3, $4, $5, 'triggered', $6)
            RETURNING id, org_id, service_id, title, description, status, severity,
                      version, created_at, updated_at
            """,
            incident_id,
            org_id,
            service_id,
            title,
            description,
            severity,
        )
        return dict(row)

    async def get_by_id(self, incident_id: UUID) -> dict | None:
        """Return the incident with the given id, or ``None`` if none exists."""
        row = await self.conn.fetchrow(
            """
            SELECT id, org_id, service_id, title, description, status, severity,
                   version, created_at, updated_at
            FROM incidents
            WHERE id = $1
            """,
            incident_id,
        )
        return dict(row) if row else None

    async def get_by_org(
        self,
        org_id: UUID,
        status: str | None = None,
        cursor: datetime | None = None,
        limit: int = 20,
    ) -> list[dict]:
        """Return an organization's incidents, newest first.

        Args:
            org_id: Organization whose incidents are listed.
            status: When non-empty, only incidents with this status are
                returned. ``None`` or an empty string means no status filter.
            cursor: When given, only incidents created strictly before this
                timestamp are returned.
            limit: Page size requested by the caller.

        Returns:
            Up to ``limit + 1`` rows. The extra row lets the caller detect
            whether another page exists; trimming it is the caller's job.
        """
        # Placeholder numbers are derived from len(params), so each filter
        # appends its value first and then references its own position.
        conditions = ["org_id = $1"]
        params: list[Any] = [org_id]

        if status:
            params.append(status)
            conditions.append(f"status = ${len(params)}")

        # NOTE: the comparison is strict, so rows whose created_at equals
        # the cursor value exactly are not returned.
        if cursor is not None:
            params.append(cursor)
            conditions.append(f"created_at < ${len(params)}")

        params.append(limit + 1)  # Fetch one extra to check if there are more
        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT id, org_id, service_id, title, description, status, severity,
                   version, created_at, updated_at
            FROM incidents
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${len(params)}
        """

        rows = await self.conn.fetch(query, *params)
        return [dict(row) for row in rows]

    async def update_status(
        self,
        incident_id: UUID,
        new_status: str,
        expected_version: int,
    ) -> dict | None:
        """Update incident status with optimistic locking.

        The row is updated only when its current ``version`` equals
        *expected_version*. On success ``version`` is incremented and
        ``updated_at`` is set to the database's ``now()``.

        Returns:
            The updated incident, or ``None`` when no row matched. That
            covers both a version mismatch and an unknown incident id.
        """
        row = await self.conn.fetchrow(
            """
            UPDATE incidents
            SET status = $2, version = version + 1, updated_at = now()
            WHERE id = $1 AND version = $3
            RETURNING id, org_id, service_id, title, description, status, severity,
                      version, created_at, updated_at
            """,
            incident_id,
            new_status,
            expected_version,
        )
        return dict(row) if row else None

    async def add_event(
        self,
        event_id: UUID,
        incident_id: UUID,
        event_type: str,
        actor_user_id: UUID | None,
        payload: dict[str, Any] | None,
    ) -> dict:
        """Add an event to the incident timeline and return the stored row.

        *payload* is serialized to JSON text before it is bound to the
        statement. Only ``None`` is stored as NULL; an empty dict is stored
        as ``{}`` and comes back as an empty dict. The returned dict carries
        the payload decoded back into Python objects.
        """
        # Compare against None explicitly: a falsy-but-valid payload such as
        # {} must not be silently turned into NULL.
        encoded_payload = json.dumps(payload) if payload is not None else None
        row = await self.conn.fetchrow(
            """
            INSERT INTO incident_events (id, incident_id, event_type, actor_user_id, payload)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING id, incident_id, event_type, actor_user_id, payload, created_at
            """,
            event_id,
            incident_id,
            event_type,
            actor_user_id,
            encoded_payload,
        )
        return self._event_from_row(row)

    async def get_events(self, incident_id: UUID) -> list[dict]:
        """Return all events of an incident ordered by ``created_at`` ascending.

        Each event's payload is JSON-decoded when it is stored as text.
        """
        rows = await self.conn.fetch(
            """
            SELECT id, incident_id, event_type, actor_user_id, payload, created_at
            FROM incident_events
            WHERE incident_id = $1
            ORDER BY created_at
            """,
            incident_id,
        )
        return [self._event_from_row(row) for row in rows]