"""Celery application configured for IncidentOps.""" from __future__ import annotations from celery import Celery from kombu import Queue from app.config import settings celery_app = Celery("incidentops") celery_app.conf.update( broker_url=settings.resolved_task_queue_broker_url, task_default_queue=settings.task_queue_default_queue, task_queues=( Queue(settings.task_queue_default_queue), Queue(settings.task_queue_critical_queue), ), task_routes={ "worker.tasks.notifications.escalate_if_unacked": { "queue": settings.task_queue_critical_queue }, }, task_serializer="json", accept_content=["json"], timezone="UTC", enable_utc=True, ) if settings.task_queue_backend == "sqs": celery_app.conf.broker_transport_options = { "region": settings.aws_region or "us-east-1", "visibility_timeout": settings.task_queue_visibility_timeout, "polling_interval": settings.task_queue_polling_interval, } celery_app.autodiscover_tasks(["worker.tasks"]) __all__ = ["celery_app"]