feat(worker): implement background jobs for incidents

This commit is contained in:
2024-12-30 12:00:00 -05:00
parent 8ac4d814ee
commit 06db4231cf
3 changed files with 252 additions and 0 deletions

View File

@@ -0,0 +1,64 @@
using Hangfire;
using IncidentOps.Domain.Entities;
using IncidentOps.Domain.Enums;
using IncidentOps.Infrastructure.Data.Repositories;
using IncidentOps.Infrastructure.Jobs;
using Microsoft.Extensions.Logging;
namespace IncidentOps.Worker.Jobs;
public class EscalateIfUnackedJob : IEscalateIfUnackedJob
{
private readonly IIncidentEventRepository _incidentEventRepository;
private readonly IBackgroundJobClient _backgroundJobClient;
private readonly ILogger<EscalateIfUnackedJob> _logger;
public EscalateIfUnackedJob(
IIncidentEventRepository incidentEventRepository,
IBackgroundJobClient backgroundJobClient,
ILogger<EscalateIfUnackedJob> logger)
{
_incidentEventRepository = incidentEventRepository;
_backgroundJobClient = backgroundJobClient;
_logger = logger;
}
public async Task ExecuteAsync(Guid incidentId, int step)
{
_logger.LogInformation("Checking escalation for incident {IncidentId}, step {Step}", incidentId, step);
using var connection = new Npgsql.NpgsqlConnection(
Environment.GetEnvironmentVariable("ConnectionStrings__Postgres") ?? "");
var incident = await Dapper.SqlMapper.QuerySingleOrDefaultAsync<Incident>(
connection, "SELECT * FROM incidents WHERE id = @Id", new { Id = incidentId });
if (incident == null)
{
_logger.LogWarning("Incident {IncidentId} not found for escalation", incidentId);
return;
}
if (incident.Status != IncidentStatus.Triggered)
{
_logger.LogInformation("Incident {IncidentId} is no longer in Triggered state, skipping escalation",
incidentId);
return;
}
// Record escalation event
await _incidentEventRepository.CreateAsync(new IncidentEvent
{
Id = Guid.NewGuid(),
IncidentId = incidentId,
EventType = IncidentEventType.EscalationTriggered,
Payload = $"{{\"step\": {step}}}",
CreatedAt = DateTime.UtcNow
});
_logger.LogInformation("Escalation triggered for incident {IncidentId}, step {Step}", incidentId, step);
// TODO: Implement secondary notification targets or on-call escalation
// For now, just log the escalation
}
}

View File

@@ -0,0 +1,64 @@
using Hangfire;
using IncidentOps.Infrastructure.Data.Repositories;
using IncidentOps.Infrastructure.Jobs;
using Microsoft.Extensions.Logging;
namespace IncidentOps.Worker.Jobs;
public class IncidentTriggeredJob : IIncidentTriggeredJob
{
private readonly IIncidentRepository _incidentRepository;
private readonly INotificationTargetRepository _notificationTargetRepository;
private readonly IBackgroundJobClient _backgroundJobClient;
private readonly ILogger<IncidentTriggeredJob> _logger;
public IncidentTriggeredJob(
IIncidentRepository incidentRepository,
INotificationTargetRepository notificationTargetRepository,
IBackgroundJobClient backgroundJobClient,
ILogger<IncidentTriggeredJob> logger)
{
_incidentRepository = incidentRepository;
_notificationTargetRepository = notificationTargetRepository;
_backgroundJobClient = backgroundJobClient;
_logger = logger;
}
public async Task ExecuteAsync(Guid incidentId)
{
_logger.LogInformation("Processing incident triggered job for incident {IncidentId}", incidentId);
// We need to get the incident to find its org_id
// Since we don't have org context here, we'll need to query differently
// For now, we'll use a direct query approach
// In production, you might pass orgId as a job parameter
var targets = await GetTargetsForIncidentAsync(incidentId);
foreach (var target in targets)
{
_backgroundJobClient.Enqueue<ISendWebhookNotificationJob>(
j => j.ExecuteAsync(incidentId, target.Id));
}
_logger.LogInformation("Enqueued {Count} notification jobs for incident {IncidentId}", targets.Count, incidentId);
}
private async Task<IReadOnlyList<Domain.Entities.NotificationTarget>> GetTargetsForIncidentAsync(Guid incidentId)
{
// This is a simplified implementation
// In production, you'd want to pass the orgId with the job
// or query the incident first to get its org_id
using var connection = new Npgsql.NpgsqlConnection(
Environment.GetEnvironmentVariable("ConnectionStrings__Postgres") ?? "");
const string sql = @"
SELECT nt.* FROM notification_targets nt
INNER JOIN incidents i ON i.org_id = nt.org_id
WHERE i.id = @IncidentId AND nt.is_enabled = true";
var result = await Dapper.SqlMapper.QueryAsync<Domain.Entities.NotificationTarget>(
connection, sql, new { IncidentId = incidentId });
return result.ToList();
}
}

View File

@@ -0,0 +1,124 @@
using System.Text;
using System.Text.Json;
using IncidentOps.Domain.Entities;
using IncidentOps.Domain.Enums;
using IncidentOps.Infrastructure.Data.Repositories;
using IncidentOps.Infrastructure.Jobs;
using Microsoft.Extensions.Logging;
namespace IncidentOps.Worker.Jobs;
public class SendWebhookNotificationJob : ISendWebhookNotificationJob
{
private readonly IIncidentEventRepository _incidentEventRepository;
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<SendWebhookNotificationJob> _logger;
public SendWebhookNotificationJob(
IIncidentEventRepository incidentEventRepository,
IHttpClientFactory httpClientFactory,
ILogger<SendWebhookNotificationJob> logger)
{
_incidentEventRepository = incidentEventRepository;
_httpClientFactory = httpClientFactory;
_logger = logger;
}
public async Task ExecuteAsync(Guid incidentId, Guid targetId)
{
_logger.LogInformation("Sending webhook notification for incident {IncidentId} to target {TargetId}",
incidentId, targetId);
try
{
// Get incident and target details
var (incident, target) = await GetIncidentAndTargetAsync(incidentId, targetId);
if (incident == null || target == null)
{
_logger.LogWarning("Incident or target not found. IncidentId: {IncidentId}, TargetId: {TargetId}",
incidentId, targetId);
return;
}
// Parse webhook configuration
var config = JsonSerializer.Deserialize<WebhookConfig>(target.Configuration);
if (config?.Url == null)
{
_logger.LogError("Invalid webhook configuration for target {TargetId}", targetId);
await RecordNotificationEventAsync(incidentId, false, "Invalid webhook configuration");
return;
}
// Build payload
var payload = new
{
incidentId = incident.Id,
title = incident.Title,
description = incident.Description,
status = incident.Status.ToString(),
createdAt = incident.CreatedAt,
serviceId = incident.ServiceId
};
// Send webhook
var client = _httpClientFactory.CreateClient();
var content = new StringContent(
JsonSerializer.Serialize(payload),
Encoding.UTF8,
"application/json");
var response = await client.PostAsync(config.Url, content);
if (response.IsSuccessStatusCode)
{
_logger.LogInformation("Successfully sent webhook for incident {IncidentId}", incidentId);
await RecordNotificationEventAsync(incidentId, true, null);
}
else
{
var errorMessage = $"Webhook failed with status {response.StatusCode}";
_logger.LogWarning("Webhook failed for incident {IncidentId}: {Error}", incidentId, errorMessage);
await RecordNotificationEventAsync(incidentId, false, errorMessage);
throw new Exception(errorMessage); // Trigger Hangfire retry
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error sending webhook for incident {IncidentId}", incidentId);
await RecordNotificationEventAsync(incidentId, false, ex.Message);
throw; // Re-throw to trigger Hangfire retry
}
}
private async Task<(Incident?, NotificationTarget?)> GetIncidentAndTargetAsync(Guid incidentId, Guid targetId)
{
using var connection = new Npgsql.NpgsqlConnection(
Environment.GetEnvironmentVariable("ConnectionStrings__Postgres") ?? "");
var incident = await Dapper.SqlMapper.QuerySingleOrDefaultAsync<Incident>(
connection, "SELECT * FROM incidents WHERE id = @Id", new { Id = incidentId });
var target = await Dapper.SqlMapper.QuerySingleOrDefaultAsync<NotificationTarget>(
connection, "SELECT * FROM notification_targets WHERE id = @Id", new { Id = targetId });
return (incident, target);
}
private async Task RecordNotificationEventAsync(Guid incidentId, bool success, string? errorMessage)
{
var eventType = success ? IncidentEventType.NotificationSent : IncidentEventType.NotificationFailed;
var payload = success ? null : JsonSerializer.Serialize(new { error = errorMessage });
await _incidentEventRepository.CreateAsync(new IncidentEvent
{
Id = Guid.NewGuid(),
IncidentId = incidentId,
EventType = eventType,
ActorUserId = null,
Payload = payload,
CreatedAt = DateTime.UtcNow
});
}
private record WebhookConfig(string? Url, string? Secret);
}