diff --git a/Dockerfile b/Dockerfile index 2b51fb2..665af1b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ WORKDIR /app COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ # Install Python dependencies -COPY pyproject.toml uv.lock ./ +COPY pyproject.toml uv.lock README.md ./ RUN uv sync --no-cache --no-dev # Copy application code @@ -15,9 +15,17 @@ COPY app/ ./app/ COPY worker/ ./worker/ COPY migrations/ ./migrations/ +# Set up non-root user and cache directory +RUN useradd -m -u 1000 appuser && \ + mkdir -p /app/.cache && \ + chown -R appuser:appuser /app + +ENV UV_CACHE_DIR=/app/.cache + # API service target FROM base AS api +USER appuser EXPOSE 8000 CMD ["uv", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] @@ -25,4 +33,6 @@ CMD ["uv", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "800 # Worker service target FROM base AS worker +USER appuser + CMD ["uv", "run", "celery", "-A", "worker.celery_app", "worker", "--loglevel=info", "-Q", "critical,default,low"] diff --git a/README.md b/README.md index 12b37f8..21628bf 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,40 @@ A fullstack on-call & incident management platform +## Environment Configuration + +| Variable | Description | Default | +|----------|-------------|---------| +| `DATABASE_URL` | Postgres connection string | — | +| `REDIS_URL` | Legacy redis endpoint, also used if no broker override is supplied | `redis://localhost:6379/0` | +| `TASK_QUEUE_DRIVER` | Task queue implementation (`celery` or `inmemory`) | `celery` | +| `TASK_QUEUE_BROKER_URL` | Celery broker URL (falls back to `REDIS_URL` when unset) | `None` | +| `TASK_QUEUE_BACKEND` | Celery transport semantics (`redis` or `sqs`) | `redis` | +| `TASK_QUEUE_DEFAULT_QUEUE` | Queue used for fan-out + notification deliveries | `default` | +| `TASK_QUEUE_CRITICAL_QUEUE` | Queue used for escalation + delayed work | `critical` | +| `TASK_QUEUE_VISIBILITY_TIMEOUT` | Visibility timeout passed to `sqs` transport | `600` | +| `TASK_QUEUE_POLLING_INTERVAL` | Polling interval for `sqs` transport (seconds) | `1.0` | +| `NOTIFICATION_ESCALATION_DELAY_SECONDS` | Delay before re-checking unacknowledged incidents | `900` | +| `AWS_REGION` | Region used when `TASK_QUEUE_BACKEND=sqs` | `None` | +| `JWT_SECRET_KEY` | Symmetric JWT signing key | — | +| `JWT_ALGORITHM` | JWT algorithm | `HS256` | +| `JWT_ISSUER` | JWT issuer claim | `incidentops` | +| `JWT_AUDIENCE` | JWT audience claim | `incidentops-api` | + +### Task Queue Modes + +- **Development / Tests** – Set `TASK_QUEUE_DRIVER=inmemory` to bypass Celery entirely (default for local pytest). The API will enqueue events into an in-memory recorder while the worker code remains importable. +- **Celery + Redis** – Set `TASK_QUEUE_DRIVER=celery` and either leave `TASK_QUEUE_BROKER_URL` unset (and rely on `REDIS_URL`) or point it to another Redis endpoint. This is the default production-style configuration. +- **Celery + Amazon SQS** – Provide `TASK_QUEUE_BROKER_URL=sqs://` (Celery automatically discovers credentials), set `TASK_QUEUE_BACKEND=sqs`, and configure `AWS_REGION`. Optional tuning is available via the visibility timeout and polling interval variables above. + +### Running the Worker + +The worker automatically discovers tasks under `worker/tasks`. 
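Any module placed in that package is registered at startup. As a rough sketch (the module and task names below are invented for illustration; only the shared `celery_app` import comes from this repo):

```python
# worker/tasks/reports.py (hypothetical module; Celery autodiscovery registers it at worker startup)
from worker.celery_app import celery_app


@celery_app.task(name="worker.tasks.reports.daily_digest")
def daily_digest(org_id: str) -> None:
    """Placeholder task body: a real task would load data and send notifications."""
    ...
```
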
Use the same environment variables as the API: + +``` +uv run celery -A worker.celery_app worker --loglevel=info +``` + ## Setup ### Docker Compose diff --git a/app/api/v1/health.py b/app/api/v1/health.py index d4b6f04..e30f68a 100644 --- a/app/api/v1/health.py +++ b/app/api/v1/health.py @@ -2,7 +2,8 @@ from fastapi import APIRouter, Response, status -from app.db import db, redis_client +from app.db import db +from app.taskqueue import task_queue router = APIRouter() @@ -16,14 +17,14 @@ async def healthz() -> dict[str, str]: @router.get("/readyz") async def readyz(response: Response) -> dict[str, str | dict[str, bool]]: """ - Readiness probe - checks database and Redis connectivity. + Readiness probe - checks database and task queue connectivity. - Check Postgres status - - Check Redis status + - Check configured task queue backend - Return overall healthiness """ checks = { "postgres": False, - "redis": False, + "task_queue": False, } try: @@ -34,7 +35,7 @@ async def readyz(response: Response) -> dict[str, str | dict[str, bool]]: except Exception: pass - checks["redis"] = await redis_client.ping() + checks["task_queue"] = await task_queue.ping() all_healthy = all(checks.values()) if not all_healthy: diff --git a/app/config.py b/app/config.py index d9b5963..ef626a0 100644 --- a/app/config.py +++ b/app/config.py @@ -1,5 +1,7 @@ """Application configuration via pydantic-settings.""" +from typing import Literal + from pydantic_settings import BaseSettings, SettingsConfigDict @@ -15,9 +17,22 @@ class Settings(BaseSettings): # Database database_url: str - # Redis + # Redis (legacy default for Celery broker) redis_url: str = "redis://localhost:6379/0" + # Task queue + task_queue_driver: Literal["celery", "inmemory"] = "celery" + task_queue_broker_url: str | None = None + task_queue_backend: Literal["redis", "sqs"] = "redis" + task_queue_default_queue: str = "default" + task_queue_critical_queue: str = "critical" + task_queue_visibility_timeout: int = 600 + task_queue_polling_interval: float = 1.0 + notification_escalation_delay_seconds: int = 900 + + # AWS (used when task_queue_backend="sqs") + aws_region: str | None = None + # JWT jwt_secret_key: str jwt_algorithm: str = "HS256" @@ -30,5 +45,22 @@ class Settings(BaseSettings): debug: bool = False api_v1_prefix: str = "/v1" + # OpenTelemetry + otel_enabled: bool = True + otel_service_name: str = "incidentops-api" + otel_environment: str = "development" + otel_exporter_otlp_endpoint: str | None = None # e.g., "http://tempo:4317" + otel_exporter_otlp_insecure: bool = True + otel_log_level: str = "INFO" -settings = Settings() + # Metrics + prometheus_port: int = 9464 # Port for Prometheus metrics endpoint + + @property + def resolved_task_queue_broker_url(self) -> str: + """Return the broker URL with redis fallback for backwards compatibility.""" + + return self.task_queue_broker_url or self.redis_url + + +settings = Settings() # type: ignore[call-arg] diff --git a/app/core/logging.py b/app/core/logging.py new file mode 100644 index 0000000..ea31c32 --- /dev/null +++ b/app/core/logging.py @@ -0,0 +1,164 @@ +"""Structured JSON logging configuration with OpenTelemetry integration.""" + +import json +import logging +import sys +from datetime import datetime, timezone +from typing import Any + +from app.config import settings + + +class JSONFormatter(logging.Formatter): + """ + JSON log formatter that outputs structured logs with trace context. 
+ + Log format includes: + - timestamp: ISO 8601 format + - level: Log level name + - message: Log message + - logger: Logger name + - trace_id: OpenTelemetry trace ID (if available) + - span_id: OpenTelemetry span ID (if available) + - Extra fields from log record + """ + + def format(self, record: logging.LogRecord) -> str: + log_data: dict[str, Any] = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "level": record.levelname, + "message": record.getMessage(), + "logger": record.name, + } + + # Add trace context if available (injected by OpenTelemetry LoggingInstrumentor) + if hasattr(record, "otelTraceID") and record.otelTraceID != "0": + log_data["trace_id"] = record.otelTraceID + if hasattr(record, "otelSpanID") and record.otelSpanID != "0": + log_data["span_id"] = record.otelSpanID + + # Add exception info if present + if record.exc_info: + log_data["exception"] = self.formatException(record.exc_info) + + # Add extra fields (excluding standard LogRecord attributes) + standard_attrs = { + "name", + "msg", + "args", + "created", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "exc_info", + "exc_text", + "thread", + "threadName", + "taskName", + "message", + "otelTraceID", + "otelSpanID", + "otelTraceSampled", + "otelServiceName", + } + for key, value in record.__dict__.items(): + if key not in standard_attrs and not key.startswith("_"): + log_data[key] = value + + return json.dumps(log_data, default=str) + + +class DevelopmentFormatter(logging.Formatter): + """ + Human-readable formatter for development with color support. + + Format: [TIME] LEVEL logger - message [trace_id] + """ + + COLORS = { + "DEBUG": "\033[36m", # Cyan + "INFO": "\033[32m", # Green + "WARNING": "\033[33m", # Yellow + "ERROR": "\033[31m", # Red + "CRITICAL": "\033[35m", # Magenta + } + RESET = "\033[0m" + + def format(self, record: logging.LogRecord) -> str: + color = self.COLORS.get(record.levelname, "") + reset = self.RESET + + # Format timestamp + timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S.%f")[:-3] + + # Build message + msg = f"[{timestamp}] {color}{record.levelname:8}{reset} {record.name} - {record.getMessage()}" + + # Add trace context if available + if hasattr(record, "otelTraceID") and record.otelTraceID != "0": + msg += f" [{record.otelTraceID[:8]}...]" + + # Add exception if present + if record.exc_info: + msg += f"\n{self.formatException(record.exc_info)}" + + return msg + + +def setup_logging() -> None: + """ + Configure application logging. 
+ + - JSON format in production (OTEL enabled) + - Human-readable format in development + - Integrates with OpenTelemetry trace context + """ + # Determine log level + log_level = getattr(logging, settings.otel_log_level.upper(), logging.INFO) + + # Choose formatter based on environment + if settings.otel_enabled and not settings.debug: + formatter = JSONFormatter() + else: + formatter = DevelopmentFormatter() + + # Configure root logger + root_logger = logging.getLogger() + root_logger.setLevel(log_level) + + # Remove existing handlers + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Add stdout handler + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(formatter) + root_logger.addHandler(handler) + + # Reduce noise from third-party libraries (keep uvicorn access at INFO so requests are logged) + logging.getLogger("uvicorn.access").setLevel(logging.INFO) + logging.getLogger("asyncpg").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + + logging.info( + "Logging configured", + extra={ + "log_level": settings.otel_log_level, + "format": "json" if settings.otel_enabled and not settings.debug else "dev", + }, + ) + + +def get_logger(name: str) -> logging.Logger: + """Get a logger instance with the given name.""" + return logging.getLogger(name) diff --git a/app/core/telemetry.py b/app/core/telemetry.py new file mode 100644 index 0000000..43e107d --- /dev/null +++ b/app/core/telemetry.py @@ -0,0 +1,271 @@ +"""OpenTelemetry instrumentation for tracing, metrics, and logging.""" + +import logging +from contextlib import contextmanager +from typing import Any + +from opentelemetry import metrics, trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.semconv.resource import ResourceAttributes +from prometheus_client import REGISTRY, start_http_server + +from app.config import settings + +logger = logging.getLogger(__name__) + +_tracer_provider: TracerProvider | None = None +_meter_provider: MeterProvider | None = None + +# Custom metrics +_request_counter = None +_request_duration = None +_active_requests = None +_error_counter = None + + +def setup_telemetry(app: Any) -> None: + """ + Initialize OpenTelemetry with tracing, metrics, and logging instrumentation. + + Configures: + - OTLP exporter for traces (to Tempo/Jaeger) + - Prometheus exporter for metrics (scraped by Prometheus) + - Auto-instrumentation for FastAPI, asyncpg, httpx, redis + - System metrics (CPU, memory, etc.) 
+ - Logging instrumentation for trace context injection + """ + global _tracer_provider, _meter_provider + global _request_counter, _request_duration, _active_requests, _error_counter + + if not settings.otel_enabled: + logger.info("OpenTelemetry disabled") + return + + # Create resource with service info + resource = Resource.create( + { + ResourceAttributes.SERVICE_NAME: settings.otel_service_name, + ResourceAttributes.SERVICE_VERSION: "0.1.0", + ResourceAttributes.DEPLOYMENT_ENVIRONMENT: settings.otel_environment, + } + ) + + # ========================================= + # TRACING SETUP + # ========================================= + _tracer_provider = TracerProvider(resource=resource) + + if settings.otel_exporter_otlp_endpoint: + otlp_exporter = OTLPSpanExporter( + endpoint=settings.otel_exporter_otlp_endpoint, + insecure=settings.otel_exporter_otlp_insecure, + ) + _tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + logger.info(f"OTLP exporter configured: {settings.otel_exporter_otlp_endpoint}") + else: + _tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) + logger.info("Console span exporter configured (no OTLP endpoint)") + + trace.set_tracer_provider(_tracer_provider) + + # ========================================= + # METRICS SETUP + # ========================================= + # Prometheus metric reader exposes metrics at /metrics endpoint + prometheus_reader = PrometheusMetricReader() + _meter_provider = MeterProvider(resource=resource, metric_readers=[prometheus_reader]) + metrics.set_meter_provider(_meter_provider) + + # Start Prometheus HTTP server on port 9464 + prometheus_port = settings.prometheus_port + try: + start_http_server(port=prometheus_port, registry=REGISTRY) + logger.info(f"Prometheus metrics server started on port {prometheus_port}") + except OSError as e: + logger.warning(f"Could not start Prometheus server on port {prometheus_port}: {e}") + + # Create custom metrics + meter = metrics.get_meter(__name__) + + _request_counter = meter.create_counter( + name="http_requests_total", + description="Total number of HTTP requests", + unit="1", + ) + + _request_duration = meter.create_histogram( + name="http_request_duration_seconds", + description="HTTP request duration in seconds", + unit="s", + ) + + _active_requests = meter.create_up_down_counter( + name="http_requests_active", + description="Number of active HTTP requests", + unit="1", + ) + + _error_counter = meter.create_counter( + name="http_errors_total", + description="Total number of HTTP errors", + unit="1", + ) + + # Instrument system metrics (CPU, memory, etc.) 
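    # The instrumentor below uses the globally registered MeterProvider (set above),
    # so its host/process gauges are exported through the same Prometheus reader.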
+ SystemMetricsInstrumentor().instrument() + logger.info("System metrics instrumentation enabled") + + # ========================================= + # LIBRARY INSTRUMENTATION + # ========================================= + FastAPIInstrumentor.instrument_app( + app, + excluded_urls="healthz,readyz,metrics", + tracer_provider=_tracer_provider, + meter_provider=_meter_provider, + ) + AsyncPGInstrumentor().instrument(tracer_provider=_tracer_provider) + HTTPXClientInstrumentor().instrument(tracer_provider=_tracer_provider) + RedisInstrumentor().instrument(tracer_provider=_tracer_provider) + + # Inject trace context into logs + LoggingInstrumentor().instrument( + set_logging_format=True, + log_level=logging.INFO, + ) + + logger.info( + f"OpenTelemetry initialized: service={settings.otel_service_name}, " + f"env={settings.otel_environment}, metrics_port={prometheus_port}" + ) + + +async def shutdown_telemetry() -> None: + """Gracefully shutdown the tracer and meter providers.""" + global _tracer_provider, _meter_provider + + if _tracer_provider: + _tracer_provider.shutdown() + _tracer_provider = None + logger.info("Tracer provider shutdown complete") + + if _meter_provider: + _meter_provider.shutdown() + _meter_provider = None + logger.info("Meter provider shutdown complete") + + +def get_tracer(name: str) -> trace.Tracer: + """Get a tracer instance for manual span creation.""" + return trace.get_tracer(name) + + +def get_meter(name: str) -> metrics.Meter: + """Get a meter instance for custom metrics.""" + return metrics.get_meter(name) + + +def get_current_trace_id() -> str | None: + """Get the current trace ID for request correlation.""" + span = trace.get_current_span() + if span and span.get_span_context().is_valid: + return format(span.get_span_context().trace_id, "032x") + return None + + +def get_current_span_id() -> str | None: + """Get the current span ID.""" + span = trace.get_current_span() + if span and span.get_span_context().is_valid: + return format(span.get_span_context().span_id, "016x") + return None + + +@contextmanager +def create_span(name: str, attributes: dict[str, Any] | None = None): + """Context manager for creating manual spans.""" + tracer = get_tracer(__name__) + with tracer.start_as_current_span(name, attributes=attributes) as span: + yield span + + +def add_span_attributes(attributes: dict[str, Any]) -> None: + """Add attributes to the current span.""" + span = trace.get_current_span() + if span: + for key, value in attributes.items(): + span.set_attribute(key, value) + + +def record_exception(exception: Exception) -> None: + """Record an exception on the current span.""" + span = trace.get_current_span() + if span: + span.record_exception(exception) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(exception))) + + +# ========================================= +# CUSTOM METRICS HELPERS +# ========================================= + + +def record_request(method: str, endpoint: str, status_code: int) -> None: + """Record a request metric.""" + if _request_counter: + _request_counter.add( + 1, + { + "method": method, + "endpoint": endpoint, + "status_code": str(status_code), + }, + ) + + +def record_request_duration(method: str, endpoint: str, duration: float) -> None: + """Record request duration in seconds.""" + if _request_duration: + _request_duration.record( + duration, + { + "method": method, + "endpoint": endpoint, + }, + ) + + +def increment_active_requests(method: str, endpoint: str) -> None: + """Increment active requests counter.""" + if 
_active_requests: + _active_requests.add(1, {"method": method, "endpoint": endpoint}) + + +def decrement_active_requests(method: str, endpoint: str) -> None: + """Decrement active requests counter.""" + if _active_requests: + _active_requests.add(-1, {"method": method, "endpoint": endpoint}) + + +def record_error(method: str, endpoint: str, error_type: str) -> None: + """Record an error metric.""" + if _error_counter: + _error_counter.add( + 1, + { + "method": method, + "endpoint": endpoint, + "error_type": error_type, + }, + ) diff --git a/app/db.py b/app/db.py index fb167ba..3399aea 100644 --- a/app/db.py +++ b/app/db.py @@ -6,7 +6,6 @@ from contextvars import ContextVar import asyncpg from asyncpg.pool import PoolConnectionProxy -import redis.asyncio as redis class Database: @@ -46,34 +45,8 @@ class Database: yield conn -class RedisClient: - """Manages Redis connection.""" - - client: redis.Redis | None = None - - async def connect(self, url: str) -> None: - """Create Redis connection.""" - self.client = redis.from_url(url, decode_responses=True) - - async def disconnect(self) -> None: - """Close Redis connection.""" - if self.client: - await self.client.aclose() - - async def ping(self) -> bool: - """Check if Redis is reachable.""" - if not self.client: - return False - try: - await self.client.ping() - return True - except redis.RedisError: - return False - - -# Global instances +# Global instance db = Database() -redis_client = RedisClient() _connection_ctx: ContextVar[asyncpg.Connection | PoolConnectionProxy | None] = ContextVar( diff --git a/app/main.py b/app/main.py index 8eb70d7..7090db1 100644 --- a/app/main.py +++ b/app/main.py @@ -1,26 +1,50 @@ """FastAPI application entry point.""" +import logging +import time from contextlib import asynccontextmanager from typing import AsyncGenerator -from fastapi import FastAPI +from fastapi import FastAPI, Request, status +from fastapi.encoders import jsonable_encoder +from fastapi.exceptions import RequestValidationError from fastapi.openapi.utils import get_openapi +from fastapi.responses import JSONResponse +from starlette.exceptions import HTTPException as StarletteHTTPException from app.api.v1 import auth, health, incidents, org from app.config import settings -from app.db import db, redis_client +from app.core.logging import setup_logging +from app.core.telemetry import ( + get_current_trace_id, + record_exception, + setup_telemetry, + shutdown_telemetry, +) +from app.db import db +from app.schemas.common import ErrorDetail, ErrorResponse +from app.taskqueue import task_queue + +# Initialize logging before anything else +setup_logging() +logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Manage application lifecycle - connect/disconnect resources.""" # Startup + logger.info("Starting IncidentOps API") await db.connect(settings.database_url) - await redis_client.connect(settings.redis_url) + await task_queue.startup() + logger.info("Startup complete") yield # Shutdown - await redis_client.disconnect() + logger.info("Shutting down IncidentOps API") + await task_queue.shutdown() await db.disconnect() + await shutdown_telemetry() + logger.info("Shutdown complete") app = FastAPI( @@ -33,6 +57,26 @@ app = FastAPI( lifespan=lifespan, ) +# Set up OpenTelemetry instrumentation +setup_telemetry(app) + + +@app.middleware("http") +async def request_logging_middleware(request: Request, call_next): + start = time.time() + response = await call_next(request) + duration_ms = 
(time.time() - start) * 1000 + logger.info( + "request", + extra={ + "method": request.method, + "path": request.url.path, + "status_code": response.status_code, + "duration_ms": round(duration_ms, 2), + }, + ) + return response + app.openapi_tags = [ {"name": "auth", "description": "Registration, login, token lifecycle"}, {"name": "org", "description": "Organization membership, services, and notifications"}, @@ -41,9 +85,133 @@ app.openapi_tags = [ ] -def custom_openapi() -> dict: - """Add JWT bearer security scheme to the generated OpenAPI schema.""" +# --------------------------------------------------------------------------- +# Global Exception Handlers +# --------------------------------------------------------------------------- + +def _build_error_response( + error: str, + message: str, + status_code: int, + details: list[ErrorDetail] | None = None, +) -> JSONResponse: + """Build a structured error response with trace context.""" + response = ErrorResponse( + error=error, + message=message, + details=details, + request_id=get_current_trace_id(), + ) + return JSONResponse( + status_code=status_code, + content=jsonable_encoder(response), + ) + + +@app.exception_handler(StarletteHTTPException) +async def http_exception_handler( + request: Request, exc: StarletteHTTPException +) -> JSONResponse: + """Handle HTTP exceptions with structured error responses.""" + # Map status codes to error type strings + error_types = { + 400: "bad_request", + 401: "unauthorized", + 403: "forbidden", + 404: "not_found", + 409: "conflict", + 422: "validation_error", + 429: "rate_limited", + 500: "internal_error", + 502: "bad_gateway", + 503: "service_unavailable", + } + error_type = error_types.get(exc.status_code, "error") + + logger.warning( + "HTTP exception", + extra={ + "status_code": exc.status_code, + "error": error_type, + "detail": exc.detail, + "path": str(request.url.path), + "method": request.method, + }, + ) + + return _build_error_response( + error=error_type, + message=str(exc.detail), + status_code=exc.status_code, + ) + + +@app.exception_handler(RequestValidationError) +async def validation_exception_handler( + request: Request, exc: RequestValidationError +) -> JSONResponse: + """Handle Pydantic validation errors with detailed error responses.""" + details = [ + ErrorDetail( + loc=[str(loc) for loc in error["loc"]], + msg=error["msg"], + type=error["type"], + ) + for error in exc.errors() + ] + + logger.warning( + "Validation error", + extra={ + "path": str(request.url.path), + "method": request.method, + "error_count": len(details), + }, + ) + + return _build_error_response( + error="validation_error", + message="Request validation failed", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + details=details, + ) + + +@app.exception_handler(Exception) +async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse: + """Handle unexpected exceptions with logging and safe error response.""" + # Record exception in the current span for tracing + record_exception(exc) + + logger.exception( + "Unhandled exception", + extra={ + "path": str(request.url.path), + "method": request.method, + "exception_type": type(exc).__name__, + }, + ) + + # Don't leak internal error details in production + message = "An unexpected error occurred" + if settings.debug: + message = f"{type(exc).__name__}: {exc}" + + return _build_error_response( + error="internal_error", + message=message, + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + +# 
--------------------------------------------------------------------------- +# OpenAPI Customization +# --------------------------------------------------------------------------- + + +def custom_openapi() -> dict: + """Add JWT bearer security scheme and error responses to OpenAPI schema.""" if app.openapi_schema: return app.openapi_schema @@ -52,8 +220,12 @@ def custom_openapi() -> dict: version=app.version, description=app.description, routes=app.routes, + tags=app.openapi_tags, ) - security_schemes = openapi_schema.setdefault("components", {}).setdefault("securitySchemes", {}) + + # Add security schemes + components = openapi_schema.setdefault("components", {}) + security_schemes = components.setdefault("securitySchemes", {}) security_schemes["BearerToken"] = { "type": "http", "scheme": "bearer", @@ -61,6 +233,42 @@ def custom_openapi() -> dict: "description": "Paste the JWT access token returned by /auth endpoints", } openapi_schema["security"] = [{"BearerToken": []}] + + # Add common error response schemas + schemas = components.setdefault("schemas", {}) + schemas["ErrorResponse"] = { + "type": "object", + "properties": { + "error": {"type": "string", "description": "Error type identifier"}, + "message": {"type": "string", "description": "Human-readable error message"}, + "details": { + "type": "array", + "items": {"$ref": "#/components/schemas/ErrorDetail"}, + "nullable": True, + "description": "Validation error details", + }, + "request_id": { + "type": "string", + "nullable": True, + "description": "Trace ID for debugging", + }, + }, + "required": ["error", "message"], + } + schemas["ErrorDetail"] = { + "type": "object", + "properties": { + "loc": { + "type": "array", + "items": {"oneOf": [{"type": "string"}, {"type": "integer"}]}, + "description": "Error location path", + }, + "msg": {"type": "string", "description": "Error message"}, + "type": {"type": "string", "description": "Error type"}, + }, + "required": ["loc", "msg", "type"], + } + app.openapi_schema = openapi_schema return app.openapi_schema diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py index a546b21..71c8bd7 100644 --- a/app/schemas/__init__.py +++ b/app/schemas/__init__.py @@ -8,7 +8,7 @@ from app.schemas.auth import ( SwitchOrgRequest, TokenResponse, ) -from app.schemas.common import CursorParams, PaginatedResponse +from app.schemas.common import CursorParams, ErrorDetail, ErrorResponse, PaginatedResponse from app.schemas.incident import ( CommentRequest, IncidentCreate, @@ -35,6 +35,8 @@ __all__ = [ "TokenResponse", # Common "CursorParams", + "ErrorDetail", + "ErrorResponse", "PaginatedResponse", # Incident "CommentRequest", diff --git a/app/schemas/common.py b/app/schemas/common.py index c9f67c8..7574733 100644 --- a/app/schemas/common.py +++ b/app/schemas/common.py @@ -3,6 +3,47 @@ from pydantic import BaseModel, Field +class ErrorDetail(BaseModel): + """Individual error detail for validation errors.""" + + loc: list[str | int] = Field(description="Location of the error (field path)") + msg: str = Field(description="Error message") + type: str = Field(description="Error type identifier") + + +class ErrorResponse(BaseModel): + """Structured error response returned by all error handlers.""" + + error: str = Field(description="Error type (e.g., 'not_found', 'validation_error')") + message: str = Field(description="Human-readable error message") + details: list[ErrorDetail] | None = Field( + default=None, description="Additional error details for validation errors" + ) + request_id: str | None = Field( 
+ default=None, description="Request trace ID for debugging" + ) + + model_config = { + "json_schema_extra": { + "examples": [ + { + "error": "not_found", + "message": "Incident not found", + "request_id": "abc123def456", + }, + { + "error": "validation_error", + "message": "Request validation failed", + "details": [ + {"loc": ["body", "title"], "msg": "Field required", "type": "missing"} + ], + "request_id": "abc123def456", + }, + ] + } + } + + class CursorParams(BaseModel): """Pagination parameters using cursor-based pagination.""" diff --git a/app/services/incident.py b/app/services/incident.py index 2874888..1d5dfa5 100644 --- a/app/services/incident.py +++ b/app/services/incident.py @@ -10,6 +10,7 @@ import asyncpg from asyncpg.pool import PoolConnectionProxy from app.api.deps import CurrentUser, ensure_org_access +from app.config import settings from app.core import exceptions as exc from app.db import Database, db from app.repositories import IncidentRepository, ServiceRepository @@ -21,7 +22,8 @@ from app.schemas.incident import ( IncidentResponse, TransitionRequest, ) - +from app.taskqueue import TaskQueue +from app.taskqueue import task_queue as default_task_queue _ALLOWED_TRANSITIONS: dict[str, set[str]] = { "triggered": {"acknowledged"}, @@ -40,8 +42,19 @@ def _as_conn(conn: asyncpg.Connection | PoolConnectionProxy) -> asyncpg.Connecti class IncidentService: """Encapsulates incident lifecycle operations within an org context.""" - def __init__(self, database: Database | None = None) -> None: + def __init__( + self, + database: Database | None = None, + task_queue: TaskQueue | None = None, + escalation_delay_seconds: int | None = None, + ) -> None: self.db = database or db + self.task_queue = task_queue or default_task_queue + self.escalation_delay_seconds = ( + escalation_delay_seconds + if escalation_delay_seconds is not None + else settings.notification_escalation_delay_seconds + ) async def create_incident( self, @@ -83,7 +96,22 @@ class IncidentService: }, ) - return IncidentResponse(**incident) + incident_response = IncidentResponse(**incident) + + self.task_queue.incident_triggered( + incident_id=incident_response.id, + org_id=current_user.org_id, + triggered_by=current_user.user_id, + ) + + if self.escalation_delay_seconds > 0: + self.task_queue.schedule_escalation_check( + incident_id=incident_response.id, + org_id=current_user.org_id, + delay_seconds=self.escalation_delay_seconds, + ) + + return incident_response async def get_incidents( self, diff --git a/app/taskqueue.py b/app/taskqueue.py new file mode 100644 index 0000000..f254688 --- /dev/null +++ b/app/taskqueue.py @@ -0,0 +1,188 @@ +"""Task queue abstractions for scheduling background work.""" + +from __future__ import annotations + +import asyncio +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any +from uuid import UUID + +from app.config import settings + +try: + from worker.celery_app import celery_app +except Exception: # pragma: no cover - celery app may not import during docs builds + celery_app = None # type: ignore[assignment] + + +class TaskQueue(ABC): + """Interface for enqueueing background work.""" + + async def startup(self) -> None: # pragma: no cover - default no-op + """Hook for queue initialization.""" + + async def shutdown(self) -> None: # pragma: no cover - default no-op + """Hook for queue teardown.""" + + async def ping(self) -> bool: + """Check if the queue backend is reachable.""" + + return True + + def reset(self) -> None: # pragma: no cover 
- optional for in-memory impls + """Reset any in-memory state (used in tests).""" + + @abstractmethod + def incident_triggered( + self, + *, + incident_id: UUID, + org_id: UUID, + triggered_by: UUID | None, + ) -> None: + """Fan out an incident triggered notification.""" + + @abstractmethod + def schedule_escalation_check( + self, + *, + incident_id: UUID, + org_id: UUID, + delay_seconds: int, + ) -> None: + """Schedule a delayed escalation check.""" + + +class CeleryTaskQueue(TaskQueue): + """Celery-backed task queue that can use Redis or SQS brokers.""" + + def __init__(self, default_queue: str, critical_queue: str) -> None: + if celery_app is None: # pragma: no cover - guarded by try/except + raise RuntimeError("Celery application is unavailable") + self._celery = celery_app + self._default_queue = default_queue + self._critical_queue = critical_queue + + def incident_triggered( + self, + *, + incident_id: UUID, + org_id: UUID, + triggered_by: UUID | None, + ) -> None: + self._celery.send_task( + "worker.tasks.notifications.incident_triggered", + kwargs={ + "incident_id": str(incident_id), + "org_id": str(org_id), + "triggered_by": str(triggered_by) if triggered_by else None, + }, + queue=self._default_queue, + ) + + def schedule_escalation_check( + self, + *, + incident_id: UUID, + org_id: UUID, + delay_seconds: int, + ) -> None: + self._celery.send_task( + "worker.tasks.notifications.escalate_if_unacked", + kwargs={ + "incident_id": str(incident_id), + "org_id": str(org_id), + }, + countdown=max(delay_seconds, 0), + queue=self._critical_queue, + ) + + async def ping(self) -> bool: + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, self._ping_sync) + + def _ping_sync(self) -> bool: + connection = self._celery.connection() + try: + connection.connect() + return True + except Exception: + return False + finally: + try: + connection.release() + except Exception: # pragma: no cover - release best effort + pass + + +@dataclass +class InMemoryTaskQueue(TaskQueue): + """Test-friendly queue that records dispatched tasks in memory.""" + + dispatched: list[tuple[str, dict[str, Any]]] | None = None + + def __post_init__(self) -> None: + if self.dispatched is None: + self.dispatched = [] + + def incident_triggered( + self, + *, + incident_id: UUID, + org_id: UUID, + triggered_by: UUID | None, + ) -> None: + self.dispatched.append( + ( + "incident_triggered", + { + "incident_id": incident_id, + "org_id": org_id, + "triggered_by": triggered_by, + }, + ) + ) + + def schedule_escalation_check( + self, + *, + incident_id: UUID, + org_id: UUID, + delay_seconds: int, + ) -> None: + self.dispatched.append( + ( + "escalate_if_unacked", + { + "incident_id": incident_id, + "org_id": org_id, + "delay_seconds": delay_seconds, + }, + ) + ) + + def reset(self) -> None: + if self.dispatched is not None: + self.dispatched.clear() + + +def _build_task_queue() -> TaskQueue: + if settings.task_queue_driver == "inmemory": + return InMemoryTaskQueue() + + return CeleryTaskQueue( + default_queue=settings.task_queue_default_queue, + critical_queue=settings.task_queue_critical_queue, + ) + + +task_queue = _build_task_queue() + + +__all__ = [ + "CeleryTaskQueue", + "InMemoryTaskQueue", + "TaskQueue", + "task_queue", +] diff --git a/docker-compose.yml b/docker-compose.yml index 62deaa2..16e7c1e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,6 +41,7 @@ services: container_name: incidentops-api ports: - "8000:8000" + - "9464:9464" # Prometheus metrics environment: DATABASE_URL: 
postgresql://incidentops:incidentops@postgres:5432/incidentops REDIS_URL: redis://redis:6379/0 @@ -48,11 +49,24 @@ services: JWT_ALGORITHM: HS256 ACCESS_TOKEN_EXPIRE_MINUTES: 30 REFRESH_TOKEN_EXPIRE_DAYS: 30 + # OpenTelemetry + OTEL_ENABLED: "true" + OTEL_SERVICE_NAME: incidentops-api + OTEL_ENVIRONMENT: development + OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317 + OTEL_EXPORTER_OTLP_INSECURE: "true" + OTEL_LOG_LEVEL: INFO + # Metrics + PROMETHEUS_PORT: "9464" depends_on: postgres: condition: service_healthy redis: condition: service_healthy + otel-collector: + condition: service_started + prometheus: + condition: service_started healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/v1/healthz"] interval: 30s @@ -72,6 +86,12 @@ services: REDIS_URL: redis://redis:6379/0 CELERY_BROKER_URL: redis://redis:6379/0 CELERY_RESULT_BACKEND: redis://redis:6379/1 + # OpenTelemetry + OTEL_ENABLED: "true" + OTEL_SERVICE_NAME: incidentops-worker + OTEL_ENVIRONMENT: development + OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317 + OTEL_EXPORTER_OTLP_INSECURE: "true" depends_on: postgres: condition: service_healthy @@ -121,9 +141,89 @@ services: profiles: - monitoring + # ============================================ + # Observability Stack + # ============================================ + + # OpenTelemetry Collector - receives traces/logs from apps + otel-collector: + image: otel/opentelemetry-collector-contrib:0.96.0 + container_name: incidentops-otel-collector + command: ["--config=/etc/otel-collector/config.yaml"] + volumes: + - ./observability/otel-collector/config.yaml:/etc/otel-collector/config.yaml:ro + ports: + - "4317:4317" # OTLP gRPC + - "4318:4318" # OTLP HTTP + depends_on: + - tempo + - loki + + # Tempo - distributed tracing backend + tempo: + image: grafana/tempo:2.4.1 + container_name: incidentops-tempo + command: ["-config.file=/etc/tempo/config.yaml"] + volumes: + - ./observability/tempo/config.yaml:/etc/tempo/config.yaml:ro + - tempo_data:/var/tempo + ports: + - "3200:3200" # Tempo HTTP + - "4320:4317" # Tempo OTLP gRPC (different host port to avoid conflict) + + # Loki - log aggregation + loki: + image: grafana/loki:2.9.6 + container_name: incidentops-loki + command: ["-config.file=/etc/loki/config.yaml"] + volumes: + - ./observability/loki/config.yaml:/etc/loki/config.yaml:ro + - loki_data:/loki + ports: + - "3100:3100" # Loki HTTP + + # Prometheus - metrics storage + prometheus: + image: prom/prometheus:v2.51.0 + container_name: incidentops-prometheus + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--web.enable-lifecycle" + volumes: + - ./observability/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus_data:/prometheus + ports: + - "9090:9090" # Prometheus UI + + # Grafana - visualization + grafana: + image: grafana/grafana:10.4.1 + container_name: incidentops-grafana + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: admin + GF_USERS_ALLOW_SIGN_UP: "false" + GF_EXPLORE_ENABLED: "true" + GF_FEATURE_TOGGLES_ENABLE: traceqlEditor tempoSearch tempoBackendSearch tempoApmTable + volumes: + - ./observability/grafana/provisioning:/etc/grafana/provisioning:ro + - ./observability/grafana/dashboards:/var/lib/grafana/dashboards:ro + - grafana_data:/var/lib/grafana + ports: + - "3001:3000" # Grafana UI (3001 to avoid conflict with web frontend) + depends_on: + - tempo + - loki + - prometheus + volumes: postgres_data: redis_data: + tempo_data: + loki_data: + 
prometheus_data: + grafana_data: networks: default: diff --git a/helm/incidentops/templates/api-deployment.yaml b/helm/incidentops/templates/api-deployment.yaml index 9b77d7f..6195e91 100644 --- a/helm/incidentops/templates/api-deployment.yaml +++ b/helm/incidentops/templates/api-deployment.yaml @@ -29,6 +29,29 @@ spec: serviceAccountName: {{ include "incidentops.serviceAccountName" . }} securityContext: {{- toYaml .Values.podSecurityContext | nindent 8 }} + initContainers: + - name: wait-for-postgres + image: busybox:1.36 + command: + - sh + - -c + - | + until nc -z {{ include "incidentops.fullname" . }}-postgresql 5432; do + echo "Waiting for PostgreSQL..." + sleep 2 + done + echo "PostgreSQL is ready" + - name: wait-for-redis + image: busybox:1.36 + command: + - sh + - -c + - | + until nc -z {{ include "incidentops.fullname" . }}-redis 6379; do + echo "Waiting for Redis..." + sleep 2 + done + echo "Redis is ready" containers: - name: api securityContext: @@ -39,6 +62,11 @@ spec: - name: http containerPort: 8000 protocol: TCP + {{- if .Values.metrics.enabled }} + - name: metrics + containerPort: {{ .Values.metrics.port }} + protocol: TCP + {{- end }} envFrom: - configMapRef: name: {{ include "incidentops.fullname" . }}-config diff --git a/helm/incidentops/templates/api-service.yaml b/helm/incidentops/templates/api-service.yaml index a51ac78..076c8b9 100644 --- a/helm/incidentops/templates/api-service.yaml +++ b/helm/incidentops/templates/api-service.yaml @@ -11,5 +11,11 @@ spec: targetPort: http protocol: TCP name: http + {{- if .Values.metrics.enabled }} + - port: {{ .Values.metrics.port }} + targetPort: metrics + protocol: TCP + name: metrics + {{- end }} selector: {{- include "incidentops.api.selectorLabels" . | nindent 4 }} diff --git a/helm/incidentops/templates/configmap.yaml b/helm/incidentops/templates/configmap.yaml index 994b8ba..af6b9f6 100644 --- a/helm/incidentops/templates/configmap.yaml +++ b/helm/incidentops/templates/configmap.yaml @@ -8,3 +8,16 @@ data: JWT_ALGORITHM: {{ .Values.config.jwtAlgorithm | quote }} ACCESS_TOKEN_EXPIRE_MINUTES: {{ .Values.config.accessTokenExpireMinutes | quote }} REFRESH_TOKEN_EXPIRE_DAYS: {{ .Values.config.refreshTokenExpireDays | quote }} + # OpenTelemetry configuration + OTEL_ENABLED: {{ .Values.observability.enabled | quote }} + OTEL_SERVICE_NAME: "incidentops-api" + OTEL_ENVIRONMENT: {{ .Values.config.environment | default "production" | quote }} + {{- if .Values.observability.enabled }} + OTEL_EXPORTER_OTLP_ENDPOINT: "http://{{ include "incidentops.fullname" . }}-otel-collector:4317" + {{- end }} + OTEL_EXPORTER_OTLP_INSECURE: "true" + OTEL_LOG_LEVEL: {{ .Values.config.logLevel | default "INFO" | quote }} + # Metrics configuration + {{- if .Values.metrics.enabled }} + PROMETHEUS_PORT: {{ .Values.metrics.port | quote }} + {{- end }} diff --git a/helm/incidentops/templates/grafana-deployment.yaml b/helm/incidentops/templates/grafana-deployment.yaml new file mode 100644 index 0000000..a56db77 --- /dev/null +++ b/helm/incidentops/templates/grafana-deployment.yaml @@ -0,0 +1,387 @@ +{{- if .Values.observability.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-grafana-datasources + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana +data: + datasources.yaml: | + apiVersion: 1 + datasources: + - name: Prometheus + type: prometheus + uid: prometheus + url: http://{{ include "incidentops.fullname" . 
}}-prometheus:9090 + access: proxy + isDefault: false + jsonData: + httpMethod: POST + exemplarTraceIdDestinations: + - name: trace_id + datasourceUid: tempo + + - name: Tempo + type: tempo + uid: tempo + url: http://{{ include "incidentops.fullname" . }}-tempo:3200 + access: proxy + isDefault: false + jsonData: + tracesToLogsV2: + datasourceUid: loki + spanStartTimeShift: '-1h' + spanEndTimeShift: '1h' + filterByTraceID: true + filterBySpanID: true + tracesToMetrics: + datasourceUid: prometheus + spanStartTimeShift: '-1h' + spanEndTimeShift: '1h' + serviceMap: + datasourceUid: prometheus + nodeGraph: + enabled: true + lokiSearch: + datasourceUid: loki + + - name: Loki + type: loki + uid: loki + url: http://{{ include "incidentops.fullname" . }}-loki:3100 + access: proxy + isDefault: true + jsonData: + derivedFields: + - datasourceUid: tempo + matcherRegex: '"trace_id":"([a-f0-9]+)"' + name: TraceID + url: '$${__value.raw}' + urlDisplayLabel: 'View Trace' +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-grafana-dashboards-provider + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana +data: + dashboards.yaml: | + apiVersion: 1 + providers: + - name: 'default' + orgId: 1 + folder: 'IncidentOps' + folderUid: 'incidentops' + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-grafana-dashboards + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana +data: + api-overview.json: | + { + "title": "IncidentOps API Overview", + "uid": "incidentops-api", + "tags": ["incidentops", "api"], + "timezone": "browser", + "editable": true, + "panels": [ + { + "id": 1, + "title": "Request Rate", + "type": "timeseries", + "gridPos": {"h": 8, "w": 8, "x": 0, "y": 0}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum(rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m]))", + "legendFormat": "Requests/sec", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps" + } + } + }, + { + "id": 2, + "title": "Request Duration (p50, p95, p99)", + "type": "timeseries", + "gridPos": {"h": 8, "w": 8, "x": 8, "y": 0}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "histogram_quantile(0.50, sum(rate(http_server_request_duration_seconds_bucket{job=\"incidentops-api\"}[5m])) by (le))", + "legendFormat": "p50", + "refId": "A" + }, + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "histogram_quantile(0.95, sum(rate(http_server_request_duration_seconds_bucket{job=\"incidentops-api\"}[5m])) by (le))", + "legendFormat": "p95", + "refId": "B" + }, + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "histogram_quantile(0.99, sum(rate(http_server_request_duration_seconds_bucket{job=\"incidentops-api\"}[5m])) by (le))", + "legendFormat": "p99", + "refId": "C" + } + ], + "fieldConfig": { + "defaults": { + "unit": "s" + } + } + }, + { + "id": 3, + "title": "Error Rate", + "type": "timeseries", + "gridPos": {"h": 8, "w": 8, "x": 16, "y": 0}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum(rate(http_server_request_duration_seconds_count{job=\"incidentops-api\", http_status_code=~\"5..\"}[1m])) / 
sum(rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m])) * 100", + "legendFormat": "Error %", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100 + } + } + }, + { + "id": 4, + "title": "Requests by Status Code", + "type": "timeseries", + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum by (http_status_code) (rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m]))", + "legendFormat": "{{ "{{" }}http_status_code{{ "}}" }}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps" + } + } + }, + { + "id": 5, + "title": "Requests by Endpoint", + "type": "timeseries", + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum by (http_route) (rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m]))", + "legendFormat": "{{ "{{" }}http_route{{ "}}" }}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps" + } + } + }, + { + "id": 6, + "title": "Recent Logs", + "type": "logs", + "gridPos": {"h": 10, "w": 24, "x": 0, "y": 16}, + "targets": [ + { + "datasource": {"type": "loki", "uid": "loki"}, + "expr": "{service_name=\"incidentops-api\"} | json", + "refId": "A" + } + ], + "options": { + "showTime": true, + "showLabels": true, + "wrapLogMessage": true, + "enableLogDetails": true, + "sortOrder": "Descending" + } + }, + { + "id": 7, + "title": "Recent Traces", + "type": "traces", + "gridPos": {"h": 10, "w": 24, "x": 0, "y": 26}, + "targets": [ + { + "datasource": {"type": "tempo", "uid": "tempo"}, + "queryType": "traceqlSearch", + "filters": [ + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": ["incidentops-api"] + } + ], + "refId": "A" + } + ] + } + ], + "schemaVersion": 38, + "version": 2 + } +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "incidentops.fullname" . }}-grafana + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana +spec: + replicas: 1 + selector: + matchLabels: + {{- include "incidentops.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: grafana + template: + metadata: + labels: + {{- include "incidentops.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: grafana + annotations: + checksum/datasources: {{ .Values.observability.grafana.image.tag | sha256sum }} + spec: + securityContext: + fsGroup: 472 + runAsUser: 472 + containers: + - name: grafana + image: "{{ .Values.observability.grafana.image.repository }}:{{ .Values.observability.grafana.image.tag }}" + imagePullPolicy: {{ .Values.observability.grafana.image.pullPolicy }} + ports: + - name: http + containerPort: 3000 + protocol: TCP + env: + - name: GF_SECURITY_ADMIN_USER + value: {{ .Values.observability.grafana.adminUser | quote }} + - name: GF_SECURITY_ADMIN_PASSWORD + valueFrom: + secretKeyRef: + name: {{ include "incidentops.fullname" . 
}}-grafana + key: admin-password + - name: GF_USERS_ALLOW_SIGN_UP + value: "false" + - name: GF_EXPLORE_ENABLED + value: "true" + - name: GF_FEATURE_TOGGLES_ENABLE + value: "traceqlEditor tempoSearch tempoBackendSearch tempoApmTable" + volumeMounts: + - name: datasources + mountPath: /etc/grafana/provisioning/datasources + - name: dashboards-provider + mountPath: /etc/grafana/provisioning/dashboards + - name: dashboards + mountPath: /var/lib/grafana/dashboards + - name: data + mountPath: /var/lib/grafana + resources: + {{- toYaml .Values.observability.grafana.resources | nindent 12 }} + readinessProbe: + httpGet: + path: /api/health + port: http + initialDelaySeconds: 10 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /api/health + port: http + initialDelaySeconds: 30 + periodSeconds: 30 + volumes: + - name: datasources + configMap: + name: {{ include "incidentops.fullname" . }}-grafana-datasources + - name: dashboards-provider + configMap: + name: {{ include "incidentops.fullname" . }}-grafana-dashboards-provider + - name: dashboards + configMap: + name: {{ include "incidentops.fullname" . }}-grafana-dashboards + - name: data + {{- if .Values.observability.grafana.persistence.enabled }} + persistentVolumeClaim: + claimName: {{ include "incidentops.fullname" . }}-grafana + {{- else }} + emptyDir: {} + {{- end }} +--- +apiVersion: v1 +kind: Secret +metadata: + name: {{ include "incidentops.fullname" . }}-grafana + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana +type: Opaque +data: + admin-password: {{ .Values.observability.grafana.adminPassword | b64enc | quote }} +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "incidentops.fullname" . }}-grafana + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana +spec: + type: {{ .Values.observability.grafana.service.type }} + ports: + - name: http + port: 80 + targetPort: http + protocol: TCP + selector: + {{- include "incidentops.selectorLabels" . | nindent 4 }} + app.kubernetes.io/component: grafana +{{- if .Values.observability.grafana.persistence.enabled }} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ include "incidentops.fullname" . }}-grafana + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Values.observability.grafana.persistence.size }} +{{- end }} +{{- end }} diff --git a/helm/incidentops/templates/grafana-ingress.yaml b/helm/incidentops/templates/grafana-ingress.yaml new file mode 100644 index 0000000..09c44d3 --- /dev/null +++ b/helm/incidentops/templates/grafana-ingress.yaml @@ -0,0 +1,38 @@ +{{- if and .Values.observability.enabled .Values.observability.grafana.ingress.enabled -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "incidentops.fullname" . }}-grafana + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: grafana + {{- with .Values.observability.grafana.ingress.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + {{- if .Values.ingress.className }} + ingressClassName: {{ .Values.ingress.className }} + {{- end }} + {{- if .Values.observability.grafana.ingress.tls }} + tls: + {{- range .Values.observability.grafana.ingress.tls }} + - hosts: + {{- range .hosts }} + - {{ . 
| quote }} + {{- end }} + secretName: {{ .secretName }} + {{- end }} + {{- end }} + rules: + - host: {{ .Values.observability.grafana.ingress.host | quote }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "incidentops.fullname" . }}-grafana + port: + number: 80 +{{- end }} diff --git a/helm/incidentops/templates/loki-deployment.yaml b/helm/incidentops/templates/loki-deployment.yaml new file mode 100644 index 0000000..b5d8c3c --- /dev/null +++ b/helm/incidentops/templates/loki-deployment.yaml @@ -0,0 +1,155 @@ +{{- if .Values.observability.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-loki-config + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: loki +data: + loki.yaml: | + auth_enabled: false + + server: + http_listen_port: 3100 + grpc_listen_port: 9096 + + common: + path_prefix: /loki + storage: + filesystem: + chunks_directory: /loki/chunks + rules_directory: /loki/rules + replication_factor: 1 + ring: + kvstore: + store: inmemory + + query_range: + results_cache: + cache: + embedded_cache: + enabled: true + max_size_mb: 100 + + schema_config: + configs: + - from: "2020-10-24" + store: tsdb + object_store: filesystem + schema: v13 + index: + prefix: index_ + period: 24h + + ruler: + alertmanager_url: http://localhost:9093 + + limits_config: + retention_period: {{ .Values.observability.loki.retention }} + allow_structured_metadata: true + volume_enabled: true +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "incidentops.fullname" . }}-loki + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: loki +spec: + replicas: 1 + selector: + matchLabels: + {{- include "incidentops.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: loki + template: + metadata: + labels: + {{- include "incidentops.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: loki + annotations: + checksum/config: {{ .Values.observability.loki.image.tag | sha256sum }} + spec: + containers: + - name: loki + image: "{{ .Values.observability.loki.image.repository }}:{{ .Values.observability.loki.image.tag }}" + imagePullPolicy: {{ .Values.observability.loki.image.pullPolicy }} + args: + - -config.file=/etc/loki/loki.yaml + ports: + - name: http + containerPort: 3100 + protocol: TCP + - name: grpc + containerPort: 9096 + protocol: TCP + volumeMounts: + - name: config + mountPath: /etc/loki + - name: data + mountPath: /loki + resources: + {{- toYaml .Values.observability.loki.resources | nindent 12 }} + readinessProbe: + httpGet: + path: /ready + port: http + initialDelaySeconds: 10 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /ready + port: http + initialDelaySeconds: 30 + periodSeconds: 30 + volumes: + - name: config + configMap: + name: {{ include "incidentops.fullname" . }}-loki-config + - name: data + {{- if .Values.observability.loki.persistence.enabled }} + persistentVolumeClaim: + claimName: {{ include "incidentops.fullname" . }}-loki + {{- else }} + emptyDir: {} + {{- end }} +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "incidentops.fullname" . }}-loki + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: loki +spec: + type: ClusterIP + ports: + - name: http + port: 3100 + targetPort: http + protocol: TCP + - name: grpc + port: 9096 + targetPort: grpc + protocol: TCP + selector: + {{- include "incidentops.selectorLabels" . 
| nindent 4 }} + app.kubernetes.io/component: loki +{{- if .Values.observability.loki.persistence.enabled }} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ include "incidentops.fullname" . }}-loki + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: loki +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Values.observability.loki.persistence.size }} +{{- end }} +{{- end }} diff --git a/helm/incidentops/templates/migrate-job.yaml b/helm/incidentops/templates/migrate-job.yaml index 2864bfe..6d62278 100644 --- a/helm/incidentops/templates/migrate-job.yaml +++ b/helm/incidentops/templates/migrate-job.yaml @@ -30,9 +30,11 @@ spec: - name: migrate securityContext: {{- toYaml .Values.securityContext | nindent 12 }} - image: {{ include "incidentops.api.image" . }} + image: "{{ .Values.migration.image.repository }}:{{ .Values.migration.image.tag }}" imagePullPolicy: {{ .Values.migration.image.pullPolicy }} command: + - uv + - run - python - migrations/migrate.py - apply diff --git a/helm/incidentops/templates/otel-collector-deployment.yaml b/helm/incidentops/templates/otel-collector-deployment.yaml new file mode 100644 index 0000000..abc8138 --- /dev/null +++ b/helm/incidentops/templates/otel-collector-deployment.yaml @@ -0,0 +1,132 @@ +{{- if .Values.observability.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-otel-collector-config + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: otel-collector +data: + otel-collector-config.yaml: | + extensions: + health_check: + endpoint: 0.0.0.0:13133 + + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + + processors: + batch: + timeout: 1s + send_batch_size: 1024 + memory_limiter: + check_interval: 1s + limit_mib: 512 + spike_limit_mib: 128 + + exporters: + otlp/tempo: + endpoint: {{ include "incidentops.fullname" . }}-tempo:4317 + tls: + insecure: true + loki: + endpoint: http://{{ include "incidentops.fullname" . }}-loki:3100/loki/api/v1/push + default_labels_enabled: + exporter: true + job: true + + service: + extensions: [health_check] + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [otlp/tempo] + logs: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [loki] +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "incidentops.fullname" . }}-otel-collector + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: otel-collector +spec: + replicas: {{ .Values.observability.otelCollector.replicaCount }} + selector: + matchLabels: + {{- include "incidentops.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: otel-collector + template: + metadata: + labels: + {{- include "incidentops.selectorLabels" . 
| nindent 8 }} + app.kubernetes.io/component: otel-collector + annotations: + checksum/config: {{ .Values.observability.otelCollector.image.tag | sha256sum }} + spec: + containers: + - name: otel-collector + image: "{{ .Values.observability.otelCollector.image.repository }}:{{ .Values.observability.otelCollector.image.tag }}" + imagePullPolicy: {{ .Values.observability.otelCollector.image.pullPolicy }} + args: + - --config=/etc/otel-collector/otel-collector-config.yaml + ports: + - name: otlp-grpc + containerPort: 4317 + protocol: TCP + - name: otlp-http + containerPort: 4318 + protocol: TCP + volumeMounts: + - name: config + mountPath: /etc/otel-collector + resources: + {{- toYaml .Values.observability.otelCollector.resources | nindent 12 }} + livenessProbe: + httpGet: + path: / + port: 13133 + initialDelaySeconds: 10 + periodSeconds: 30 + readinessProbe: + httpGet: + path: / + port: 13133 + initialDelaySeconds: 5 + periodSeconds: 10 + volumes: + - name: config + configMap: + name: {{ include "incidentops.fullname" . }}-otel-collector-config +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "incidentops.fullname" . }}-otel-collector + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: otel-collector +spec: + type: ClusterIP + ports: + - name: otlp-grpc + port: 4317 + targetPort: otlp-grpc + protocol: TCP + - name: otlp-http + port: 4318 + targetPort: otlp-http + protocol: TCP + selector: + {{- include "incidentops.selectorLabels" . | nindent 4 }} + app.kubernetes.io/component: otel-collector +{{- end }} diff --git a/helm/incidentops/templates/prometheus-deployment.yaml b/helm/incidentops/templates/prometheus-deployment.yaml new file mode 100644 index 0000000..90310da --- /dev/null +++ b/helm/incidentops/templates/prometheus-deployment.yaml @@ -0,0 +1,163 @@ +{{- if and .Values.observability.enabled .Values.metrics.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-prometheus + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: prometheus +data: + prometheus.yml: | + global: + scrape_interval: {{ .Values.observability.prometheus.scrapeInterval | default "15s" }} + evaluation_interval: 15s + + scrape_configs: + - job_name: "prometheus" + static_configs: + - targets: ["localhost:9090"] + + - job_name: "incidentops-api" + kubernetes_sd_configs: + - role: pod + namespaces: + names: + - {{ .Release.Namespace }} + relabel_configs: + - source_labels: [__meta_kubernetes_pod_label_app_kubernetes_io_component] + action: keep + regex: api + - source_labels: [__meta_kubernetes_pod_container_port_name] + action: keep + regex: metrics + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + metrics_path: /metrics + scrape_interval: 10s + + - job_name: "incidentops-worker" + kubernetes_sd_configs: + - role: pod + namespaces: + names: + - {{ .Release.Namespace }} + relabel_configs: + - source_labels: [__meta_kubernetes_pod_label_app_kubernetes_io_component] + action: keep + regex: worker + - source_labels: [__meta_kubernetes_pod_container_port_name] + action: keep + regex: metrics + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + metrics_path: /metrics + scrape_interval: 10s +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "incidentops.fullname" . 
}}-prometheus + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: prometheus +spec: + replicas: 1 + selector: + matchLabels: + {{- include "incidentops.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: prometheus + template: + metadata: + labels: + {{- include "incidentops.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: prometheus + annotations: + checksum/config: {{ .Values.observability.prometheus.image.tag | sha256sum }} + spec: + serviceAccountName: {{ include "incidentops.serviceAccountName" . }} + securityContext: + fsGroup: 65534 + runAsUser: 65534 + runAsNonRoot: true + containers: + - name: prometheus + image: "{{ .Values.observability.prometheus.image.repository }}:{{ .Values.observability.prometheus.image.tag }}" + imagePullPolicy: {{ .Values.observability.prometheus.image.pullPolicy }} + args: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--storage.tsdb.retention.time={{ .Values.observability.prometheus.retention }}" + - "--web.enable-lifecycle" + ports: + - name: http + containerPort: 9090 + protocol: TCP + volumeMounts: + - name: config + mountPath: /etc/prometheus + - name: data + mountPath: /prometheus + resources: + {{- toYaml .Values.observability.prometheus.resources | nindent 12 }} + readinessProbe: + httpGet: + path: /-/ready + port: http + initialDelaySeconds: 10 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /-/healthy + port: http + initialDelaySeconds: 30 + periodSeconds: 30 + volumes: + - name: config + configMap: + name: {{ include "incidentops.fullname" . }}-prometheus + - name: data + {{- if .Values.observability.prometheus.persistence.enabled }} + persistentVolumeClaim: + claimName: {{ include "incidentops.fullname" . }}-prometheus + {{- else }} + emptyDir: {} + {{- end }} +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "incidentops.fullname" . }}-prometheus + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: prometheus +spec: + type: ClusterIP + ports: + - name: http + port: 9090 + targetPort: http + protocol: TCP + selector: + {{- include "incidentops.selectorLabels" . | nindent 4 }} + app.kubernetes.io/component: prometheus +{{- if .Values.observability.prometheus.persistence.enabled }} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ include "incidentops.fullname" . }}-prometheus + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: prometheus +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Values.observability.prometheus.persistence.size }} +{{- end }} +{{- end }} diff --git a/helm/incidentops/templates/prometheus-rbac.yaml b/helm/incidentops/templates/prometheus-rbac.yaml new file mode 100644 index 0000000..d98b693 --- /dev/null +++ b/helm/incidentops/templates/prometheus-rbac.yaml @@ -0,0 +1,29 @@ +{{- if and .Values.observability.enabled .Values.metrics.enabled }} +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: {{ include "incidentops.fullname" . }}-prometheus + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: prometheus +rules: + - apiGroups: [""] + resources: ["pods", "endpoints", "services"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: {{ include "incidentops.fullname" . }}-prometheus + labels: + {{- include "incidentops.labels" . 
| nindent 4 }} + app.kubernetes.io/component: prometheus +subjects: + - kind: ServiceAccount + name: {{ include "incidentops.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ include "incidentops.fullname" . }}-prometheus +{{- end }} diff --git a/helm/incidentops/templates/promtail-daemonset.yaml b/helm/incidentops/templates/promtail-daemonset.yaml new file mode 100644 index 0000000..e8ee29e --- /dev/null +++ b/helm/incidentops/templates/promtail-daemonset.yaml @@ -0,0 +1,169 @@ +{{- if and .Values.observability.enabled .Values.observability.promtail.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-promtail-config + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: promtail +data: + promtail.yaml: | + server: + http_listen_port: 3101 + grpc_listen_port: 0 + + positions: + filename: /run/promtail/positions.yaml + + clients: + - url: http://{{ include "incidentops.fullname" . }}-loki:3100/loki/api/v1/push + + scrape_configs: + - job_name: kubernetes-pods + pipeline_stages: + - cri: {} + kubernetes_sd_configs: + - role: pod + namespaces: + names: [{{ .Release.Namespace }}] + relabel_configs: + - source_labels: [__meta_kubernetes_pod_container_init] + regex: "true" + action: drop + - source_labels: [__meta_kubernetes_pod_phase] + regex: Pending|Failed|Succeeded + action: drop + - source_labels: [__meta_kubernetes_pod_name, __meta_kubernetes_pod_namespace, __meta_kubernetes_pod_container_name] + target_label: __path__ + replacement: /var/log/containers/$1_$2_$3-*.log + - source_labels: [__meta_kubernetes_pod_label_app_kubernetes_io_component] + regex: (.*) + target_label: service_name + replacement: {{ include "incidentops.fullname" . }}-$1 + - source_labels: [__meta_kubernetes_pod_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + - source_labels: [__meta_kubernetes_pod_container_name] + target_label: container + - source_labels: [__meta_kubernetes_pod_uid] + target_label: pod_uid + - target_label: cluster + replacement: {{ .Release.Namespace }} + + - job_name: containers-fallback + pipeline_stages: + - cri: {} + static_configs: + - labels: + job: containers + namespace: {{ .Release.Namespace }} + service_name: incidentops-api + __path__: /var/log/containers/incidentops-api-*_incidentops_api-*.log + - labels: + job: containers + namespace: {{ .Release.Namespace }} + service_name: incidentops-worker + __path__: /var/log/containers/incidentops-worker-*_incidentops_worker-*.log +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "incidentops.fullname" . }}-promtail + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: promtail +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "incidentops.fullname" . }}-promtail + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: promtail +rules: + - apiGroups: [""] + resources: ["pods", "pods/log", "namespaces", "services", "endpoints", "nodes"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ include "incidentops.fullname" . }}-promtail + labels: + {{- include "incidentops.labels" . 
| nindent 4 }} + app.kubernetes.io/component: promtail +subjects: + - kind: ServiceAccount + name: {{ include "incidentops.fullname" . }}-promtail + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ include "incidentops.fullname" . }}-promtail +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: {{ include "incidentops.fullname" . }}-promtail + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: promtail +spec: + selector: + matchLabels: + {{- include "incidentops.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: promtail + template: + metadata: + labels: + {{- include "incidentops.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: promtail + annotations: + checksum/config: {{ .Values.observability.promtail.image.tag | sha256sum }} + spec: + serviceAccountName: {{ include "incidentops.fullname" . }}-promtail + securityContext: + runAsUser: 0 + containers: + - name: promtail + image: "{{ .Values.observability.promtail.image.repository }}:{{ .Values.observability.promtail.image.tag }}" + imagePullPolicy: {{ .Values.observability.promtail.image.pullPolicy }} + args: + - -config.file=/etc/promtail/promtail.yaml + ports: + - name: http-metrics + containerPort: 3101 + protocol: TCP + volumeMounts: + - name: config + mountPath: /etc/promtail + - name: positions + mountPath: /run/promtail + - name: varlog + mountPath: /var/log + readOnly: true + - name: varlogpods + mountPath: /var/log/pods + readOnly: true + - name: varlogcontainers + mountPath: /var/log/containers + readOnly: true + resources: + {{- toYaml .Values.observability.promtail.resources | nindent 12 }} + volumes: + - name: config + configMap: + name: {{ include "incidentops.fullname" . }}-promtail-config + - name: positions + emptyDir: {} + - name: varlog + hostPath: + path: /var/log + - name: varlogpods + hostPath: + path: /var/log/pods + - name: varlogcontainers + hostPath: + path: /var/log/containers +{{- end }} diff --git a/helm/incidentops/templates/tempo-deployment.yaml b/helm/incidentops/templates/tempo-deployment.yaml new file mode 100644 index 0000000..18def60 --- /dev/null +++ b/helm/incidentops/templates/tempo-deployment.yaml @@ -0,0 +1,153 @@ +{{- if .Values.observability.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "incidentops.fullname" . }}-tempo-config + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: tempo +data: + tempo.yaml: | + server: + http_listen_port: 3200 + + distributor: + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + + ingester: + trace_idle_period: 10s + max_block_bytes: 1048576 + max_block_duration: 5m + + compactor: + compaction: + block_retention: {{ .Values.observability.tempo.retention }} + + storage: + trace: + backend: local + local: + path: /var/tempo/traces + wal: + path: /var/tempo/wal + + querier: + search: + query_timeout: 30s +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "incidentops.fullname" . }}-tempo + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: tempo +spec: + replicas: 1 + selector: + matchLabels: + {{- include "incidentops.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: tempo + template: + metadata: + labels: + {{- include "incidentops.selectorLabels" . 
| nindent 8 }} + app.kubernetes.io/component: tempo + annotations: + checksum/config: {{ .Values.observability.tempo.image.tag | sha256sum }} + spec: + containers: + - name: tempo + image: "{{ .Values.observability.tempo.image.repository }}:{{ .Values.observability.tempo.image.tag }}" + imagePullPolicy: {{ .Values.observability.tempo.image.pullPolicy }} + args: + - -config.file=/etc/tempo/tempo.yaml + ports: + - name: http + containerPort: 3200 + protocol: TCP + - name: otlp-grpc + containerPort: 4317 + protocol: TCP + - name: otlp-http + containerPort: 4318 + protocol: TCP + volumeMounts: + - name: config + mountPath: /etc/tempo + - name: data + mountPath: /var/tempo + resources: + {{- toYaml .Values.observability.tempo.resources | nindent 12 }} + readinessProbe: + httpGet: + path: /ready + port: http + initialDelaySeconds: 10 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /ready + port: http + initialDelaySeconds: 30 + periodSeconds: 30 + volumes: + - name: config + configMap: + name: {{ include "incidentops.fullname" . }}-tempo-config + - name: data + {{- if .Values.observability.tempo.persistence.enabled }} + persistentVolumeClaim: + claimName: {{ include "incidentops.fullname" . }}-tempo + {{- else }} + emptyDir: {} + {{- end }} +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "incidentops.fullname" . }}-tempo + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: tempo +spec: + type: ClusterIP + ports: + - name: http + port: 3200 + targetPort: http + protocol: TCP + - name: otlp-grpc + port: 4317 + targetPort: otlp-grpc + protocol: TCP + - name: otlp-http + port: 4318 + targetPort: otlp-http + protocol: TCP + selector: + {{- include "incidentops.selectorLabels" . | nindent 4 }} + app.kubernetes.io/component: tempo +{{- if .Values.observability.tempo.persistence.enabled }} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ include "incidentops.fullname" . }}-tempo + labels: + {{- include "incidentops.labels" . | nindent 4 }} + app.kubernetes.io/component: tempo +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Values.observability.tempo.persistence.size }} +{{- end }} +{{- end }} diff --git a/helm/incidentops/templates/worker-deployment.yaml b/helm/incidentops/templates/worker-deployment.yaml index 9ad4950..eb9d21a 100644 --- a/helm/incidentops/templates/worker-deployment.yaml +++ b/helm/incidentops/templates/worker-deployment.yaml @@ -29,6 +29,29 @@ spec: serviceAccountName: {{ include "incidentops.serviceAccountName" . }} securityContext: {{- toYaml .Values.podSecurityContext | nindent 8 }} + initContainers: + - name: wait-for-postgres + image: busybox:1.36 + command: + - sh + - -c + - | + until nc -z {{ include "incidentops.fullname" . }}-postgresql 5432; do + echo "Waiting for PostgreSQL..." + sleep 2 + done + echo "PostgreSQL is ready" + - name: wait-for-redis + image: busybox:1.36 + command: + - sh + - -c + - | + until nc -z {{ include "incidentops.fullname" . }}-redis 6379; do + echo "Waiting for Redis..." + sleep 2 + done + echo "Redis is ready" containers: - name: worker securityContext: @@ -36,6 +59,8 @@ spec: image: {{ include "incidentops.worker.image" . 
}} imagePullPolicy: {{ .Values.worker.image.pullPolicy }} command: + - uv + - run - celery - -A - worker.celery_app @@ -52,6 +77,8 @@ spec: livenessProbe: exec: command: + - uv + - run - celery - -A - worker.celery_app diff --git a/helm/incidentops/values-production.yaml b/helm/incidentops/values-production.yaml index 11be1f4..d9fc3b3 100644 --- a/helm/incidentops/values-production.yaml +++ b/helm/incidentops/values-production.yaml @@ -80,3 +80,63 @@ redis: limits: cpu: 1000m memory: 1Gi + +# Application configuration +config: + environment: production + logLevel: INFO + +# Observability Stack - Production settings +observability: + enabled: true + + otelCollector: + replicaCount: 2 + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + + tempo: + retention: "720h" # 30 days + persistence: + enabled: true + size: 50Gi + resources: + requests: + cpu: 250m + memory: 512Mi + limits: + cpu: 1000m + memory: 2Gi + + loki: + retention: "720h" # 30 days + persistence: + enabled: true + size: 100Gi + resources: + requests: + cpu: 250m + memory: 512Mi + limits: + cpu: 1000m + memory: 2Gi + + grafana: + adminPassword: "" # Set via external secret in production + service: + type: ClusterIP + persistence: + enabled: true + size: 5Gi + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi diff --git a/helm/incidentops/values.yaml b/helm/incidentops/values.yaml index d883393..fc12315 100644 --- a/helm/incidentops/values.yaml +++ b/helm/incidentops/values.yaml @@ -106,6 +106,8 @@ config: jwtAlgorithm: HS256 accessTokenExpireMinutes: 30 refreshTokenExpireDays: 30 + environment: development + logLevel: INFO # Secrets (use external secrets in production) secrets: @@ -161,3 +163,117 @@ podSecurityContext: securityContext: runAsNonRoot: true runAsUser: 1000 + +# Observability Stack (Grafana + Loki + Tempo + OpenTelemetry Collector) +observability: + enabled: true + + otelCollector: + replicaCount: 1 + image: + repository: otel/opentelemetry-collector-contrib + tag: "0.96.0" + pullPolicy: IfNotPresent + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 200m + memory: 256Mi + + tempo: + image: + repository: grafana/tempo + tag: "2.4.1" + pullPolicy: IfNotPresent + retention: "168h" # 7 days + persistence: + enabled: false + size: 10Gi + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 500m + memory: 512Mi + + loki: + image: + repository: grafana/loki + tag: "2.9.6" + pullPolicy: IfNotPresent + retention: "168h" # 7 days + persistence: + enabled: false + size: 10Gi + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 500m + memory: 512Mi + + prometheus: + image: + repository: prom/prometheus + tag: "v2.51.0" + pullPolicy: IfNotPresent + retention: "15d" + scrapeInterval: "15s" + persistence: + enabled: false + size: 10Gi + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 500m + memory: 512Mi + + grafana: + image: + repository: grafana/grafana + tag: "10.4.1" + pullPolicy: IfNotPresent + adminUser: admin + adminPassword: "admin" # Change in production! 
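+    # values-production.yaml overrides this with "" so the password can be supplied externally,
+    # e.g. via --set observability.grafana.adminPassword=... or an external secret at deploy time.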
+ service: + type: ClusterIP + ingress: + enabled: false + host: grafana.incidentops.local + annotations: {} + tls: [] + persistence: + enabled: false + size: 1Gi + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 200m + memory: 256Mi + + promtail: + enabled: true + image: + repository: grafana/promtail + tag: "2.9.6" + pullPolicy: IfNotPresent + resources: + requests: + cpu: 25m + memory: 64Mi + limits: + cpu: 200m + memory: 256Mi + +# Metrics configuration +metrics: + enabled: true + port: 9464 diff --git a/observability/grafana/dashboards/api-overview.json b/observability/grafana/dashboards/api-overview.json new file mode 100644 index 0000000..842615a --- /dev/null +++ b/observability/grafana/dashboards/api-overview.json @@ -0,0 +1,294 @@ +{ + "title": "IncidentOps API Overview", + "uid": "incidentops-api", + "tags": ["incidentops", "api"], + "timezone": "browser", + "editable": true, + "panels": [ + { + "id": 1, + "title": "Request Rate", + "type": "timeseries", + "gridPos": {"h": 8, "w": 8, "x": 0, "y": 0}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum(rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m]))", + "legendFormat": "Requests/sec", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "unit": "reqps" + } + } + }, + { + "id": 2, + "title": "Request Duration (p50, p95, p99)", + "type": "timeseries", + "gridPos": {"h": 8, "w": 8, "x": 8, "y": 0}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "histogram_quantile(0.50, sum(rate(http_server_request_duration_seconds_bucket{job=\"incidentops-api\"}[5m])) by (le))", + "legendFormat": "p50", + "refId": "A" + }, + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "histogram_quantile(0.95, sum(rate(http_server_request_duration_seconds_bucket{job=\"incidentops-api\"}[5m])) by (le))", + "legendFormat": "p95", + "refId": "B" + }, + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "histogram_quantile(0.99, sum(rate(http_server_request_duration_seconds_bucket{job=\"incidentops-api\"}[5m])) by (le))", + "legendFormat": "p99", + "refId": "C" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "unit": "s" + } + } + }, + { + "id": 3, + "title": "Error Rate", + "type": "timeseries", + "gridPos": {"h": 8, "w": 8, "x": 16, "y": 0}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum(rate(http_server_request_duration_seconds_count{job=\"incidentops-api\", http_status_code=~\"5..\"}[1m])) / sum(rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m])) * 100", + "legendFormat": "Error %", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"fixedColor": "red", "mode": "fixed"}, + "unit": "percent", + "min": 0, + "max": 100 + } + } + }, + { + "id": 4, + "title": "Requests by Status Code", + "type": "timeseries", + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum by (http_status_code) (rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m]))", + "legendFormat": "{{http_status_code}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "unit": "reqps" + } + } + }, + { + "id": 5, + "title": "Requests by Endpoint", + "type": "timeseries", + "gridPos": 
{"h": 8, "w": 12, "x": 12, "y": 8}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum by (http_route) (rate(http_server_request_duration_seconds_count{job=\"incidentops-api\"}[1m]))", + "legendFormat": "{{http_route}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "unit": "reqps" + } + } + }, + { + "id": 6, + "title": "System CPU Usage", + "type": "gauge", + "gridPos": {"h": 6, "w": 6, "x": 0, "y": 16}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "avg(system_cpu_utilization{job=\"incidentops-api\"}) * 100", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "thresholds": { + "mode": "absolute", + "steps": [ + {"color": "green", "value": null}, + {"color": "yellow", "value": 60}, + {"color": "red", "value": 80} + ] + }, + "unit": "percent", + "min": 0, + "max": 100 + } + } + }, + { + "id": 7, + "title": "Memory Usage", + "type": "gauge", + "gridPos": {"h": 6, "w": 6, "x": 6, "y": 16}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "process_runtime_cpython_memory_bytes{job=\"incidentops-api\", type=\"rss\"} / 1024 / 1024", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "thresholds": { + "mode": "absolute", + "steps": [ + {"color": "green", "value": null}, + {"color": "yellow", "value": 256}, + {"color": "red", "value": 512} + ] + }, + "unit": "decmbytes" + } + } + }, + { + "id": 8, + "title": "Active Threads", + "type": "stat", + "gridPos": {"h": 6, "w": 6, "x": 12, "y": 16}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "process_runtime_cpython_thread_count{job=\"incidentops-api\"}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "thresholds": { + "mode": "absolute", + "steps": [ + {"color": "green", "value": null}, + {"color": "yellow", "value": 50}, + {"color": "red", "value": 100} + ] + } + } + } + }, + { + "id": 9, + "title": "GC Collections", + "type": "stat", + "gridPos": {"h": 6, "w": 6, "x": 18, "y": 16}, + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "prometheus"}, + "expr": "sum(rate(process_runtime_cpython_gc_count{job=\"incidentops-api\"}[5m]))", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "thresholds": { + "mode": "absolute", + "steps": [ + {"color": "green", "value": null} + ] + }, + "unit": "cps" + } + } + }, + { + "id": 10, + "title": "Recent Logs", + "type": "logs", + "gridPos": {"h": 10, "w": 24, "x": 0, "y": 22}, + "targets": [ + { + "datasource": {"type": "loki", "uid": "loki"}, + "expr": "{service_name=\"incidentops-api\"} | json", + "refId": "A" + } + ], + "options": { + "showTime": true, + "showLabels": true, + "wrapLogMessage": true, + "enableLogDetails": true, + "sortOrder": "Descending" + } + }, + { + "id": 11, + "title": "Error Logs", + "type": "logs", + "gridPos": {"h": 8, "w": 24, "x": 0, "y": 32}, + "targets": [ + { + "datasource": {"type": "loki", "uid": "loki"}, + "expr": "{service_name=\"incidentops-api\"} |= \"ERROR\" | json", + "refId": "A" + } + ], + "options": { + "showTime": true, + "showLabels": true, + "wrapLogMessage": true, + "enableLogDetails": true, + "sortOrder": "Descending" + } + }, + { + "id": 12, + "title": "Recent Traces", + "type": "traces", + "gridPos": {"h": 10, "w": 24, "x": 0, "y": 40}, + 
"targets": [ + { + "datasource": {"type": "tempo", "uid": "tempo"}, + "queryType": "traceqlSearch", + "filters": [ + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": ["incidentops-api"] + } + ], + "refId": "A" + } + ] + } + ], + "schemaVersion": 38, + "version": 2 +} diff --git a/observability/grafana/provisioning/dashboards/dashboards.yaml b/observability/grafana/provisioning/dashboards/dashboards.yaml new file mode 100644 index 0000000..e36f2f7 --- /dev/null +++ b/observability/grafana/provisioning/dashboards/dashboards.yaml @@ -0,0 +1,12 @@ +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: 'IncidentOps' + folderUid: 'incidentops' + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards diff --git a/observability/grafana/provisioning/datasources/datasources.yaml b/observability/grafana/provisioning/datasources/datasources.yaml new file mode 100644 index 0000000..df253d0 --- /dev/null +++ b/observability/grafana/provisioning/datasources/datasources.yaml @@ -0,0 +1,48 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + uid: prometheus + url: http://prometheus:9090 + access: proxy + isDefault: false + jsonData: + httpMethod: POST + exemplarTraceIdDestinations: + - name: trace_id + datasourceUid: tempo + + - name: Tempo + type: tempo + uid: tempo + url: http://tempo:3200 + access: proxy + isDefault: false + jsonData: + tracesToLogsV2: + datasourceUid: loki + spanStartTimeShift: '-1h' + spanEndTimeShift: '1h' + filterByTraceID: true + filterBySpanID: true + tracesToMetrics: + datasourceUid: prometheus + nodeGraph: + enabled: true + lokiSearch: + datasourceUid: loki + + - name: Loki + type: loki + uid: loki + url: http://loki:3100 + access: proxy + isDefault: true + jsonData: + derivedFields: + - datasourceUid: tempo + matcherRegex: '"trace_id":"([a-f0-9]+)"' + name: TraceID + url: '$${__value.raw}' + urlDisplayLabel: 'View Trace' diff --git a/observability/loki/config.yaml b/observability/loki/config.yaml new file mode 100644 index 0000000..0dd8105 --- /dev/null +++ b/observability/loki/config.yaml @@ -0,0 +1,41 @@ +auth_enabled: false + +server: + http_listen_port: 3100 + grpc_listen_port: 9096 + +common: + path_prefix: /loki + storage: + filesystem: + chunks_directory: /loki/chunks + rules_directory: /loki/rules + replication_factor: 1 + ring: + kvstore: + store: inmemory + +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + max_size_mb: 100 + +schema_config: + configs: + - from: "2020-10-24" + store: tsdb + object_store: filesystem + schema: v13 + index: + prefix: index_ + period: 24h + +ruler: + alertmanager_url: http://localhost:9093 + +limits_config: + retention_period: 168h # 7 days + allow_structured_metadata: true + volume_enabled: true diff --git a/observability/otel-collector/config.yaml b/observability/otel-collector/config.yaml new file mode 100644 index 0000000..3e39d63 --- /dev/null +++ b/observability/otel-collector/config.yaml @@ -0,0 +1,38 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + timeout: 1s + send_batch_size: 1024 + memory_limiter: + check_interval: 1s + limit_mib: 256 + spike_limit_mib: 64 + +exporters: + otlp/tempo: + endpoint: tempo:4317 + tls: + insecure: true + loki: + endpoint: http://loki:3100/loki/api/v1/push + default_labels_enabled: + exporter: true + job: true + +service: + pipelines: + traces: + receivers: 
[otlp] + processors: [memory_limiter, batch] + exporters: [otlp/tempo] + logs: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [loki] diff --git a/observability/prometheus/prometheus.yml b/observability/prometheus/prometheus.yml new file mode 100644 index 0000000..de2c2d3 --- /dev/null +++ b/observability/prometheus/prometheus.yml @@ -0,0 +1,23 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + # Scrape Prometheus itself + - job_name: "prometheus" + static_configs: + - targets: ["localhost:9090"] + + # Scrape IncidentOps API metrics + - job_name: "incidentops-api" + static_configs: + - targets: ["api:9464"] + metrics_path: /metrics + scrape_interval: 10s + + # Scrape IncidentOps Worker metrics (when metrics are enabled) + - job_name: "incidentops-worker" + static_configs: + - targets: ["worker:9464"] + metrics_path: /metrics + scrape_interval: 10s diff --git a/observability/tempo/config.yaml b/observability/tempo/config.yaml new file mode 100644 index 0000000..0c6e4e0 --- /dev/null +++ b/observability/tempo/config.yaml @@ -0,0 +1,32 @@ +server: + http_listen_port: 3200 + +distributor: + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +ingester: + trace_idle_period: 10s + max_block_bytes: 1048576 + max_block_duration: 5m + +compactor: + compaction: + block_retention: 168h # 7 days + +storage: + trace: + backend: local + local: + path: /var/tempo/traces + wal: + path: /var/tempo/wal + +querier: + search: + query_timeout: 30s diff --git a/pyproject.toml b/pyproject.toml index 6f423f6..b8d5a88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,18 @@ dependencies = [ "celery[redis]>=5.4.0", "redis>=5.0.0", "httpx>=0.28.0", + # OpenTelemetry + "opentelemetry-api>=1.27.0", + "opentelemetry-sdk>=1.27.0", + "opentelemetry-exporter-otlp>=1.27.0", + "opentelemetry-exporter-prometheus>=0.48b0", + "opentelemetry-instrumentation-fastapi>=0.48b0", + "opentelemetry-instrumentation-asyncpg>=0.48b0", + "opentelemetry-instrumentation-httpx>=0.48b0", + "opentelemetry-instrumentation-redis>=0.48b0", + "opentelemetry-instrumentation-logging>=0.48b0", + "opentelemetry-instrumentation-system-metrics>=0.48b0", + "prometheus-client>=0.20.0", ] [project.optional-dependencies] diff --git a/skaffold.yaml b/skaffold.yaml index bcb9c51..751f4cc 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -27,14 +27,15 @@ build: - src: "worker/**/*.py" dest: /app - - image: incidentops/web - docker: - dockerfile: Dockerfile.web - context: . - sync: - manual: - - src: "web/src/**/*" - dest: /app + # Web frontend disabled until implemented + # - image: incidentops/web + # docker: + # dockerfile: Dockerfile.web + # context: . 
+ # sync: + # manual: + # - src: "web/src/**/*" + # dest: /app local: push: false @@ -48,12 +49,15 @@ deploy: valuesFiles: - helm/incidentops/values.yaml setValues: - api.image.repository: incidentops/api - api.image.tag: "" - worker.image.repository: incidentops/worker - worker.image.tag: "" - web.image.repository: incidentops/web - web.image.tag: "" + web.replicaCount: 0 # Disabled until frontend is implemented + migration.enabled: true + setValueTemplates: + api.image.repository: "{{.IMAGE_REPO_incidentops_api}}" + api.image.tag: "{{.IMAGE_TAG_incidentops_api}}" + worker.image.repository: "{{.IMAGE_REPO_incidentops_worker}}" + worker.image.tag: "{{.IMAGE_TAG_incidentops_worker}}" + migration.image.repository: "{{.IMAGE_REPO_incidentops_api}}" + migration.image.tag: "{{.IMAGE_TAG_incidentops_api}}" createNamespace: true namespace: incidentops @@ -74,13 +78,15 @@ profiles: setValues: api.replicaCount: 1 worker.replicaCount: 1 - web.replicaCount: 1 - api.image.repository: incidentops/api - api.image.tag: "" - worker.image.repository: incidentops/worker - worker.image.tag: "" - web.image.repository: incidentops/web - web.image.tag: "" + web.replicaCount: 0 # Disabled until frontend is implemented + migration.enabled: true + setValueTemplates: + api.image.repository: "{{.IMAGE_REPO_incidentops_api}}" + api.image.tag: "{{.IMAGE_TAG_incidentops_api}}" + worker.image.repository: "{{.IMAGE_REPO_incidentops_worker}}" + worker.image.tag: "{{.IMAGE_TAG_incidentops_worker}}" + migration.image.repository: "{{.IMAGE_REPO_incidentops_api}}" + migration.image.tag: "{{.IMAGE_TAG_incidentops_api}}" createNamespace: true namespace: incidentops @@ -115,8 +121,30 @@ portForward: namespace: incidentops port: 8000 localPort: 8000 + # Web frontend disabled until implemented + # - resourceType: service + # resourceName: incidentops-web + # namespace: incidentops + # port: 3000 + # localPort: 3000 + # Observability - resourceType: service - resourceName: incidentops-web + resourceName: incidentops-grafana namespace: incidentops - port: 3000 - localPort: 3000 + port: 80 + localPort: 3001 + - resourceType: service + resourceName: incidentops-prometheus + namespace: incidentops + port: 9090 + localPort: 9090 + - resourceType: service + resourceName: incidentops-tempo + namespace: incidentops + port: 3200 + localPort: 3200 + - resourceType: service + resourceName: incidentops-loki + namespace: incidentops + port: 3100 + localPort: 3100 diff --git a/tests/conftest.py b/tests/conftest.py index 44b23e8..aa99d77 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,7 @@ from __future__ import annotations import os from contextlib import asynccontextmanager -from typing import AsyncGenerator, Callable +from typing import AsyncGenerator, Callable, Generator from uuid import UUID, uuid4 import asyncpg @@ -15,8 +15,11 @@ import pytest os.environ.setdefault("DATABASE_URL", "postgresql://incidentops:incidentops@localhost:5432/incidentops_test") os.environ.setdefault("JWT_SECRET_KEY", "test-secret-key-for-testing-only") os.environ.setdefault("REDIS_URL", "redis://localhost:6379/1") +os.environ.setdefault("TASK_QUEUE_DRIVER", "inmemory") +os.environ.setdefault("TASK_QUEUE_BROKER_URL", "redis://localhost:6379/2") from app.main import app +from app.taskqueue import task_queue # Module-level setup: create database and run migrations once @@ -163,3 +166,14 @@ async def db_admin(clean_database: None) -> AsyncGenerator[asyncpg.Connection, N yield conn finally: await conn.close() + + +@pytest.fixture(autouse=True) +def 
reset_task_queue() -> Generator[None, None, None]: + """Ensure in-memory task queue state is cleared between tests.""" + + if hasattr(task_queue, "reset"): + task_queue.reset() + yield + if hasattr(task_queue, "reset"): + task_queue.reset() diff --git a/tests/services/test_incident_service.py b/tests/services/test_incident_service.py index bcfd9a9..7e1e704 100644 --- a/tests/services/test_incident_service.py +++ b/tests/services/test_incident_service.py @@ -14,6 +14,7 @@ from app.core import exceptions as exc, security from app.db import Database from app.schemas.incident import CommentRequest, IncidentCreate, TransitionRequest from app.services.incident import IncidentService +from app.taskqueue import InMemoryTaskQueue pytestmark = pytest.mark.asyncio @@ -43,10 +44,24 @@ class _SingleConnectionDatabase(Database): @pytest.fixture -async def incident_service(db_conn: asyncpg.Connection): +def incident_task_queue() -> InMemoryTaskQueue: + """In-memory task queue used to assert dispatch behavior.""" + + return InMemoryTaskQueue() + + +@pytest.fixture +async def incident_service( + db_conn: asyncpg.Connection, + incident_task_queue: InMemoryTaskQueue, +): """IncidentService bound to the per-test database connection.""" - return IncidentService(database=_SingleConnectionDatabase(db_conn)) + return IncidentService( + database=_SingleConnectionDatabase(db_conn), + task_queue=incident_task_queue, + escalation_delay_seconds=60, + ) async def _seed_user_org_service(conn: asyncpg.Connection) -> tuple[CurrentUser, UUID]: @@ -94,7 +109,9 @@ async def _seed_user_org_service(conn: asyncpg.Connection) -> tuple[CurrentUser, async def test_create_incident_persists_and_records_event( - incident_service: IncidentService, db_conn: asyncpg.Connection + incident_service: IncidentService, + db_conn: asyncpg.Connection, + incident_task_queue: InMemoryTaskQueue, ) -> None: current_user, service_id = await _seed_user_org_service(db_conn) @@ -121,6 +138,12 @@ async def test_create_incident_persists_and_records_event( assert event["event_type"] == "created" assert event["actor_user_id"] == current_user.user_id + assert incident_task_queue.dispatched is not None + assert len(incident_task_queue.dispatched) == 2 + first, second = incident_task_queue.dispatched + assert first[0] == "incident_triggered" + assert second[0] == "escalate_if_unacked" + async def test_get_incidents_paginates_by_created_at( incident_service: IncidentService, db_conn: asyncpg.Connection diff --git a/tests/worker/test_celery_tasks.py b/tests/worker/test_celery_tasks.py new file mode 100644 index 0000000..f2450cf --- /dev/null +++ b/tests/worker/test_celery_tasks.py @@ -0,0 +1,199 @@ +"""End-to-end Celery worker tests against the real Redis broker.""" + +from __future__ import annotations + +import asyncio +import inspect +from uuid import UUID, uuid4 + +import asyncpg +import pytest +import redis + +from app.config import settings +from app.repositories.incident import IncidentRepository +from app.taskqueue import CeleryTaskQueue +from celery.contrib.testing.worker import start_worker + +from worker.celery_app import celery_app + + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture(scope="module", autouse=True) +def ensure_redis_available() -> None: + """Skip the module if the configured Redis broker is unreachable.""" + + client = redis.Redis.from_url(settings.resolved_task_queue_broker_url) + try: + client.ping() + except redis.RedisError as exc: # pragma: no cover - diagnostic-only path + pytest.skip(f"Redis broker unavailable: {exc}") + 
finally: + client.close() + + +@pytest.fixture(scope="module") +def celery_worker_instance(ensure_redis_available: None): + """Run a real Celery worker connected to Redis for the duration of the module.""" + + queues = [settings.task_queue_default_queue, settings.task_queue_critical_queue] + with start_worker( + celery_app, + loglevel="INFO", + pool="solo", + concurrency=1, + queues=queues, + perform_ping_check=False, + ): + yield + + +@pytest.fixture(autouse=True) +def purge_celery_queues(): + """Clear any pending tasks before and after each test for isolation.""" + + celery_app.control.purge() + yield + celery_app.control.purge() + + +@pytest.fixture +def celery_queue() -> CeleryTaskQueue: + return CeleryTaskQueue( + default_queue=settings.task_queue_default_queue, + critical_queue=settings.task_queue_critical_queue, + ) + + +async def _seed_incident_with_target(conn: asyncpg.Connection) -> tuple[UUID, UUID]: + org_id = uuid4() + service_id = uuid4() + incident_id = uuid4() + target_id = uuid4() + + await conn.execute( + "INSERT INTO orgs (id, name, slug) VALUES ($1, $2, $3)", + org_id, + "Celery Org", + f"celery-{org_id.hex[:6]}", + ) + await conn.execute( + "INSERT INTO services (id, org_id, name, slug) VALUES ($1, $2, $3, $4)", + service_id, + org_id, + "API", + f"svc-{service_id.hex[:6]}", + ) + + repo = IncidentRepository(conn) + await repo.create( + incident_id=incident_id, + org_id=org_id, + service_id=service_id, + title="Latency spike", + description="", + severity="high", + ) + + await conn.execute( + """ + INSERT INTO notification_targets (id, org_id, name, target_type, webhook_url, enabled) + VALUES ($1, $2, $3, $4, $5, $6) + """, + target_id, + org_id, + "Primary Webhook", + "webhook", + "https://example.com/hook", + True, + ) + + return org_id, incident_id + + +async def _wait_until(predicate, timeout: float = 5.0, interval: float = 0.1) -> None: + deadline = asyncio.get_running_loop().time() + timeout + while True: + result = predicate() + if inspect.isawaitable(result): + result = await result + if result: + return + if asyncio.get_running_loop().time() >= deadline: + raise AssertionError("Timed out waiting for Celery worker to finish") + await asyncio.sleep(interval) + + +async def _attempt_sent(conn: asyncpg.Connection, incident_id: UUID) -> bool: + row = await conn.fetchrow( + "SELECT status FROM notification_attempts WHERE incident_id = $1", + incident_id, + ) + return bool(row and row["status"] == "sent") + + +async def _attempt_count(conn: asyncpg.Connection, incident_id: UUID) -> int: + count = await conn.fetchval( + "SELECT COUNT(*) FROM notification_attempts WHERE incident_id = $1", + incident_id, + ) + return int(count or 0) + + +async def _attempt_count_is(conn: asyncpg.Connection, incident_id: UUID, expected: int) -> bool: + return await _attempt_count(conn, incident_id) == expected + + +async def test_incident_triggered_task_marks_attempt_sent( + db_admin: asyncpg.Connection, + celery_worker_instance: None, + celery_queue: CeleryTaskQueue, +) -> None: + org_id, incident_id = await _seed_incident_with_target(db_admin) + + celery_queue.incident_triggered( + incident_id=incident_id, + org_id=org_id, + triggered_by=uuid4(), + ) + + await _wait_until(lambda: _attempt_sent(db_admin, incident_id)) + + +async def test_escalate_task_refires_when_incident_still_triggered( + db_admin: asyncpg.Connection, + celery_worker_instance: None, + celery_queue: CeleryTaskQueue, +) -> None: + org_id, incident_id = await _seed_incident_with_target(db_admin) + + 
celery_queue.schedule_escalation_check( + incident_id=incident_id, + org_id=org_id, + delay_seconds=0, + ) + + await _wait_until(lambda: _attempt_count_is(db_admin, incident_id, 1)) + + +async def test_escalate_task_skips_when_incident_acknowledged( + db_admin: asyncpg.Connection, + celery_worker_instance: None, + celery_queue: CeleryTaskQueue, +) -> None: + org_id, incident_id = await _seed_incident_with_target(db_admin) + await db_admin.execute( + "UPDATE incidents SET status = 'acknowledged' WHERE id = $1", + incident_id, + ) + + celery_queue.schedule_escalation_check( + incident_id=incident_id, + org_id=org_id, + delay_seconds=0, + ) + + await asyncio.sleep(1) + assert await _attempt_count(db_admin, incident_id) == 0 diff --git a/tests/worker/test_notifications.py b/tests/worker/test_notifications.py new file mode 100644 index 0000000..e59914e --- /dev/null +++ b/tests/worker/test_notifications.py @@ -0,0 +1,96 @@ +"""Tests for worker notification helpers.""" + +from __future__ import annotations + +from uuid import UUID, uuid4 + +import asyncpg +import pytest + +from app.repositories.incident import IncidentRepository +from worker.tasks.notifications import NotificationDispatch, prepare_notification_dispatches + + +pytestmark = pytest.mark.asyncio + + +async def _seed_incident(conn: asyncpg.Connection) -> tuple[UUID, UUID, UUID]: + org_id = uuid4() + service_id = uuid4() + incident_id = uuid4() + + await conn.execute( + "INSERT INTO orgs (id, name, slug) VALUES ($1, $2, $3)", + org_id, + "Notif Org", + "notif-org", + ) + await conn.execute( + "INSERT INTO services (id, org_id, name, slug) VALUES ($1, $2, $3, $4)", + service_id, + org_id, + "API", + "api", + ) + + repo = IncidentRepository(conn) + await repo.create( + incident_id=incident_id, + org_id=org_id, + service_id=service_id, + title="Outage", + description="", + severity="high", + ) + + return org_id, service_id, incident_id + + +async def test_prepare_notification_dispatches_creates_attempts(db_conn: asyncpg.Connection) -> None: + org_id, _service_id, incident_id = await _seed_incident(db_conn) + + target_id = uuid4() + await db_conn.execute( + """ + INSERT INTO notification_targets (id, org_id, name, target_type, enabled) + VALUES ($1, $2, $3, $4, $5) + """, + target_id, + org_id, + "Primary Webhook", + "webhook", + True, + ) + + dispatches = await prepare_notification_dispatches(db_conn, incident_id=incident_id, org_id=org_id) + + assert len(dispatches) == 1 + dispatch = dispatches[0] + assert isinstance(dispatch, NotificationDispatch) + assert dispatch.target["name"] == "Primary Webhook" + + attempt = await db_conn.fetchrow( + "SELECT status FROM notification_attempts WHERE id = $1", + dispatch.attempt_id, + ) + assert attempt is not None and attempt["status"] == "pending" + + +async def test_prepare_notification_dispatches_skips_disabled_targets(db_conn: asyncpg.Connection) -> None: + org_id, _service_id, incident_id = await _seed_incident(db_conn) + + await db_conn.execute( + """ + INSERT INTO notification_targets (id, org_id, name, target_type, enabled) + VALUES ($1, $2, $3, $4, $5) + """, + uuid4(), + org_id, + "Disabled", + "email", + False, + ) + + dispatches = await prepare_notification_dispatches(db_conn, incident_id=incident_id, org_id=org_id) + + assert dispatches == [] diff --git a/worker/__init__.py b/worker/__init__.py new file mode 100644 index 0000000..175c838 --- /dev/null +++ b/worker/__init__.py @@ -0,0 +1,3 @@ +"""Celery worker package for IncidentOps.""" + +__all__ = ["celery_app"] diff --git 
a/worker/celery_app.py b/worker/celery_app.py new file mode 100644 index 0000000..3b24bf3 --- /dev/null +++ b/worker/celery_app.py @@ -0,0 +1,43 @@ +"""Celery application configured for IncidentOps.""" + +from __future__ import annotations + +from celery import Celery +from kombu import Queue + +from app.config import settings + + +celery_app = Celery("incidentops") + + +celery_app.conf.update( + broker_url=settings.resolved_task_queue_broker_url, + task_default_queue=settings.task_queue_default_queue, + task_queues=( + Queue(settings.task_queue_default_queue), + Queue(settings.task_queue_critical_queue), + ), + task_routes={ + "worker.tasks.notifications.escalate_if_unacked": { + "queue": settings.task_queue_critical_queue + }, + }, + task_serializer="json", + accept_content=["json"], + timezone="UTC", + enable_utc=True, +) + +if settings.task_queue_backend == "sqs": + celery_app.conf.broker_transport_options = { + "region": settings.aws_region or "us-east-1", + "visibility_timeout": settings.task_queue_visibility_timeout, + "polling_interval": settings.task_queue_polling_interval, + } + + +celery_app.autodiscover_tasks(["worker.tasks"]) + + +__all__ = ["celery_app"] diff --git a/worker/tasks/__init__.py b/worker/tasks/__init__.py new file mode 100644 index 0000000..ad2b18e --- /dev/null +++ b/worker/tasks/__init__.py @@ -0,0 +1,5 @@ +"""Celery task definitions for IncidentOps.""" + +from worker.tasks import notifications + +__all__ = ["notifications"] diff --git a/worker/tasks/notifications.py b/worker/tasks/notifications.py new file mode 100644 index 0000000..be1555c --- /dev/null +++ b/worker/tasks/notifications.py @@ -0,0 +1,225 @@ +"""Notification-related Celery tasks and helpers.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Any +from uuid import UUID, uuid4 + +import asyncpg +from celery import shared_task +from celery.utils.log import get_task_logger + +from app.config import settings +from app.repositories.incident import IncidentRepository +from app.repositories.notification import NotificationRepository + +logger = get_task_logger(__name__) + + +@dataclass +class NotificationDispatch: + """Represents a pending notification attempt for a target.""" + + attempt_id: UUID + incident_id: UUID + target: dict[str, Any] + + +def _serialize_target(target: dict[str, Any]) -> dict[str, Any]: + serialized: dict[str, Any] = {} + for key, value in target.items(): + if isinstance(value, UUID): + serialized[key] = str(value) + else: + serialized[key] = value + return serialized + + +async def prepare_notification_dispatches( + conn: asyncpg.Connection, + *, + incident_id: UUID, + org_id: UUID, +) -> list[NotificationDispatch]: + """Create notification attempts for all enabled targets in the org.""" + + notification_repo = NotificationRepository(conn) + targets = await notification_repo.get_targets_by_org(org_id, enabled_only=True) + dispatches: list[NotificationDispatch] = [] + + for target in targets: + attempt = await notification_repo.create_attempt(uuid4(), incident_id, target["id"]) + dispatches.append( + NotificationDispatch( + attempt_id=attempt["id"], + incident_id=attempt["incident_id"], + target=_serialize_target(target), + ) + ) + + return dispatches + + +async def _prepare_dispatches_with_new_connection( + incident_id: UUID, + org_id: UUID, +) -> list[NotificationDispatch]: + conn = await asyncpg.connect(settings.database_url) + try: + return await 
prepare_notification_dispatches(conn, incident_id=incident_id, org_id=org_id) + finally: + await conn.close() + + +async def _mark_attempt_success(attempt_id: UUID) -> None: + conn = await asyncpg.connect(settings.database_url) + try: + repo = NotificationRepository(conn) + await repo.update_attempt_success(attempt_id, datetime.now(UTC)) + finally: + await conn.close() + + +async def _mark_attempt_failure(attempt_id: UUID, error: str) -> None: + conn = await asyncpg.connect(settings.database_url) + try: + repo = NotificationRepository(conn) + await repo.update_attempt_failure(attempt_id, error) + finally: + await conn.close() + + +async def _should_escalate(incident_id: UUID) -> bool: + conn = await asyncpg.connect(settings.database_url) + try: + repo = IncidentRepository(conn) + incident = await repo.get_by_id(incident_id) + if incident is None: + return False + return incident["status"] == "triggered" + finally: + await conn.close() + + +def _simulate_delivery(channel: str, target: dict[str, Any], incident_id: str) -> None: + target_name = target.get("name") or target.get("id") + logger.info("Simulated %s delivery for incident %s to %s", channel, incident_id, target_name) + + +@shared_task(name="worker.tasks.notifications.incident_triggered", bind=True) +def incident_triggered( + self, + *, + incident_id: str, + org_id: str, + triggered_by: str | None = None, +) -> None: + """Fan-out notifications to all active targets for the incident's org.""" + + incident_uuid = UUID(incident_id) + org_uuid = UUID(org_id) + try: + dispatches = asyncio.run(_prepare_dispatches_with_new_connection(incident_uuid, org_uuid)) + except Exception as exc: # pragma: no cover - logged for observability + logger.exception("Failed to prepare notification dispatches: %s", exc) + raise + + if not dispatches: + logger.info("No notification targets for org %s", org_id) + return + + for dispatch in dispatches: + target_type = dispatch.target.get("target_type") + kwargs = { + "attempt_id": str(dispatch.attempt_id), + "incident_id": incident_id, + "target": dispatch.target, + } + if target_type == "webhook": + send_webhook.apply_async(kwargs=kwargs, queue=settings.task_queue_default_queue) + elif target_type == "email": + send_email.apply_async(kwargs=kwargs, queue=settings.task_queue_default_queue) + elif target_type == "slack": + send_slack.apply_async(kwargs=kwargs, queue=settings.task_queue_default_queue) + else: + logger.warning("Unsupported notification target type: %s", target_type) + + +@shared_task( + name="worker.tasks.notifications.send_webhook", + bind=True, + autoretry_for=(Exception,), + retry_backoff=True, + retry_kwargs={"max_retries": 3}, +) +def send_webhook(self, *, attempt_id: str, target: dict[str, Any], incident_id: str) -> None: + """Simulate webhook delivery and mark the attempt status.""" + + try: + _simulate_delivery("webhook", target, incident_id) + asyncio.run(_mark_attempt_success(UUID(attempt_id))) + except Exception as exc: # pragma: no cover - logged for observability + logger.exception("Webhook delivery failed: %s", exc) + asyncio.run(_mark_attempt_failure(UUID(attempt_id), str(exc))) + raise + + +@shared_task(name="worker.tasks.notifications.send_email", bind=True) +def send_email(self, *, attempt_id: str, target: dict[str, Any], incident_id: str) -> None: + """Simulate email delivery for the notification attempt.""" + + try: + _simulate_delivery("email", target, incident_id) + asyncio.run(_mark_attempt_success(UUID(attempt_id))) + except Exception as exc: # pragma: no cover + 
logger.exception("Email delivery failed: %s", exc) + asyncio.run(_mark_attempt_failure(UUID(attempt_id), str(exc))) + raise + + +@shared_task(name="worker.tasks.notifications.send_slack", bind=True) +def send_slack(self, *, attempt_id: str, target: dict[str, Any], incident_id: str) -> None: + """Simulate Slack delivery for the notification attempt.""" + + try: + _simulate_delivery("slack", target, incident_id) + asyncio.run(_mark_attempt_success(UUID(attempt_id))) + except Exception as exc: # pragma: no cover + logger.exception("Slack delivery failed: %s", exc) + asyncio.run(_mark_attempt_failure(UUID(attempt_id), str(exc))) + raise + + +@shared_task(name="worker.tasks.notifications.escalate_if_unacked", bind=True) +def escalate_if_unacked(self, *, incident_id: str, org_id: str) -> None: + """Re-dispatch notifications if the incident remains unacknowledged.""" + + incident_uuid = UUID(incident_id) + should_escalate = asyncio.run(_should_escalate(incident_uuid)) + if not should_escalate: + logger.info("Incident %s no longer needs escalation", incident_id) + return + + logger.info("Incident %s still triggered; re-fanning notifications", incident_id) + incident_triggered.apply_async( # type: ignore[attr-defined] + kwargs={ + "incident_id": incident_id, + "org_id": org_id, + "triggered_by": None, + }, + queue=settings.task_queue_critical_queue, + ) + + +__all__ = [ + "NotificationDispatch", + "incident_triggered", + "escalate_if_unacked", + "prepare_notification_dispatches", + "send_email", + "send_slack", + "send_webhook", +]
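
For quick manual testing, the notification tasks above can also be enqueued directly with `apply_async`, assuming a worker and the configured broker are running and the usual environment variables are set. The IDs below are placeholders; in normal operation the API's task-queue adapter (`app.taskqueue`) performs this dispatch.

```
from uuid import UUID

from app.config import settings
from worker.celery_app import celery_app  # noqa: F401  (registers the configured Celery app)
from worker.tasks.notifications import escalate_if_unacked, incident_triggered

# Placeholder IDs for illustration only; use IDs of rows that actually exist.
incident_id = UUID("00000000-0000-0000-0000-000000000001")
org_id = UUID("00000000-0000-0000-0000-000000000002")

# Fan out notifications immediately on the default queue.
incident_triggered.apply_async(
    kwargs={"incident_id": str(incident_id), "org_id": str(org_id), "triggered_by": None},
    queue=settings.task_queue_default_queue,
)

# Re-check the incident after the configured escalation delay on the critical queue.
escalate_if_unacked.apply_async(
    kwargs={"incident_id": str(incident_id), "org_id": str(org_id)},
    countdown=settings.notification_escalation_delay_seconds,
    queue=settings.task_queue_critical_queue,
)
```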