96 lines
2.7 KiB
Python
96 lines
2.7 KiB
Python
|
|
"""Shared pytest fixtures for all tests."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import os
|
||
|
|
from uuid import uuid4
|
||
|
|
|
||
|
|
import asyncpg
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
# Set test environment variables before importing app modules
|
||
|
|
os.environ.setdefault("DATABASE_URL", "postgresql://incidentops:incidentops@localhost:5432/incidentops_test")
|
||
|
|
os.environ.setdefault("JWT_SECRET_KEY", "test-secret-key-for-testing-only")
|
||
|
|
os.environ.setdefault("REDIS_URL", "redis://localhost:6379/1")
|
||
|
|
|
||
|
|
|
||
|
|
# Module-level setup: create database and run migrations once
|
||
|
|
_db_initialized = False
|
||
|
|
|
||
|
|
|
||
|
|
async def _init_test_db() -> None:
|
||
|
|
"""Initialize test database and run migrations (once per session)."""
|
||
|
|
global _db_initialized
|
||
|
|
if _db_initialized:
|
||
|
|
return
|
||
|
|
|
||
|
|
admin_dsn = os.environ["DATABASE_URL"].rsplit("/", 1)[0] + "/postgres"
|
||
|
|
test_db_name = "incidentops_test"
|
||
|
|
|
||
|
|
admin_conn = await asyncpg.connect(admin_dsn)
|
||
|
|
try:
|
||
|
|
# Terminate existing connections to the test database
|
||
|
|
await admin_conn.execute(f"""
|
||
|
|
SELECT pg_terminate_backend(pg_stat_activity.pid)
|
||
|
|
FROM pg_stat_activity
|
||
|
|
WHERE pg_stat_activity.datname = '{test_db_name}'
|
||
|
|
AND pid <> pg_backend_pid()
|
||
|
|
""")
|
||
|
|
# Drop and recreate test database
|
||
|
|
await admin_conn.execute(f"DROP DATABASE IF EXISTS {test_db_name}")
|
||
|
|
await admin_conn.execute(f"CREATE DATABASE {test_db_name}")
|
||
|
|
finally:
|
||
|
|
await admin_conn.close()
|
||
|
|
|
||
|
|
# Connect to test database and run migrations
|
||
|
|
test_dsn = os.environ["DATABASE_URL"]
|
||
|
|
conn = await asyncpg.connect(test_dsn)
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Run migrations
|
||
|
|
migrations_dir = os.path.join(os.path.dirname(__file__), "..", "migrations")
|
||
|
|
migration_files = sorted(
|
||
|
|
f for f in os.listdir(migrations_dir) if f.endswith(".sql")
|
||
|
|
)
|
||
|
|
|
||
|
|
for migration_file in migration_files:
|
||
|
|
migration_path = os.path.join(migrations_dir, migration_file)
|
||
|
|
with open(migration_path) as f:
|
||
|
|
sql = f.read()
|
||
|
|
await conn.execute(sql)
|
||
|
|
finally:
|
||
|
|
await conn.close()
|
||
|
|
|
||
|
|
_db_initialized = True
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
async def db_conn() -> asyncpg.Connection:
|
||
|
|
"""Get a database connection with transaction rollback for test isolation."""
|
||
|
|
await _init_test_db()
|
||
|
|
|
||
|
|
test_dsn = os.environ["DATABASE_URL"]
|
||
|
|
conn = await asyncpg.connect(test_dsn)
|
||
|
|
|
||
|
|
# Start a transaction that will be rolled back
|
||
|
|
tr = conn.transaction()
|
||
|
|
await tr.start()
|
||
|
|
|
||
|
|
try:
|
||
|
|
yield conn
|
||
|
|
finally:
|
||
|
|
await tr.rollback()
|
||
|
|
await conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def make_user_id() -> uuid4:
|
||
|
|
"""Factory for generating user IDs."""
|
||
|
|
return lambda: uuid4()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def make_org_id() -> uuid4:
|
||
|
|
"""Factory for generating org IDs."""
|
||
|
|
return lambda: uuid4()
|