Files
logs/internal/ingest/alert_outbox.go
2026-04-27 19:26:57 +08:00

126 lines
3.4 KiB
Go

package ingest
import (
"encoding/json"
"strings"
"time"
"git.apinb.com/ops/logs/internal/impl"
"git.apinb.com/ops/logs/internal/models"
)
const (
outboxStatusPending = "pending"
outboxStatusRetrying = "retrying"
outboxStatusSent = "sent"
outboxStatusDead = "dead"
)
func enqueueAlert(logEventID uint, body AlertReceiveBody) error {
payload, err := json.Marshal(body)
if err != nil {
return err
}
row := models.AlertOutbox{
LogEventID: logEventID,
PayloadJSON: string(payload),
Status: outboxStatusPending,
RetryCount: 0,
NextRetryAt: time.Now(),
LastError: "",
}
return impl.DBService.Create(&row).Error
}
func StartAlertDispatcher() {
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for range ticker.C {
processAlertOutboxBatch(20)
}
}()
}
func processAlertOutboxBatch(limit int) {
if limit <= 0 {
limit = 20
}
var rows []models.AlertOutbox
now := time.Now()
err := impl.DBService.
Where("status IN ? AND next_retry_at <= ?", []string{outboxStatusPending, outboxStatusRetrying}, now).
Order("id asc").
Limit(limit).
Find(&rows).Error
if err != nil || len(rows) == 0 {
return
}
for _, row := range rows {
processOneOutbox(row)
}
}
func processOneOutbox(row models.AlertOutbox) {
var body AlertReceiveBody
if err := json.Unmarshal([]byte(row.PayloadJSON), &body); err != nil {
markOutboxDead(row.ID, row.RetryCount, "invalid_payload: "+err.Error())
return
}
if err := forwardAlert(body); err != nil {
markOutboxRetry(row, err.Error())
return
}
_ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", row.ID).Updates(map[string]interface{}{
"status": outboxStatusSent,
"last_error": "",
"next_retry_at": time.Now(),
}).Error
_ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Updates(map[string]interface{}{
"alert_sent": true,
"dispatch_status": "sent",
}).Error
}
func markOutboxRetry(row models.AlertOutbox, msg string) {
retry := row.RetryCount + 1
const maxRetry = 5
if retry > maxRetry {
markOutboxDead(row.ID, retry, msg)
return
}
backoff := time.Duration(retry*retry) * time.Second
if backoff > 60*time.Second {
backoff = 60 * time.Second
}
_ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", row.ID).Updates(map[string]interface{}{
"status": outboxStatusRetrying,
"retry_count": retry,
"next_retry_at": time.Now().Add(backoff),
"last_error": truncateError(msg, 1024),
}).Error
_ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Update("dispatch_status", "retrying").Error
}
func markOutboxDead(id uint, retry int, msg string) {
_ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", id).Updates(map[string]interface{}{
"status": outboxStatusDead,
"retry_count": retry,
"next_retry_at": time.Now(),
"last_error": truncateError(msg, 1024),
}).Error
var row models.AlertOutbox
if err := impl.DBService.Select("log_event_id").First(&row, id).Error; err == nil && row.LogEventID > 0 {
_ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Update("dispatch_status", "dead").Error
}
}
func truncateError(s string, n int) string {
s = strings.TrimSpace(s)
if len(s) <= n {
return s
}
return s[:n]
}