# Listing header from the repository web viewer: 200 lines, 6.3 KiB, Python.

"""Notification repository for database operations."""
from datetime import datetime
from uuid import UUID
import asyncpg
class NotificationRepository:
"""Database operations for notification targets and attempts."""
def __init__(self, conn: asyncpg.Connection) -> None:
self.conn = conn
async def create_target(
self,
target_id: UUID,
org_id: UUID,
name: str,
target_type: str,
webhook_url: str | None = None,
enabled: bool = True,
) -> dict:
"""Create a new notification target."""
row = await self.conn.fetchrow(
"""
INSERT INTO notification_targets (id, org_id, name, target_type, webhook_url, enabled)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, org_id, name, target_type, webhook_url, enabled, created_at
""",
target_id,
org_id,
name,
target_type,
webhook_url,
enabled,
)
return dict(row)
async def get_target_by_id(self, target_id: UUID) -> dict | None:
"""Get notification target by ID."""
row = await self.conn.fetchrow(
"""
SELECT id, org_id, name, target_type, webhook_url, enabled, created_at
FROM notification_targets
WHERE id = $1
""",
target_id,
)
return dict(row) if row else None
async def get_targets_by_org(
self,
org_id: UUID,
enabled_only: bool = False,
) -> list[dict]:
"""Get all notification targets for an organization."""
query = """
SELECT id, org_id, name, target_type, webhook_url, enabled, created_at
FROM notification_targets
WHERE org_id = $1
"""
if enabled_only:
query += " AND enabled = true"
query += " ORDER BY name"
rows = await self.conn.fetch(query, org_id)
return [dict(row) for row in rows]
async def update_target(
self,
target_id: UUID,
name: str | None = None,
webhook_url: str | None = None,
enabled: bool | None = None,
) -> dict | None:
"""Update a notification target."""
updates = []
params = [target_id]
param_idx = 2
if name is not None:
updates.append(f"name = ${param_idx}")
params.append(name)
param_idx += 1
if webhook_url is not None:
updates.append(f"webhook_url = ${param_idx}")
params.append(webhook_url)
param_idx += 1
if enabled is not None:
updates.append(f"enabled = ${param_idx}")
params.append(enabled)
param_idx += 1
if not updates:
return await self.get_target_by_id(target_id)
query = f"""
UPDATE notification_targets
SET {", ".join(updates)}
WHERE id = $1
RETURNING id, org_id, name, target_type, webhook_url, enabled, created_at
"""
row = await self.conn.fetchrow(query, *params)
return dict(row) if row else None
async def delete_target(self, target_id: UUID) -> bool:
"""Delete a notification target. Returns True if deleted."""
result = await self.conn.execute(
"DELETE FROM notification_targets WHERE id = $1",
target_id,
)
return result == "DELETE 1"
async def create_attempt(
self,
attempt_id: UUID,
incident_id: UUID,
target_id: UUID,
) -> dict:
"""Create a notification attempt (idempotent via unique constraint)."""
row = await self.conn.fetchrow(
"""
INSERT INTO notification_attempts (id, incident_id, target_id, status)
VALUES ($1, $2, $3, 'pending')
ON CONFLICT (incident_id, target_id) DO UPDATE SET id = notification_attempts.id
RETURNING id, incident_id, target_id, status, error, sent_at, created_at
""",
attempt_id,
incident_id,
target_id,
)
return dict(row)
async def get_attempt(self, incident_id: UUID, target_id: UUID) -> dict | None:
"""Get notification attempt for incident and target."""
row = await self.conn.fetchrow(
"""
SELECT id, incident_id, target_id, status, error, sent_at, created_at
FROM notification_attempts
WHERE incident_id = $1 AND target_id = $2
""",
incident_id,
target_id,
)
return dict(row) if row else None
async def update_attempt_success(
self,
attempt_id: UUID,
sent_at: datetime,
) -> dict | None:
"""Mark notification attempt as successful."""
row = await self.conn.fetchrow(
"""
UPDATE notification_attempts
SET status = 'sent', sent_at = $2, error = NULL
WHERE id = $1
RETURNING id, incident_id, target_id, status, error, sent_at, created_at
""",
attempt_id,
sent_at,
)
return dict(row) if row else None
async def update_attempt_failure(
self,
attempt_id: UUID,
error: str,
) -> dict | None:
"""Mark notification attempt as failed."""
row = await self.conn.fetchrow(
"""
UPDATE notification_attempts
SET status = 'failed', error = $2
WHERE id = $1
RETURNING id, incident_id, target_id, status, error, sent_at, created_at
""",
attempt_id,
error,
)
return dict(row) if row else None
async def get_pending_attempts(self, incident_id: UUID) -> list[dict]:
"""Get all pending notification attempts for an incident."""
rows = await self.conn.fetch(
"""
SELECT na.id, na.incident_id, na.target_id, na.status, na.error,
na.sent_at, na.created_at,
nt.target_type, nt.webhook_url, nt.name as target_name
FROM notification_attempts na
JOIN notification_targets nt ON nt.id = na.target_id
WHERE na.incident_id = $1 AND na.status = 'pending'
""",
incident_id,
)
return [dict(row) for row in rows]