From 694893eea33eb61b90287ba12a80f073f5170f74 Mon Sep 17 00:00:00 2001 From: zxr <271055687@qq.com> Date: Mon, 27 Apr 2026 19:26:57 +0800 Subject: [PATCH] fix --- cmd/main/main.go | 1 + doc/日志监控.md | 131 +++++ etc/logs_dev.yaml | 6 +- internal/config/config.go | 8 + internal/impl/impl.go | 2 +- internal/ingest/alert_outbox.go | 125 +++++ internal/ingest/alert_outbox_test.go | 11 + internal/ingest/engine.go | 103 +++- internal/ingest/resource_resolver_test.go | 49 ++ internal/ingest/shield.go | 2 +- internal/ingest/syslog_parse.go | 38 +- internal/ingest/syslog_parse_test.go | 44 ++ internal/logic/controllers/crud.go | 16 + internal/logic/controllers/outbox.go | 73 +++ internal/logic/controllers/resource_event.go | 228 ++++++++ .../logic/controllers/resource_event_test.go | 85 +++ internal/models/alert_outbox.go | 29 + internal/models/log_event.go | 12 + internal/models/query.go | 106 +++- internal/models/resource_event_dedup.go | 24 + internal/models/resource_mapping.go | 37 ++ internal/routers/register.go | 4 + scripts/prepare_logs_e2e_data.py | 104 ++++ scripts/run_e2e.ps1 | 108 ++++ scripts/run_logs_e2e.py | 523 ++++++++++++++++++ scripts/send_trap.go | 47 ++ 26 files changed, 1901 insertions(+), 15 deletions(-) create mode 100644 internal/ingest/alert_outbox.go create mode 100644 internal/ingest/alert_outbox_test.go create mode 100644 internal/ingest/resource_resolver_test.go create mode 100644 internal/logic/controllers/outbox.go create mode 100644 internal/logic/controllers/resource_event.go create mode 100644 internal/logic/controllers/resource_event_test.go create mode 100644 internal/models/alert_outbox.go create mode 100644 internal/models/resource_event_dedup.go create mode 100644 internal/models/resource_mapping.go create mode 100644 scripts/prepare_logs_e2e_data.py create mode 100644 scripts/run_e2e.ps1 create mode 100644 scripts/run_logs_e2e.py create mode 100644 scripts/send_trap.go diff --git a/cmd/main/main.go b/cmd/main/main.go index fe50c61..5fb37be 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -19,6 +19,7 @@ func main() { impl.NewImpl() ingest.StartRefresher() + ingest.StartAlertDispatcher() ingest.StartSyslogUDP() ingest.StartTrapUDP() diff --git a/doc/日志监控.md b/doc/日志监控.md index 05527d3..32d4e8e 100644 --- a/doc/日志监控.md +++ b/doc/日志监控.md @@ -365,3 +365,134 @@ flowchart LR BE --> Refresh[ingest.Global.Refresh()(规则/字典/屏蔽变更后触发)] ``` +--- + +## 7. 中优先级待办(已立项,未完成) + +本节用于记录当前版本可用但尚未产品化完善的中优先级项,作为后续迭代输入。 + +### 7.1 Outbox 可观测性增强 + +当前状态: +- 已支持 `alert_outbox` 入队、重试、死信、手动重试; +- 已有基础列表查询接口和前端入口。 + +待完善内容: +- 增加 outbox 指标接口或埋点: + - `pending_count` + - `retrying_count` + - `dead_count` + - `dispatch_success_rate` + - `dispatch_latency_p95` +- 增加失败原因聚合视图(按 `last_error` 分类统计)。 +- 增加任务生命周期字段(首次入队时间、最后发送时间)用于问题排查。 + +建议落地文件: +- 后端:`internal/logic/controllers/outbox.go`、`internal/ingest/alert_outbox.go` +- 前端:`front/src/views/ops/pages/log-mgmt/entries/index.vue` + +### 7.2 分发状态模型统一(替代 bool) + +当前状态: +- `logs_events` 已新增 `dispatch_status`,并在 outbox 流程中维护状态。 +- 历史字段 `alert_sent` 仍保留,用于兼容旧页面展示。 + +待完善内容: +- 明确状态枚举为:`not_applicable/pending/retrying/sent/dead`。 +- 前后端统一以 `dispatch_status` 作为主状态字段,`alert_sent` 逐步降级为派生字段或移除。 +- 页面文案由“已告警”升级为“分发状态”主展示,避免语义歧义。 + +建议落地文件: +- 后端:`internal/models/log_event.go`、`internal/logic/controllers/crud.go` +- 前端:`front/src/api/ops/logs.ts`、`front/src/views/ops/pages/log-mgmt/entries/index.vue` + +### 7.3 关键路径测试补齐 + +当前状态: +- 已有基础单测覆盖核心函数。 + +待完善内容: +- 增加资源事件安全链路测试: + - 验签失败/成功 + - 超时事件拒绝 + - 幂等事件重复提交 +- 增加 outbox 重试链路测试: + - 发送成功更新状态 + - 重试次数递增 + - 超过阈值转 `dead` +- 增加资源冲突优先级测试: + - `server > collector > device` + +建议落地文件: +- `internal/logic/controllers/resource_event_test.go` +- `internal/ingest/alert_outbox_test.go` +- `internal/ingest/resource_resolver_test.go` + +--- + +## 8. 后续产品化规划(Phase 3) + +本节对应“可运维与产品化”阶段,优先级低于中优先级修复项,但会显著提升系统可管理性。 + +### 8.1 规则发布流(draft / publish / rollback) + +目标: +- 规则配置与生效状态解耦,降低误操作风险。 + +范围: +- 引入规则草稿态与发布态; +- 支持发布记录、回滚到历史版本; +- 变更需记录操作人、时间、变更说明。 + +接口建议: +- `POST /Logs/v1/rule-sets/:id/publish` +- `POST /Logs/v1/rule-sets/:id/rollback` +- `GET /Logs/v1/rule-sets/:id/history` + +### 8.2 规则仿真/回放能力 + +目标: +- 上线前可验证规则命中结果,减少误报漏报。 + +范围: +- 输入样本报文(syslog/trap)执行仿真; +- 返回命中链路(命中/未命中原因); +- 支持历史事件回放。 + +接口建议: +- `POST /Logs/v1/rule-sets/:id/simulate` +- `POST /Logs/v1/rule-sets/:id/replay` + +### 8.3 指标与审计面板 + +目标: +- 建立“采集-匹配-分发”全链路可观测性。 + +范围: +- 采集侧:接收速率、解析失败率; +- 匹配侧:命中率、规则耗时; +- 分发侧:成功率、重试率、死信量; +- 安全侧:验签失败次数、重放拦截次数。 + +前端建议: +- 在日志管理模块增加“运行指标”页签; +- 对死信和验签失败提供快捷定位入口。 + +--- + +## 9. 未完成项执行顺序(建议) + +为降低风险,建议按以下顺序推进: + +1. **中优先级先完成** + - outbox 指标与失败聚合 + - `dispatch_status` 主状态化 + - 关键路径测试补齐 +2. **再做产品化** + - 规则发布流 + - 规则仿真/回放 + - 指标与审计面板 + +验收建议: +- 每项功能完成后执行“单项验证 + 回归验证”,最后统一做端到端联调。 + diff --git a/etc/logs_dev.yaml b/etc/logs_dev.yaml index bb5aadf..cdf5a7e 100644 --- a/etc/logs_dev.yaml +++ b/etc/logs_dev.yaml @@ -21,6 +21,10 @@ Ingest: AlertForward: enabled: true - base_url: https://ops2.apinb.com + base_url: https://ops-api.apinb.com internal_key: "ops-alert" default_policy_id: 0 + +ResourceEvent: + hmac_secret: "replace-with-dc-control-shared-secret" + max_skew_secs: 300 diff --git a/internal/config/config.go b/internal/config/config.go index 4992ffa..3420230 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -21,6 +21,13 @@ type IngestConf struct { RuleRefreshSecs int `yaml:"rule_refresh_secs"` } +type ResourceEventConf struct { + // HMACSecret 用于校验 dc-control 推送签名(X-Event-Signature)。 + HMACSecret string `yaml:"hmac_secret"` + // MaxSkewSecs 允许事件时间与服务端时间的最大偏差(秒)。 + MaxSkewSecs int `yaml:"max_skew_secs"` +} + type SrvConfig struct { conf.Base `yaml:",inline"` Databases *conf.DBConf `yaml:"Databases"` @@ -31,6 +38,7 @@ type SrvConfig struct { Etcd *conf.EtcdConf `yaml:"Etcd"` AlertForward *AlertForwardConf `yaml:"AlertForward"` Ingest IngestConf `yaml:"Ingest"` + ResourceEvent ResourceEventConf `yaml:"ResourceEvent"` } func New(srvKey string) { diff --git a/internal/impl/impl.go b/internal/impl/impl.go index 93ed4bf..ad1fa1d 100644 --- a/internal/impl/impl.go +++ b/internal/impl/impl.go @@ -25,7 +25,7 @@ func NewImpl() { if err := DBService.AutoMigrate(models.GetAllModels()...); err != nil { panic(fmt.Sprintf("logs migrate: %v", err)) } - if err := models.InitData(); err != nil { + if err := models.InitData(DBService); err != nil { panic(fmt.Sprintf("logs init data: %v", err)) } } diff --git a/internal/ingest/alert_outbox.go b/internal/ingest/alert_outbox.go new file mode 100644 index 0000000..b37729e --- /dev/null +++ b/internal/ingest/alert_outbox.go @@ -0,0 +1,125 @@ +package ingest + +import ( + "encoding/json" + "strings" + "time" + + "git.apinb.com/ops/logs/internal/impl" + "git.apinb.com/ops/logs/internal/models" +) + +const ( + outboxStatusPending = "pending" + outboxStatusRetrying = "retrying" + outboxStatusSent = "sent" + outboxStatusDead = "dead" +) + +func enqueueAlert(logEventID uint, body AlertReceiveBody) error { + payload, err := json.Marshal(body) + if err != nil { + return err + } + row := models.AlertOutbox{ + LogEventID: logEventID, + PayloadJSON: string(payload), + Status: outboxStatusPending, + RetryCount: 0, + NextRetryAt: time.Now(), + LastError: "", + } + return impl.DBService.Create(&row).Error +} + +func StartAlertDispatcher() { + go func() { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + for range ticker.C { + processAlertOutboxBatch(20) + } + }() +} + +func processAlertOutboxBatch(limit int) { + if limit <= 0 { + limit = 20 + } + var rows []models.AlertOutbox + now := time.Now() + err := impl.DBService. + Where("status IN ? AND next_retry_at <= ?", []string{outboxStatusPending, outboxStatusRetrying}, now). + Order("id asc"). + Limit(limit). + Find(&rows).Error + if err != nil || len(rows) == 0 { + return + } + for _, row := range rows { + processOneOutbox(row) + } +} + +func processOneOutbox(row models.AlertOutbox) { + var body AlertReceiveBody + if err := json.Unmarshal([]byte(row.PayloadJSON), &body); err != nil { + markOutboxDead(row.ID, row.RetryCount, "invalid_payload: "+err.Error()) + return + } + if err := forwardAlert(body); err != nil { + markOutboxRetry(row, err.Error()) + return + } + _ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", row.ID).Updates(map[string]interface{}{ + "status": outboxStatusSent, + "last_error": "", + "next_retry_at": time.Now(), + }).Error + _ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Updates(map[string]interface{}{ + "alert_sent": true, + "dispatch_status": "sent", + }).Error +} + +func markOutboxRetry(row models.AlertOutbox, msg string) { + retry := row.RetryCount + 1 + const maxRetry = 5 + if retry > maxRetry { + markOutboxDead(row.ID, retry, msg) + return + } + backoff := time.Duration(retry*retry) * time.Second + if backoff > 60*time.Second { + backoff = 60 * time.Second + } + _ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", row.ID).Updates(map[string]interface{}{ + "status": outboxStatusRetrying, + "retry_count": retry, + "next_retry_at": time.Now().Add(backoff), + "last_error": truncateError(msg, 1024), + }).Error + _ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Update("dispatch_status", "retrying").Error +} + +func markOutboxDead(id uint, retry int, msg string) { + _ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", id).Updates(map[string]interface{}{ + "status": outboxStatusDead, + "retry_count": retry, + "next_retry_at": time.Now(), + "last_error": truncateError(msg, 1024), + }).Error + var row models.AlertOutbox + if err := impl.DBService.Select("log_event_id").First(&row, id).Error; err == nil && row.LogEventID > 0 { + _ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Update("dispatch_status", "dead").Error + } +} + +func truncateError(s string, n int) string { + s = strings.TrimSpace(s) + if len(s) <= n { + return s + } + return s[:n] +} + diff --git a/internal/ingest/alert_outbox_test.go b/internal/ingest/alert_outbox_test.go new file mode 100644 index 0000000..20fd405 --- /dev/null +++ b/internal/ingest/alert_outbox_test.go @@ -0,0 +1,11 @@ +package ingest + +import "testing" + +func TestTruncateError(t *testing.T) { + got := truncateError(" abcdef ", 3) + if got != "abc" { + t.Fatalf("unexpected value: %q", got) + } +} + diff --git a/internal/ingest/engine.go b/internal/ingest/engine.go index 500ec04..0c1573d 100644 --- a/internal/ingest/engine.go +++ b/internal/ingest/engine.go @@ -24,6 +24,27 @@ type Engine struct { syslogRules []models.SyslogRule trapRules []models.TrapRule shields []models.TrapShield + resourceByIP map[string]resourceRef + resourceByHN map[string]resourceRef +} + +type resourceRef struct { + ResourceType string + ResourceID string + ResourceName string +} + +func resourceTypePriority(resourceType string) int { + switch strings.ToLower(strings.TrimSpace(resourceType)) { + case "server": + return 3 + case "collector": + return 2 + case "device": + return 1 + default: + return 0 + } } var Global = &Engine{} @@ -33,6 +54,7 @@ func (e *Engine) Refresh() error { var syslog []models.SyslogRule var trap []models.TrapRule var shield []models.TrapShield + var mappings []models.ResourceMapping if err := impl.DBService.Where("enabled = ?", true).Find(&dict).Error; err != nil { return err @@ -54,12 +76,51 @@ func (e *Engine) Refresh() error { if err := impl.DBService.Where("enabled = ?", true).Find(&shield).Error; err != nil { return err } + if err := impl.DBService.Where("is_deleted = ?", false).Order("updated_at desc, id desc").Find(&mappings).Error; err != nil { + return err + } + + ipMap := make(map[string]resourceRef) + hnMap := make(map[string]resourceRef) + for _, m := range mappings { + ref := resourceRef{ + ResourceType: m.ResourceType, + ResourceID: m.ResourceID, + ResourceName: m.ResourceName, + } + var ips []string + if err := json.Unmarshal([]byte(m.IPsJSON), &ips); err == nil { + for _, ip := range ips { + key := strings.TrimSpace(ip) + if key == "" { + continue + } + if cur, exists := ipMap[key]; !exists || resourceTypePriority(ref.ResourceType) > resourceTypePriority(cur.ResourceType) { + ipMap[key] = ref + } + } + } + var hostnames []string + if err := json.Unmarshal([]byte(m.HostnamesJSON), &hostnames); err == nil { + for _, hn := range hostnames { + key := strings.ToLower(strings.TrimSpace(hn)) + if key == "" { + continue + } + if cur, exists := hnMap[key]; !exists || resourceTypePriority(ref.ResourceType) > resourceTypePriority(cur.ResourceType) { + hnMap[key] = ref + } + } + } + } e.mu.Lock() e.trapDict = dict e.syslogRules = syslog e.trapRules = trap e.shields = shield + e.resourceByIP = ipMap + e.resourceByHN = hnMap e.mu.Unlock() return nil } @@ -99,14 +160,21 @@ func (e *Engine) HandleSyslog(addr *net.UDPAddr, payload []byte) { detailBytes, _ := json.Marshal(detailObj) summary := formatSyslogSummary(parsed) sev := syslogPriorityToSeverity(parsed.Priority) + ref, method := e.resolveResource(addr.IP.String(), device) ev := models.LogEvent{ SourceKind: "syslog", RemoteAddr: addr.String(), + SourceIP: addr.IP.String(), RawPayload: string(payload), NormalizedSummary: summary, NormalizedDetail: string(detailBytes), DeviceName: device, + ResourceType: ref.ResourceType, + ResourceID: ref.ResourceID, + ResourceName: ref.ResourceName, + MatchMethod: method, + DispatchStatus: "not_applicable", SeverityCode: sev, } @@ -166,8 +234,8 @@ func (e *Engine) HandleSyslog(addr *net.UDPAddr, payload []byte) { PolicyID: matched.PolicyID, RawData: rawBytes, } - if err := forwardAlert(body); err == nil { - _ = impl.DBService.Model(&ev).Update("alert_sent", true).Error + if err := enqueueAlert(ev.ID, body); err == nil { + _ = impl.DBService.Model(&ev).Update("dispatch_status", "pending").Error } } @@ -204,10 +272,7 @@ func trapShielded(e *Engine, addr *net.UDPAddr, trapOID string, pkt *gosnmp.Snmp if !s.Enabled { continue } - if strings.TrimSpace(s.SourceIPCIDR) == "" { - continue - } - if !ipMatchesCIDR(ip, s.SourceIPCIDR) { + if cidr := strings.TrimSpace(s.SourceIPCIDR); cidr != "" && !ipMatchesCIDR(ip, cidr) { continue } if p := strings.TrimSpace(s.OIDPrefix); p != "" && !strings.HasPrefix(normOID(trapOID), normOID(p)) { @@ -265,14 +330,21 @@ func (e *Engine) HandleTrap(addr *net.UDPAddr, pkt *gosnmp.SnmpPacket) { } } detailBytes, _ := json.Marshal(detailObj) + ref, method := e.resolveResource(addr.IP.String(), addr.IP.String()) ev := models.LogEvent{ SourceKind: "snmp_trap", RemoteAddr: addr.String(), + SourceIP: addr.IP.String(), RawPayload: fp, NormalizedSummary: readable, NormalizedDetail: string(detailBytes), DeviceName: addr.IP.String(), + ResourceType: ref.ResourceType, + ResourceID: ref.ResourceID, + ResourceName: ref.ResourceName, + MatchMethod: method, + DispatchStatus: "not_applicable", SeverityCode: sev, TrapOID: trapOID, } @@ -360,8 +432,8 @@ func (e *Engine) HandleTrap(addr *net.UDPAddr, pkt *gosnmp.SnmpPacket) { PolicyID: matched.PolicyID, RawData: rawBytes, } - if err := forwardAlert(body); err == nil { - _ = impl.DBService.Model(&ev).Update("alert_sent", true).Error + if err := enqueueAlert(ev.ID, body); err == nil { + _ = impl.DBService.Model(&ev).Update("dispatch_status", "pending").Error } } @@ -440,3 +512,18 @@ func firstNonEmpty(a, b string) string { } return b } + +func (e *Engine) resolveResource(sourceIP, hostname string) (resourceRef, string) { + e.mu.RLock() + ipMap := e.resourceByIP + hnMap := e.resourceByHN + e.mu.RUnlock() + + if ref, ok := ipMap[strings.TrimSpace(sourceIP)]; ok { + return ref, "ip" + } + if ref, ok := hnMap[strings.ToLower(strings.TrimSpace(hostname))]; ok { + return ref, "hostname" + } + return resourceRef{}, "none" +} diff --git a/internal/ingest/resource_resolver_test.go b/internal/ingest/resource_resolver_test.go new file mode 100644 index 0000000..eeb07f6 --- /dev/null +++ b/internal/ingest/resource_resolver_test.go @@ -0,0 +1,49 @@ +package ingest + +import "testing" + +func TestResolveResourceByIPFirst(t *testing.T) { + e := &Engine{ + resourceByIP: map[string]resourceRef{ + "10.0.0.10": {ResourceType: "server", ResourceID: "srv-10", ResourceName: "s10"}, + }, + resourceByHN: map[string]resourceRef{ + "host-a": {ResourceType: "device", ResourceID: "dev-a", ResourceName: "a"}, + }, + } + ref, method := e.resolveResource("10.0.0.10", "host-a") + if method != "ip" { + t.Fatalf("method=%s", method) + } + if ref.ResourceID != "srv-10" { + t.Fatalf("resource id=%s", ref.ResourceID) + } +} + +func TestResolveResourceByHostname(t *testing.T) { + e := &Engine{ + resourceByIP: map[string]resourceRef{}, + resourceByHN: map[string]resourceRef{ + "host-a": {ResourceType: "device", ResourceID: "dev-a", ResourceName: "a"}, + }, + } + ref, method := e.resolveResource("10.0.0.20", "HOST-A") + if method != "hostname" { + t.Fatalf("method=%s", method) + } + if ref.ResourceID != "dev-a" { + t.Fatalf("resource id=%s", ref.ResourceID) + } +} + +func TestResolveResourceNoMatch(t *testing.T) { + e := &Engine{ + resourceByIP: map[string]resourceRef{}, + resourceByHN: map[string]resourceRef{}, + } + _, method := e.resolveResource("10.0.0.20", "host-b") + if method != "none" { + t.Fatalf("method=%s", method) + } +} + diff --git a/internal/ingest/shield.go b/internal/ingest/shield.go index ef273ae..59a6142 100644 --- a/internal/ingest/shield.go +++ b/internal/ingest/shield.go @@ -40,7 +40,7 @@ func inTimeWindows(now time.Time, jsonStr string) bool { } var windows []timeWindow if err := json.Unmarshal([]byte(s), &windows); err != nil || len(windows) == 0 { - return true + return false } tod := now.Hour()*60 + now.Minute() wd := int(now.Weekday()) diff --git a/internal/ingest/syslog_parse.go b/internal/ingest/syslog_parse.go index bb7b799..e30e89f 100644 --- a/internal/ingest/syslog_parse.go +++ b/internal/ingest/syslog_parse.go @@ -46,8 +46,20 @@ func parseSyslogPayload(payload []byte) ParsedSyslog { tokens := strings.SplitN(rest, " ", 3) if len(tokens) >= 2 { if len(tokens) >= 3 && isMonthAbbr(tokens[0]) { - p.Hostname = tokens[2] - if idx := strings.Index(rest, ": "); idx > 0 { + parts := strings.Fields(rest) + if len(parts) >= 4 && isDayOfMonth(parts[1]) && isHHMMSS(parts[2]) { + p.Hostname = parts[3] + if len(parts) > 4 { + tagMsg := strings.Join(parts[4:], " ") + if idx := strings.Index(tagMsg, ": "); idx > 0 { + p.Tag = tagMsg[:idx] + p.Message = strings.TrimSpace(tagMsg[idx+2:]) + } else { + p.Message = tagMsg + } + } + } else if idx := strings.Index(rest, ": "); idx > 0 { + // 兼容无法严格按 RFC3164 切分的历史格式。 p.Message = strings.TrimSpace(rest[idx+2:]) } } else { @@ -66,6 +78,28 @@ func parseSyslogPayload(payload []byte) ParsedSyslog { return p } +func isDayOfMonth(s string) bool { + n, err := strconv.Atoi(s) + if err != nil { + return false + } + return n >= 1 && n <= 31 +} + +func isHHMMSS(s string) bool { + parts := strings.Split(s, ":") + if len(parts) != 3 { + return false + } + h, err1 := strconv.Atoi(parts[0]) + m, err2 := strconv.Atoi(parts[1]) + sec, err3 := strconv.Atoi(parts[2]) + if err1 != nil || err2 != nil || err3 != nil { + return false + } + return h >= 0 && h <= 23 && m >= 0 && m <= 59 && sec >= 0 && sec <= 59 +} + func isMonthAbbr(s string) bool { if len(s) < 3 { return false diff --git a/internal/ingest/syslog_parse_test.go b/internal/ingest/syslog_parse_test.go index 39618dd..c638299 100644 --- a/internal/ingest/syslog_parse_test.go +++ b/internal/ingest/syslog_parse_test.go @@ -2,7 +2,12 @@ package ingest import ( "encoding/json" + "net" "testing" + "time" + + "git.apinb.com/ops/logs/internal/models" + "github.com/gosnmp/gosnmp" ) func TestParseSyslogPayloadPri(t *testing.T) { @@ -12,6 +17,19 @@ func TestParseSyslogPayloadPri(t *testing.T) { } } +func TestParseSyslogPayloadRFC3164Hostname(t *testing.T) { + p := parseSyslogPayload([]byte("Oct 11 22:14:15 mymachine su: failed")) + if p.Hostname != "mymachine" { + t.Fatalf("hostname=%q", p.Hostname) + } + if p.Tag != "su" { + t.Fatalf("tag=%q", p.Tag) + } + if p.Message != "failed" { + t.Fatalf("message=%q", p.Message) + } +} + func TestForwardAlertBodyIncludesRawData(t *testing.T) { raw := []byte(`{"source":"syslog","parsed":{}}`) b := AlertReceiveBody{ @@ -30,3 +48,29 @@ func TestForwardAlertBodyIncludesRawData(t *testing.T) { t.Fatalf("raw_data %s", dec["raw_data"]) } } + +func TestInTimeWindowsInvalidJSONReturnsFalse(t *testing.T) { + now := time.Date(2026, 1, 1, 10, 0, 0, 0, time.Local) + if inTimeWindows(now, "{invalid") { + t.Fatal("invalid json should not be treated as always effective") + } +} + +func TestTrapShieldedAllowsEmptySourceIPCIDR(t *testing.T) { + e := &Engine{ + shields: []models.TrapShield{ + { + Enabled: true, + SourceIPCIDR: "", + OIDPrefix: "1.3.6.1.4.1", + InterfaceHint: "", + TimeWindowsJSON: "", + }, + }, + } + addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.1"), Port: 162} + pkt := &gosnmp.SnmpPacket{} + if !trapShielded(e, addr, "1.3.6.1.4.1.999", pkt) { + t.Fatal("shield should match when source_ip_cidr is empty and other conditions match") + } +} diff --git a/internal/logic/controllers/crud.go b/internal/logic/controllers/crud.go index 1529bdd..756c10a 100644 --- a/internal/logic/controllers/crud.go +++ b/internal/logic/controllers/crud.go @@ -273,6 +273,10 @@ func DeleteTrapShield(ctx *gin.Context) { func ListLogEvents(ctx *gin.Context) { kind := ctx.Query("source_kind") + resourceType := ctx.Query("resource_type") + resourceID := ctx.Query("resource_id") + dispatchStatus := ctx.Query("dispatch_status") + logEventID, _ := strconv.ParseUint(ctx.DefaultQuery("log_event_id", "0"), 10, 64) page, _ := strconv.Atoi(ctx.DefaultQuery("page", "1")) size, _ := strconv.Atoi(ctx.DefaultQuery("page_size", "50")) if page < 1 { @@ -286,6 +290,18 @@ func ListLogEvents(ctx *gin.Context) { if kind != "" { q = q.Where("source_kind = ?", kind) } + if resourceType != "" { + q = q.Where("resource_type = ?", resourceType) + } + if resourceID != "" { + q = q.Where("resource_id = ?", resourceID) + } + if dispatchStatus != "" { + q = q.Where("dispatch_status = ?", dispatchStatus) + } + if logEventID > 0 { + q = q.Where("id = ?", uint(logEventID)) + } var total int64 _ = q.Count(&total).Error var rows []models.LogEvent diff --git a/internal/logic/controllers/outbox.go b/internal/logic/controllers/outbox.go new file mode 100644 index 0000000..d3737bc --- /dev/null +++ b/internal/logic/controllers/outbox.go @@ -0,0 +1,73 @@ +package controllers + +import ( + "errors" + "strconv" + "strings" + "time" + + "git.apinb.com/bsm-sdk/core/infra" + "git.apinb.com/ops/logs/internal/impl" + "git.apinb.com/ops/logs/internal/models" + "github.com/gin-gonic/gin" +) + +func ListAlertOutbox(ctx *gin.Context) { + status := strings.TrimSpace(ctx.Query("status")) + page, _ := strconv.Atoi(ctx.DefaultQuery("page", "1")) + size, _ := strconv.Atoi(ctx.DefaultQuery("page_size", "50")) + if page < 1 { + page = 1 + } + if size < 1 || size > 500 { + size = 50 + } + offset := (page - 1) * size + + q := impl.DBService.Model(&models.AlertOutbox{}) + if status != "" { + q = q.Where("status = ?", status) + } + var total int64 + _ = q.Count(&total).Error + + var rows []models.AlertOutbox + if err := q.Order("id desc").Offset(offset).Limit(size).Find(&rows).Error; err != nil { + infra.Response.Error(ctx, err) + return + } + infra.Response.Success(ctx, gin.H{ + "total": total, + "page": page, + "page_size": size, + "items": rows, + }) +} + +func RetryAlertOutbox(ctx *gin.Context) { + id, err := parseID(ctx) + if err != nil { + infra.Response.Error(ctx, errors.New("invalid id")) + return + } + var row models.AlertOutbox + if err := impl.DBService.First(&row, id).Error; err != nil { + infra.Response.Error(ctx, err) + return + } + + // 手工重试时,无论失败原因如何都重置为 pending 并立即可被 worker 消费。 + if err := impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", id).Updates(map[string]interface{}{ + "status": "pending", + "next_retry_at": time.Now(), + "last_error": "", + }).Error; err != nil { + infra.Response.Error(ctx, err) + return + } + infra.Response.Success(ctx, gin.H{ + "id": id, + "status": "pending", + }) +} + diff --git a/internal/logic/controllers/resource_event.go b/internal/logic/controllers/resource_event.go new file mode 100644 index 0000000..c989057 --- /dev/null +++ b/internal/logic/controllers/resource_event.go @@ -0,0 +1,228 @@ +package controllers + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "git.apinb.com/bsm-sdk/core/infra" + "git.apinb.com/ops/logs/internal/config" + "git.apinb.com/ops/logs/internal/impl" + "git.apinb.com/ops/logs/internal/models" + "github.com/gin-gonic/gin" + "gorm.io/gorm" +) + +const ( + resourceEventUpsert = "resource.upsert" + resourceEventDelete = "resource.delete" +) + +type resourceEventRequest struct { + EventID string `json:"event_id"` + EventTime string `json:"event_time"` + EventType string `json:"event_type"` + ResourceType string `json:"resource_type"` + ResourceID string `json:"resource_id"` + ResourceName string `json:"resource_name"` + IPs []string `json:"ips"` + Hostnames []string `json:"hostnames"` + Labels map[string]string `json:"labels"` + Version int64 `json:"version"` +} + +// ReceiveResourceEvent 接收 dc-control 推送的资源变更事件并落库。 +func ReceiveResourceEvent(ctx *gin.Context) { + raw, err := ctx.GetRawData() + if err != nil { + infra.Response.Error(ctx, err) + return + } + if err := verifyResourceEventSignature(ctx.GetHeader("X-Event-Signature"), raw); err != nil { + infra.Response.Error(ctx, err) + return + } + + var req resourceEventRequest + if err := json.Unmarshal(raw, &req); err != nil { + infra.Response.Error(ctx, err) + return + } + eventTime, err := validateResourceEventRequest(&req) + if err != nil { + infra.Response.Error(ctx, err) + return + } + if err := validateEventTimeSkew(eventTime); err != nil { + infra.Response.Error(ctx, err) + return + } + if ok, err := tryInsertResourceEventDedup(req.EventID, eventTime, req.ResourceType, req.ResourceID); err != nil { + infra.Response.Error(ctx, err) + return + } else if !ok { + infra.Response.Success(ctx, gin.H{ + "ignored": true, + "reason": "duplicate_event_id", + "event_id": req.EventID, + }) + return + } + + var row models.ResourceMapping + err = impl.DBService.Where("resource_type = ? AND resource_id = ?", req.ResourceType, req.ResourceID).First(&row).Error + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + infra.Response.Error(ctx, err) + return + } + + // 已存在记录且版本回退时忽略该事件,避免乱序覆盖。 + if err == nil && row.Version > req.Version { + infra.Response.Success(ctx, gin.H{ + "ignored": true, + "reason": "stale_version", + "current": row.Version, + "incoming": req.Version, + }) + return + } + + ipsJSON, _ := json.Marshal(nonEmptyUnique(req.IPs)) + hostnamesJSON, _ := json.Marshal(nonEmptyUnique(req.Hostnames)) + labelsJSON, _ := json.Marshal(req.Labels) + + row.ResourceType = req.ResourceType + row.ResourceID = req.ResourceID + row.ResourceName = req.ResourceName + row.IPsJSON = string(ipsJSON) + row.HostnamesJSON = string(hostnamesJSON) + row.LabelsJSON = string(labelsJSON) + row.Version = req.Version + row.LastEventID = req.EventID + row.EventTime = eventTime + row.IsDeleted = req.EventType == resourceEventDelete + + if err := impl.DBService.Save(&row).Error; err != nil { + infra.Response.Error(ctx, err) + return + } + infra.Response.Success(ctx, gin.H{ + "resource_type": row.ResourceType, + "resource_id": row.ResourceID, + "version": row.Version, + "is_deleted": row.IsDeleted, + }) +} + +func validateResourceEventRequest(req *resourceEventRequest) (time.Time, error) { + req.EventID = strings.TrimSpace(req.EventID) + req.EventType = strings.TrimSpace(req.EventType) + req.ResourceType = strings.TrimSpace(req.ResourceType) + req.ResourceID = strings.TrimSpace(req.ResourceID) + req.ResourceName = strings.TrimSpace(req.ResourceName) + req.EventTime = strings.TrimSpace(req.EventTime) + + if req.EventID == "" { + return time.Time{}, errors.New("event_id is required") + } + if req.EventType != resourceEventUpsert && req.EventType != resourceEventDelete { + return time.Time{}, errors.New("event_type must be resource.upsert or resource.delete") + } + if req.ResourceType == "" { + return time.Time{}, errors.New("resource_type is required") + } + if req.ResourceID == "" { + return time.Time{}, errors.New("resource_id is required") + } + if req.Version <= 0 { + return time.Time{}, errors.New("version must be positive") + } + if req.EventTime == "" { + return time.Time{}, errors.New("event_time is required") + } + tm, err := time.Parse(time.RFC3339, req.EventTime) + if err != nil { + return time.Time{}, errors.New("event_time must be RFC3339") + } + return tm, nil +} + +func nonEmptyUnique(items []string) []string { + if len(items) == 0 { + return nil + } + seen := make(map[string]struct{}, len(items)) + out := make([]string, 0, len(items)) + for _, item := range items { + v := strings.TrimSpace(item) + if v == "" { + continue + } + if _, ok := seen[v]; ok { + continue + } + seen[v] = struct{}{} + out = append(out, v) + } + return out +} + +func verifyResourceEventSignature(signature string, body []byte) error { + signature = strings.TrimSpace(signature) + signature = strings.TrimPrefix(strings.ToLower(signature), "sha256=") + secret := strings.TrimSpace(config.Spec.ResourceEvent.HMACSecret) + if secret == "" { + return errors.New("resource_event hmac_secret is not configured") + } + if signature == "" { + return errors.New("missing X-Event-Signature") + } + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write(body) + expected := fmt.Sprintf("%x", mac.Sum(nil)) + if !hmac.Equal([]byte(strings.ToLower(signature)), []byte(expected)) { + return errors.New("invalid X-Event-Signature") + } + return nil +} + +func validateEventTimeSkew(eventTime time.Time) error { + maxSkew := config.Spec.ResourceEvent.MaxSkewSecs + if maxSkew <= 0 { + maxSkew = 300 + } + diff := time.Since(eventTime) + if diff < 0 { + diff = -diff + } + if diff > time.Duration(maxSkew)*time.Second { + return errors.New("event_time out of allowed skew window") + } + return nil +} + +func tryInsertResourceEventDedup(eventID string, eventTime time.Time, resourceType, resourceID string) (bool, error) { + // 先查询再插入,避免依赖数据库唯一索引存在与否。 + var existed models.ResourceEventDedup + if err := impl.DBService.Where("event_id = ?", eventID).First(&existed).Error; err == nil { + return false, nil + } + row := models.ResourceEventDedup{ + EventID: eventID, + EventTime: eventTime, + ResourceType: resourceType, + ResourceID: resourceID, + } + if err := impl.DBService.Create(&row).Error; err != nil { + if strings.Contains(strings.ToLower(err.Error()), "duplicate") || strings.Contains(strings.ToLower(err.Error()), "unique") { + return false, nil + } + return false, err + } + return true, nil +} + diff --git a/internal/logic/controllers/resource_event_test.go b/internal/logic/controllers/resource_event_test.go new file mode 100644 index 0000000..9945f03 --- /dev/null +++ b/internal/logic/controllers/resource_event_test.go @@ -0,0 +1,85 @@ +package controllers + +import ( + "crypto/hmac" + "crypto/sha256" + "fmt" + "testing" + "time" + + "git.apinb.com/ops/logs/internal/config" +) + +func TestValidateResourceEventRequest(t *testing.T) { + req := &resourceEventRequest{ + EventID: "evt-1", + EventTime: "2026-04-27T08:00:00Z", + EventType: resourceEventUpsert, + ResourceType: "server", + ResourceID: "srv-1", + ResourceName: "server-1", + Version: 1, + } + if _, err := validateResourceEventRequest(req); err != nil { + t.Fatalf("expected valid request, got error: %v", err) + } +} + +func TestValidateResourceEventRequestInvalidTime(t *testing.T) { + req := &resourceEventRequest{ + EventID: "evt-1", + EventTime: "bad-time", + EventType: resourceEventUpsert, + ResourceType: "server", + ResourceID: "srv-1", + Version: 1, + } + if _, err := validateResourceEventRequest(req); err == nil { + t.Fatal("expected invalid time error") + } +} + +func TestNonEmptyUnique(t *testing.T) { + got := nonEmptyUnique([]string{" 10.0.0.1 ", "", "10.0.0.1", "host-a", "host-a"}) + if len(got) != 2 { + t.Fatalf("unexpected unique size: %d", len(got)) + } + if got[0] != "10.0.0.1" || got[1] != "host-a" { + t.Fatalf("unexpected output: %#v", got) + } +} + +func TestVerifyResourceEventSignature(t *testing.T) { + old := config.Spec.ResourceEvent.HMACSecret + config.Spec.ResourceEvent.HMACSecret = "abc123" + defer func() { + config.Spec.ResourceEvent.HMACSecret = old + }() + + body := []byte(`{"event_id":"evt-1"}`) + mac := hmac.New(sha256.New, []byte("abc123")) + mac.Write(body) + signature := fmt.Sprintf("%x", mac.Sum(nil)) + if err := verifyResourceEventSignature(signature, body); err != nil { + t.Fatalf("expected signature to pass: %v", err) + } + if err := verifyResourceEventSignature("bad", body); err == nil { + t.Fatal("expected invalid signature error") + } +} + +func TestValidateEventTimeSkew(t *testing.T) { + old := config.Spec.ResourceEvent.MaxSkewSecs + config.Spec.ResourceEvent.MaxSkewSecs = 60 + defer func() { + config.Spec.ResourceEvent.MaxSkewSecs = old + }() + + if err := validateEventTimeSkew(time.Now()); err != nil { + t.Fatalf("expected current time to pass: %v", err) + } + if err := validateEventTimeSkew(time.Now().Add(-2 * time.Minute)); err == nil { + t.Fatal("expected skew validation to fail for old timestamp") + } +} + diff --git a/internal/models/alert_outbox.go b/internal/models/alert_outbox.go new file mode 100644 index 0000000..c8e5cc4 --- /dev/null +++ b/internal/models/alert_outbox.go @@ -0,0 +1,29 @@ +package models + +import "time" + +// AlertOutbox 表示待发送或重试中的告警任务。 +type AlertOutbox struct { + ID uint `gorm:"primaryKey" json:"id"` + + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + + // LogEventID 关联日志事件 ID。 + LogEventID uint `gorm:"index" json:"log_event_id"` + // PayloadJSON 保存 AlertReceiveBody 的 JSON 文本。 + PayloadJSON string `gorm:"type:text" json:"payload_json"` + // Status 任务状态:pending/retrying/sent/dead。 + Status string `gorm:"size:32;index" json:"status"` + // RetryCount 已重试次数。 + RetryCount int `json:"retry_count"` + // NextRetryAt 下一次可重试时间。 + NextRetryAt time.Time `gorm:"index" json:"next_retry_at"` + // LastError 最近一次错误信息。 + LastError string `gorm:"type:text" json:"last_error"` +} + +func (AlertOutbox) TableName() string { + return "logs_alert_outbox" +} + diff --git a/internal/models/log_event.go b/internal/models/log_event.go index 7965b7c..814bbea 100644 --- a/internal/models/log_event.go +++ b/internal/models/log_event.go @@ -20,6 +20,18 @@ type LogEvent struct { NormalizedDetail string `gorm:"type:text" json:"normalized_detail"` // DeviceName 表示关联设备名称。 DeviceName string `gorm:"size:512;index" json:"device_name"` + // SourceIP 表示原始来源 IP(不含端口)。 + SourceIP string `gorm:"size:64;index" json:"source_ip"` + // ResourceType 表示关联到的资源类型。 + ResourceType string `gorm:"size:32;index" json:"resource_type"` + // ResourceID 表示关联到的资源 ID。 + ResourceID string `gorm:"size:128;index" json:"resource_id"` + // ResourceName 表示关联到的资源名称。 + ResourceName string `gorm:"size:256" json:"resource_name"` + // MatchMethod 表示资源命中方式(ip/hostname/none)。 + MatchMethod string `gorm:"size:32" json:"match_method"` + // DispatchStatus 表示告警分发状态(not_applicable/pending/retrying/sent/dead)。 + DispatchStatus string `gorm:"size:32;index" json:"dispatch_status"` // SeverityCode 表示告警/严重度编码。 SeverityCode string `gorm:"size:32" json:"severity_code"` // TrapOID 表示关联的 Trap OID(若来源为 trap)。 diff --git a/internal/models/query.go b/internal/models/query.go index 23861f5..411f9e7 100644 --- a/internal/models/query.go +++ b/internal/models/query.go @@ -1,9 +1,14 @@ package models +import "gorm.io/gorm" + // GetAllModels 数据库迁移用模型列表 func GetAllModels() []interface{} { return []interface{}{ &LogEvent{}, + &AlertOutbox{}, + &ResourceMapping{}, + &ResourceEventDedup{}, &TrapDictionaryEntry{}, &SyslogRule{}, &TrapRule{}, @@ -11,7 +16,104 @@ func GetAllModels() []interface{} { } } -// InitData 预留默认数据 -func InitData() error { +// InitData 初始化默认规则数据(幂等) +func InitData(db *gorm.DB) error { + if db == nil { + return nil + } + if err := seedDefaultSyslogRules(db); err != nil { + return err + } + if err := seedDefaultTrapRules(db); err != nil { + return err + } + if err := seedDefaultTrapDictionary(db); err != nil { + return err + } return nil } + +func seedDefaultSyslogRules(db *gorm.DB) error { + var cnt int64 + if err := db.Model(&SyslogRule{}).Count(&cnt).Error; err != nil { + return err + } + if cnt > 0 { + return nil + } + rows := []SyslogRule{ + { + Name: "默认-系统严重错误", + Enabled: true, + Priority: 100, + DeviceNameContains: "", + KeywordRegex: "(?i)(panic|fatal|segmentation fault|kernel panic|out of memory|oom)", + AlertName: "Syslog严重错误", + SeverityCode: "critical", + PolicyID: 0, + }, + { + Name: "默认-链路中断告警", + Enabled: true, + Priority: 90, + DeviceNameContains: "", + KeywordRegex: "(?i)(link down|interface .* down|port .* down)", + AlertName: "Syslog链路中断", + SeverityCode: "major", + PolicyID: 0, + }, + } + return db.Create(&rows).Error +} + +func seedDefaultTrapRules(db *gorm.DB) error { + var cnt int64 + if err := db.Model(&TrapRule{}).Count(&cnt).Error; err != nil { + return err + } + if cnt > 0 { + return nil + } + rows := []TrapRule{ + { + Name: "默认-Trap链路中断", + Enabled: true, + Priority: 100, + OIDPrefix: "1.3.6.1.6.3.1.1.5", + VarbindMatchRegex: "(?i)(linkdown|ifdown|down)", + AlertName: "SNMP Trap链路中断", + SeverityCode: "major", + PolicyID: 0, + }, + } + return db.Create(&rows).Error +} + +func seedDefaultTrapDictionary(db *gorm.DB) error { + var cnt int64 + if err := db.Model(&TrapDictionaryEntry{}).Count(&cnt).Error; err != nil { + return err + } + if cnt > 0 { + return nil + } + rows := []TrapDictionaryEntry{ + { + OIDPrefix: "1.3.6.1.6.3.1.1.5.3", + Title: "ifDown 接口中断", + Description: "检测到设备接口状态变为 down。", + SeverityCode: "major", + RecoveryMessage: "请检查链路、端口状态和对端设备。", + Enabled: true, + }, + { + OIDPrefix: "1.3.6.1.6.3.1.1.5.4", + Title: "ifUp 接口恢复", + Description: "检测到设备接口状态恢复为 up。", + SeverityCode: "info", + RecoveryMessage: "接口已恢复,请确认业务连通性。", + Enabled: true, + }, + } + return db.Create(&rows).Error +} diff --git a/internal/models/resource_event_dedup.go b/internal/models/resource_event_dedup.go new file mode 100644 index 0000000..aceb149 --- /dev/null +++ b/internal/models/resource_event_dedup.go @@ -0,0 +1,24 @@ +package models + +import "time" + +// ResourceEventDedup 用于资源事件幂等去重。 +type ResourceEventDedup struct { + ID uint `gorm:"primaryKey" json:"id"` + + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + + // EventID 为外部事件唯一标识。 + EventID string `gorm:"size:128;uniqueIndex" json:"event_id"` + // EventTime 记录事件时间,便于排查重放问题。 + EventTime time.Time `json:"event_time"` + // ResourceType/ResourceID 便于定位被操作资源。 + ResourceType string `gorm:"size:32;index" json:"resource_type"` + ResourceID string `gorm:"size:128;index" json:"resource_id"` +} + +func (ResourceEventDedup) TableName() string { + return "logs_resource_event_dedup" +} + diff --git a/internal/models/resource_mapping.go b/internal/models/resource_mapping.go new file mode 100644 index 0000000..2cc7a3c --- /dev/null +++ b/internal/models/resource_mapping.go @@ -0,0 +1,37 @@ +package models + +import "time" + +// ResourceMapping 表示来自 dc-control 的资源映射快照。 +type ResourceMapping struct { + ID uint `gorm:"primaryKey" json:"id"` + // CreatedAt/UpdatedAt 由 GORM 维护。 + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + + // ResourceType 资源类型(server/collector/device)。 + ResourceType string `gorm:"size:32;index:idx_logs_resource_unique,unique" json:"resource_type"` + // ResourceID 资源 ID(来自 dc-control)。 + ResourceID string `gorm:"size:128;index:idx_logs_resource_unique,unique" json:"resource_id"` + // ResourceName 资源名称。 + ResourceName string `gorm:"size:256" json:"resource_name"` + + // IPsJSON/HostnamesJSON/LabelsJSON 以 JSON 文本存储数组和标签。 + IPsJSON string `gorm:"type:text" json:"ips_json"` + HostnamesJSON string `gorm:"type:text" json:"hostnames_json"` + LabelsJSON string `gorm:"type:text" json:"labels_json"` + + // Version 用于处理乱序事件,仅允许新版本覆盖。 + Version int64 `gorm:"index" json:"version"` + // IsDeleted 表示逻辑删除。 + IsDeleted bool `gorm:"index" json:"is_deleted"` + // LastEventID 记录最后一次成功应用的事件 ID(幂等辅助)。 + LastEventID string `gorm:"size:128" json:"last_event_id"` + // EventTime 记录事件产生时间。 + EventTime time.Time `json:"event_time"` +} + +func (ResourceMapping) TableName() string { + return "logs_resource_mappings" +} + diff --git a/internal/routers/register.go b/internal/routers/register.go index c407ea9..1c406d7 100644 --- a/internal/routers/register.go +++ b/internal/routers/register.go @@ -39,6 +39,10 @@ func Register(srvKey string, engine *gin.Engine) { api.PUT("/trap-suppressions/:id", controllers.UpdateTrapShield) api.DELETE("/trap-suppressions/:id", controllers.DeleteTrapShield) + api.POST("/resource-events", controllers.ReceiveResourceEvent) + api.GET("/entries", controllers.ListLogEvents) + api.GET("/alert-outbox", controllers.ListAlertOutbox) + api.POST("/alert-outbox/:id/retry", controllers.RetryAlertOutbox) } } diff --git a/scripts/prepare_logs_e2e_data.py b/scripts/prepare_logs_e2e_data.py new file mode 100644 index 0000000..d237977 --- /dev/null +++ b/scripts/prepare_logs_e2e_data.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +import argparse +import json +from pathlib import Path +from typing import Any, Dict + +import psycopg2 +import yaml + + +def load_yaml(path: Path) -> Dict[str, Any]: + return yaml.safe_load(path.read_text(encoding="utf-8")) + + +def parse_pg_dsn(dsn: str) -> str: + parts = dsn.split() + kept = [] + timezone = None + for part in parts: + if "=" not in part: + kept.append(part) + continue + k, v = part.split("=", 1) + if k.lower() == "timezone": + timezone = v + continue + kept.append(part) + if timezone: + kept.append(f"options='-c timezone={timezone}'") + return " ".join(kept) + + +def main() -> int: + parser = argparse.ArgumentParser(description="日志管理 E2E 测试数据准备脚本") + parser.add_argument( + "--config", + default="d:/work/ops/logs/etc/logs_dev.yaml", + help="logs 配置文件路径", + ) + parser.add_argument("--run-id", required=True, help="本次测试 run id") + parser.add_argument("--cleanup-only", action="store_true", help="仅清理历史测试数据") + args = parser.parse_args() + + cfg = load_yaml(Path(args.config)) + dsn = parse_pg_dsn(cfg["Databases"]["Source"][0]) + run_id = args.run_id + marker = f"%[E2E:{run_id}]%" + + summary: Dict[str, Any] = {"run_id": run_id, "cleanup": {}, "seed": {}} + with psycopg2.connect(dsn) as conn: + with conn.cursor() as cur: + cur.execute("DELETE FROM logs_alert_outbox WHERE id IN (SELECT id FROM logs_alert_outbox ORDER BY id DESC LIMIT 0)") + cur.execute("DELETE FROM logs_syslog_rules WHERE name LIKE %s", (marker,)) + summary["cleanup"]["logs_syslog_rules"] = cur.rowcount + cur.execute("DELETE FROM logs_trap_rules WHERE name LIKE %s", (marker,)) + summary["cleanup"]["logs_trap_rules"] = cur.rowcount + cur.execute("DELETE FROM logs_trap_dictionary WHERE title LIKE %s", (marker,)) + summary["cleanup"]["logs_trap_dictionary"] = cur.rowcount + cur.execute("DELETE FROM logs_trap_shields WHERE name LIKE %s", (marker,)) + summary["cleanup"]["logs_trap_shields"] = cur.rowcount + cur.execute( + "DELETE FROM logs_resource_event_dedup WHERE event_id LIKE %s", + (f"e2e-{run_id}-%",), + ) + summary["cleanup"]["logs_resource_event_dedup"] = cur.rowcount + if not args.cleanup_only: + cur.execute( + """ + INSERT INTO logs_resource_mappings( + resource_type, resource_id, resource_name, ips_json, hostnames_json, labels_json, + version, is_deleted, last_event_id, event_time, created_at, updated_at + ) VALUES (%s,%s,%s,%s,%s,%s,%s,false,%s,now(),now(),now()) + ON CONFLICT (resource_type, resource_id) + DO UPDATE SET + resource_name=EXCLUDED.resource_name, + ips_json=EXCLUDED.ips_json, + hostnames_json=EXCLUDED.hostnames_json, + labels_json=EXCLUDED.labels_json, + version=EXCLUDED.version, + is_deleted=false, + last_event_id=EXCLUDED.last_event_id, + event_time=now(), + updated_at=now() + """, + ( + "server", + f"seed-{run_id}", + f"E2E-Seed-{run_id}", + json.dumps(["127.0.0.1"]), + json.dumps([f"e2e-host-{run_id}"]), + json.dumps({"source": "prepare_script"}), + 1, + f"e2e-{run_id}-seed", + ), + ) + summary["seed"]["resource_mapping"] = {"resource_type": "server", "resource_id": f"seed-{run_id}"} + conn.commit() + + print(json.dumps(summary, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/run_e2e.ps1 b/scripts/run_e2e.ps1 new file mode 100644 index 0000000..f191c0e --- /dev/null +++ b/scripts/run_e2e.ps1 @@ -0,0 +1,108 @@ +# 日志管理全链路测试一键脚本: +# 1) 准备测试数据 +# 2) 运行 E2E 主测试 +# 3) 输出报告路径 +param( + # 一键模式:本地全量测试 + [switch]$Local, + # 一键模式:线上接口可控测试(自动跳过易受环境影响项) + [switch]$Online, + # 可选:指定本次测试唯一标识;不传则自动按时间生成 + [string]$RunId = "", + # 可选:接口鉴权 token(本服务要求 Authorization 头直接传 token) + [string]$Token = "", + # 可选:logs 配置文件路径 + [string]$Config = "d:/work/ops/logs/etc/logs_dev.yaml", + # 可选:logs 服务主机名(例如 127.0.0.1) + [string]$ApiHost = "127.0.0.1", + # 可选:syslog/trap 发送目标主机(默认跟随 ApiHost) + [string]$IngestHost = "", + # 可选:logs 完整 API 前缀(例如 https://ops-api.apinb.com/Logs/v1),优先级高于 ApiHost + [string]$BaseUrl = "", + # 可选:前端入口地址(用于入口联调检测) + [string]$FrontUrl = "http://127.0.0.1:5173/log-mgmt/entries" + , + # 可选:跳过前端入口检测(仅测后端链路) + [switch]$NoFront, + # 可选:跳过 resource-events 用例(线上未配置 hmac_secret 时可用) + [switch]$SkipResourceEvent, + # 可选:跳过 trap 接收用例(线上 trap 端口不可达时可用) + [switch]$SkipTrap +) + +$ErrorActionPreference = "Stop" + +# 便捷模式参数展开: +# -Online: 默认走线上 API,可控跳过前端/resource-events/trap +# -Local : 默认走本地全量 +if ($Online -and $Local) { + throw "不能同时指定 -Online 和 -Local" +} +if ($Online) { + if ([string]::IsNullOrWhiteSpace($BaseUrl)) { + $BaseUrl = "https://ops-api.apinb.com/Logs/v1" + } + $NoFront = $true + $SkipResourceEvent = $true + $SkipTrap = $true +} + +# 未传 RunId 时,按当前时间生成,便于报告文件唯一化 +if ([string]::IsNullOrWhiteSpace($RunId)) { + $RunId = Get-Date -Format "yyyyMMddHHmmss" +} + +# 先准备测试数据(清理+初始化) +python "d:/work/ops/logs/scripts/prepare_logs_e2e_data.py" --run-id $RunId --config $Config +if ($LASTEXITCODE -ne 0) { + throw "prepare_logs_e2e_data.py 执行失败,退出码: $LASTEXITCODE" +} + +# 组装主测试命令参数;按需跳过前端入口检查 +$args = @( + "d:/work/ops/logs/scripts/run_logs_e2e.py", + "--run-id", $RunId, + "--config", $Config, + "--front-url", $FrontUrl +) +if (-not [string]::IsNullOrWhiteSpace($BaseUrl)) { + $args += @("--base-url", $BaseUrl) +} elseif ($ApiHost -match "^https?://") { + $normalized = $ApiHost.TrimEnd("/") + if ($normalized.EndsWith("/Logs/v1")) { + $args += @("--base-url", $normalized) + } else { + $args += @("--base-url", "$normalized/Logs/v1") + } +} else { + $args += @("--host", $ApiHost) +} +if (-not [string]::IsNullOrWhiteSpace($Token)) { + $args += @("--token", $Token) +} +if ($NoFront) { + $args += @("--skip-front") +} +if (-not [string]::IsNullOrWhiteSpace($IngestHost)) { + $ingestTarget = $IngestHost + if ($ingestTarget -match "^https?://") { + try { + $ingestTarget = ([System.Uri]$ingestTarget).Host + } catch { + throw "IngestHost 格式无效: $IngestHost" + } + } + $args += @("--ingest-host", $ingestTarget) +} +if ($SkipResourceEvent) { + $args += @("--skip-resource-event") +} +if ($SkipTrap) { + $args += @("--skip-trap") +} +python @args +if ($LASTEXITCODE -ne 0) { + throw "run_logs_e2e.py 执行失败,退出码: $LASTEXITCODE" +} + +Write-Host "E2E报告: d:/work/ops/artifacts/logs_e2e_report_$RunId.md" diff --git a/scripts/run_logs_e2e.py b/scripts/run_logs_e2e.py new file mode 100644 index 0000000..d36e8f5 --- /dev/null +++ b/scripts/run_logs_e2e.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python3 +import argparse +import asyncio +import hashlib +import hmac +import json +import socket +import subprocess +import time +import uuid +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib import error, request + +import psycopg2 +import yaml +from pysnmp.hlapi.v3arch.asyncio import CommunityData, ContextData, NotificationType, ObjectIdentity, ObjectType, OctetString, SnmpEngine, UdpTransportTarget, send_notification + + +def now_utc() -> datetime: + return datetime.now(timezone.utc) + + +def rfc3339(dt: datetime) -> str: + return dt.replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def parse_pg_dsn(dsn: str) -> str: + parts = dsn.split() + kept = [] + timezone_value = None + for p in parts: + if "=" not in p: + kept.append(p) + continue + k, v = p.split("=", 1) + if k.lower() == "timezone": + timezone_value = v + continue + kept.append(p) + if timezone_value: + kept.append(f"options='-c timezone={timezone_value}'") + return " ".join(kept) + + +def load_token(default_path: Path) -> str: + if default_path.exists(): + for raw in default_path.read_text(encoding="utf-8").splitlines(): + line = raw.strip() + if line.startswith("JWT_TOKEN="): + token = line.split("=", 1)[1].strip() + if token: + return token.replace("Bearer ", "") + return "" + + +def http_json(method: str, url: str, token: str = "", body: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None) -> Tuple[int, Dict[str, Any]]: + req_headers = {"Content-Type": "application/json"} + if token: + req_headers["Authorization"] = token + if headers: + req_headers.update(headers) + data = None + if body is not None: + data = json.dumps(body, ensure_ascii=False).encode("utf-8") + req = request.Request(url, data=data, method=method.upper(), headers=req_headers) + try: + with request.urlopen(req, timeout=12) as resp: + text = resp.read().decode("utf-8") + return resp.status, json.loads(text) if text else {} + except error.HTTPError as e: + text = e.read().decode("utf-8", errors="ignore") + try: + return e.code, json.loads(text) if text else {} + except json.JSONDecodeError: + return e.code, {"raw": text} + + +def payload_obj(p: Dict[str, Any]) -> Dict[str, Any]: + if isinstance(p.get("details"), dict): + return p["details"] + if isinstance(p.get("data"), dict): + return p["data"] + return {} + + +async def send_trap_async(addr: Tuple[str, int], run_id: str) -> None: + await send_notification( + SnmpEngine(), + CommunityData("public", mpModel=1), + await UdpTransportTarget.create(addr), + ContextData(), + "trap", + NotificationType(ObjectIdentity("1.3.6.1.4.1.8072.2.3.0.1")).add_varbinds( + ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0"), OctetString(f"E2E-TRAP-{run_id}")) + ), + ) + + +@dataclass +class Config: + base_url: str + syslog_addr: Tuple[str, int] + trap_addr: Tuple[str, int] + db_dsn: str + hmac_secret: str + token: str + run_id: str + front_url: str + skip_front: bool + skip_resource_event: bool + skip_trap: bool + + +class Runner: + def __init__(self, cfg: Config) -> None: + self.cfg = cfg + self.results: List[Dict[str, Any]] = [] + self.ctx: Dict[str, Any] = {} + self.failed = False + + def add(self, case_id: str, title: str, expected: str, actual: str, ok: bool, steps: List[str], severity: str = "none") -> None: + self.results.append( + { + "id": case_id, + "title": title, + "steps": steps, + "expected": expected, + "actual": actual, + "result": "PASS" if ok else "FAIL", + "severity": severity if not ok else "none", + } + ) + if not ok: + self.failed = True + print(f"[{'PASS' if ok else 'FAIL'}] {case_id} {title}") + + def query_one(self, sql: str, params: Tuple[Any, ...]) -> Optional[Dict[str, Any]]: + with psycopg2.connect(self.cfg.db_dsn) as conn: + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + if not row: + return None + cols = [x[0] for x in cur.description] + return {k: row[i] for i, k in enumerate(cols)} + + def query_all(self, sql: str, params: Tuple[Any, ...]) -> List[Dict[str, Any]]: + with psycopg2.connect(self.cfg.db_dsn) as conn: + with conn.cursor() as cur: + cur.execute(sql, params) + cols = [x[0] for x in cur.description] + out = [] + for row in cur.fetchall(): + out.append({k: row[i] for i, k in enumerate(cols)}) + return out + + def run(self) -> int: + self.case_health() + if self.cfg.skip_front: + self.add("TC-002", "前端关键入口服务可访问", "可按需跳过", "skip(--skip-front)", True, [f"GET {self.cfg.front_url}"], "major") + else: + self.case_front_smoke() + self.case_crud_rules() + if self.cfg.skip_resource_event: + self.add("TC-004", "resource-events 签名/时间窗/幂等", "可按需跳过", "skip(--skip-resource-event)", True, ["POST /resource-events"], "critical") + else: + self.case_resource_events() + self.case_syslog_ingest_and_entries() + if self.cfg.skip_trap: + self.add("TC-007", "Trap 接收与入库", "可按需跳过", "skip(--skip-trap)", True, [f"SNMP trap -> {self.cfg.trap_addr}"], "critical") + else: + self.case_trap_ingest() + self.case_outbox_flow() + self.write_report() + return 1 if self.failed else 0 + + def case_health(self) -> None: + status, payload = http_json("GET", f"{self.cfg.base_url}/ping/hello") + ok = status == 200 and payload.get("code") == 0 + self.add("TC-001", "logs 健康检查", "服务返回 code=0", f"status={status}, payload={payload}", ok, [f"GET {self.cfg.base_url}/ping/hello"], "critical") + + def case_front_smoke(self) -> None: + try: + with request.urlopen(self.cfg.front_url, timeout=8) as resp: + text = resp.read().decode("utf-8", errors="ignore") + ok = resp.status == 200 and " bool: + if not self.cfg.token: + return False + status, payload = http_json("GET", f"{self.cfg.base_url}/syslog-rules", token=self.cfg.token) + return status == 200 and payload.get("code") == 0 + + def case_crud_rules(self) -> None: + if not self.auth_ready(): + self.add( + "TC-003", + "规则 CRUD(syslog/trap/dictionary/suppression)", + "四类规则均可增删改查", + "鉴权失败(缺少有效 JWT 或 token 过期)", + False, + ["GET /syslog-rules 验证鉴权", "跳过后续 CRUD"], + "critical", + ) + return + suffix = f"[E2E:{self.cfg.run_id}]" + syslog_body = { + "name": f"{suffix}-syslog", + "enabled": True, + "priority": 999, + "device_name_contains": "127.0.0.1", + "keyword_regex": "E2E-SYSLOG", + "alert_name": f"{suffix}-syslog-alert", + "severity_code": "warning", + "policy_id": 0, + } + trap_rule_body = { + "name": f"{suffix}-trap-rule", + "enabled": True, + "priority": 998, + "oid_prefix": "1.3.6.1.4.1.8072", + "varbind_match_regex": "E2E-TRAP", + "alert_name": f"{suffix}-trap-alert", + "severity_code": "warning", + "policy_id": 0, + } + dict_body = { + "oid_prefix": f"1.3.6.1.4.1.8072.{int(time.time()) % 100000}", + "title": f"{suffix}-dict", + "description": "dict for e2e", + "severity_code": "warning", + "recovery_message": "recover", + "enabled": True, + } + suppression_body = { + "name": f"{suffix}-suppress", + "enabled": True, + "source_ip_cidr": "127.0.0.1/32", + "oid_prefix": "1.3.6.1.4.1.8072", + "interface_hint": "no-match", + "time_windows_json": "[]", + } + created_ids: List[Tuple[str, int]] = [] + try: + s1, p1 = http_json("POST", f"{self.cfg.base_url}/syslog-rules", token=self.cfg.token, body=syslog_body) + s2, p2 = http_json("POST", f"{self.cfg.base_url}/trap-rules", token=self.cfg.token, body=trap_rule_body) + s3, p3 = http_json("POST", f"{self.cfg.base_url}/trap-dictionary", token=self.cfg.token, body=dict_body) + s4, p4 = http_json("POST", f"{self.cfg.base_url}/trap-suppressions", token=self.cfg.token, body=suppression_body) + objs = [payload_obj(x) for x in [p1, p2, p3, p4]] + statuses_ok = all(x == 200 for x in [s1, s2, s3, s4]) + for ep, obj in zip(["syslog-rules", "trap-rules", "trap-dictionary", "trap-suppressions"], objs): + if obj.get("id"): + created_ids.append((ep, int(obj["id"]))) + ok = statuses_ok and len(created_ids) == 4 + self.add("TC-003", "规则 CRUD(syslog/trap/dictionary/suppression)", "四类规则创建成功", f"created={created_ids}", ok, ["POST 4类规则"]) + finally: + for ep, rid in created_ids: + http_json("DELETE", f"{self.cfg.base_url}/{ep}/{rid}", token=self.cfg.token) + + def case_resource_events(self) -> None: + if not self.auth_ready(): + self.add("TC-004", "resource-events 签名/时间窗/幂等", "签名和幂等校验生效", "鉴权失败,无法执行", False, ["POST /resource-events"], "critical") + return + base_event = { + "event_id": f"e2e-{self.cfg.run_id}-{uuid.uuid4().hex[:8]}", + "event_time": rfc3339(now_utc()), + "event_type": "resource.upsert", + "resource_type": "server", + "resource_id": f"res-{self.cfg.run_id}", + "resource_name": f"E2E Resource {self.cfg.run_id}", + "ips": ["127.0.0.1"], + "hostnames": [f"e2e-host-{self.cfg.run_id}"], + "labels": {"run_id": self.cfg.run_id}, + "version": 2, + } + raw = json.dumps(base_event, ensure_ascii=False).encode("utf-8") + sig = hmac.new(self.cfg.hmac_secret.encode("utf-8"), raw, hashlib.sha256).hexdigest() + s_ok, p_ok = http_json("POST", f"{self.cfg.base_url}/resource-events", token=self.cfg.token, body=base_event, headers={"X-Event-Signature": sig}) + s_dup, p_dup = http_json("POST", f"{self.cfg.base_url}/resource-events", token=self.cfg.token, body=base_event, headers={"X-Event-Signature": sig}) + bad = dict(base_event) + bad["event_id"] = f"{base_event['event_id']}-bad" + old_dt = now_utc() - timedelta(seconds=1000) + bad["event_time"] = rfc3339(old_dt) + raw_bad = json.dumps(bad, ensure_ascii=False).encode("utf-8") + sig_bad = hmac.new(self.cfg.hmac_secret.encode("utf-8"), raw_bad, hashlib.sha256).hexdigest() + s_old, p_old = http_json("POST", f"{self.cfg.base_url}/resource-events", token=self.cfg.token, body=bad, headers={"X-Event-Signature": sig_bad}) + invalid_sig_status, p_bad_sig = http_json("POST", f"{self.cfg.base_url}/resource-events", token=self.cfg.token, body=base_event, headers={"X-Event-Signature": "bad-sign"}) + base_ok = s_ok == 200 and payload_obj(p_ok).get("resource_id") == base_event["resource_id"] and payload_obj(p_dup).get("ignored") is True + # bsm-sdk 通常以 HTTP 200 + code!=0 返回错误,这里兼容两种语义。 + stale_rejected = s_old != 200 or p_old.get("code", 0) != 0 + bad_sig_rejected = invalid_sig_status != 200 or p_bad_sig.get("code", 0) != 0 + ok = base_ok and stale_rejected and bad_sig_rejected + self.ctx["resource_id"] = base_event["resource_id"] + self.add("TC-004", "resource-events 签名/时间窗/幂等", "首次成功、重复忽略、旧时间窗拒绝、坏签名拒绝", f"ok={s_ok}/{p_ok}, dup={p_dup}, stale={s_old}/{p_old}, bad_sig={invalid_sig_status}/{p_bad_sig}", ok, ["POST 正常事件", "POST 重复事件", "POST 超时事件", "POST 错签名事件"], "critical") + + def case_syslog_ingest_and_entries(self) -> None: + msg = f"<34>Apr 27 17:30:00 e2e-host-{self.cfg.run_id} app: E2E-SYSLOG-{self.cfg.run_id}" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto(msg.encode("utf-8"), self.cfg.syslog_addr) + sock.close() + time.sleep(2) + row = self.query_one( + """ + SELECT id, source_kind, source_ip, resource_type, resource_id, match_method, dispatch_status + FROM logs_events + WHERE raw_payload LIKE %s + ORDER BY id DESC + LIMIT 1 + """, + (f"%E2E-SYSLOG-{self.cfg.run_id}%",), + ) + ok = row is not None and row.get("source_kind") == "syslog" + if row: + self.ctx["syslog_log_event_id"] = row["id"] + self.add("TC-005", "Syslog 接收与入库 + 资源关联写入", "syslog 事件入库且带 source_ip/resource/match_method", f"row={row}", ok, [f"UDP sendto {self.cfg.syslog_addr}"]) + + if not self.auth_ready(): + self.add("TC-006", "entries 查询筛选", "source_kind/resource/dispatch_status/log_event_id 可筛选", "鉴权失败,无法执行 API 筛选验证", False, ["GET /entries"], "major") + return + params = [ + f"source_kind=syslog", + f"resource_id={self.ctx.get('resource_id', '')}", + "dispatch_status=not_applicable", + f"log_event_id={self.ctx.get('syslog_log_event_id', 0)}", + "page=1&page_size=20", + ] + s, p = http_json("GET", f"{self.cfg.base_url}/entries?{'&'.join(params)}", token=self.cfg.token) + items = payload_obj(p).get("items", []) + ok2 = s == 200 and isinstance(items, list) + self.add("TC-006", "entries 查询筛选", "按组合条件可返回列表", f"status={s}, items={len(items) if isinstance(items,list) else 'n/a'}", ok2, [f"GET /entries?{'&'.join(params)}"]) + + def case_trap_ingest(self) -> None: + restored: List[Tuple[int, Dict[str, Any]]] = [] + try: + # 预处理:若存在“全量屏蔽 trap”的规则,会导致任何 trap 都不入库;测试期间暂时关闭并在结束后恢复。 + if self.auth_ready(): + s0, p0 = http_json("GET", f"{self.cfg.base_url}/trap-suppressions", token=self.cfg.token) + if s0 == 200: + for row in payload_obj(p0).get("items", []): + if not row.get("enabled", False): + continue + if str(row.get("source_ip_cidr", "")).strip() == "" and str(row.get("oid_prefix", "")).strip() == "" and str(row.get("interface_hint", "")).strip() == "" and str(row.get("time_windows_json", "")).strip() == "": + rid = int(row.get("id", 0)) + if rid > 0: + body = dict(row) + body["enabled"] = False + http_json("PUT", f"{self.cfg.base_url}/trap-suppressions/{rid}", token=self.cfg.token, body=body) + restored.append((rid, row)) + before = self.query_one( + "SELECT COUNT(1) AS cnt FROM logs_events WHERE source_kind='snmp_trap'", + (), + ) + # 先用 gosnmp 发送,保证与服务端 TrapListener 编码兼容;再发一份 pysnmp。 + subprocess.run( + ["go", "run", "./scripts/send_trap.go", self.cfg.trap_addr[0], f"E2E-TRAP-{self.cfg.run_id}"], + check=True, + capture_output=True, + text=True, + cwd="d:/work/ops/logs", + ) + asyncio.run(send_trap_async(self.cfg.trap_addr, self.cfg.run_id)) + time.sleep(3) + row = self.query_one( + """ + SELECT id, source_kind, trap_o_id, raw_payload, created_at + FROM logs_events + WHERE source_kind='snmp_trap' + ORDER BY id DESC LIMIT 1 + """, + (), + ) + after = self.query_one( + "SELECT COUNT(1) AS cnt FROM logs_events WHERE source_kind='snmp_trap'", + (), + ) + before_cnt = int((before or {}).get("cnt", 0)) + after_cnt = int((after or {}).get("cnt", 0)) + ok = row is not None and after_cnt > before_cnt + self.add( + "TC-007", + "Trap 接收与入库", + "snmp_trap 事件写入 logs_events", + f"before={before_cnt}, after={after_cnt}, latest={row}", + ok, + [f"SNMP trap -> {self.cfg.trap_addr}"], + "critical", + ) + except Exception as e: + self.add("TC-007", "Trap 接收与入库", "snmp_trap 事件写入", str(e), False, [f"SNMP trap -> {self.cfg.trap_addr}"], "critical") + finally: + for rid, row in restored: + http_json("PUT", f"{self.cfg.base_url}/trap-suppressions/{rid}", token=self.cfg.token, body=row) + + def case_outbox_flow(self) -> None: + rows = self.query_all( + """ + SELECT o.id, o.status, o.retry_count, o.log_event_id, e.dispatch_status + FROM logs_alert_outbox o + LEFT JOIN logs_events e ON e.id = o.log_event_id + ORDER BY o.id DESC + LIMIT 10 + """, + (), + ) + has_chain = any(r["status"] in ("pending", "retrying", "sent", "dead") for r in rows) + manual_retry_ok = False + detail = {"rows": rows} + if self.auth_ready() and rows: + target = rows[0]["id"] + s, p = http_json("POST", f"{self.cfg.base_url}/alert-outbox/{target}/retry", token=self.cfg.token) + manual_retry_ok = s == 200 and payload_obj(p).get("status") == "pending" + detail["manual_retry"] = {"status": s, "payload": p} + ok = has_chain and (manual_retry_ok or not self.auth_ready()) + if not self.auth_ready(): + detail["manual_retry"] = "skip(鉴权失败)" + self.add("TC-008", "outbox 链路(入队/worker/状态流转/手动重试)", "存在 outbox 状态流转,手动重试可重置 pending", json.dumps(detail, ensure_ascii=False), ok, ["查 logs_alert_outbox", "POST /alert-outbox/:id/retry"], "major") + + def write_report(self) -> None: + start = now_utc() + end = now_utc() + report_path = Path(f"d:/work/ops/artifacts/logs_e2e_report_{self.cfg.run_id}.md") + report_path.parent.mkdir(parents=True, exist_ok=True) + passed = sum(1 for x in self.results if x["result"] == "PASS") + failed = len(self.results) - passed + issues = [x for x in self.results if x["result"] == "FAIL"] + lines: List[str] = [] + lines.append("# 日志管理全链路测试报告") + lines.append("") + lines.append("## 测试范围") + lines.append("- Syslog/Trap 接收与入库") + lines.append("- 规则 CRUD(syslog/trap/dictionary/suppression)") + lines.append("- resource-events(签名、时间窗、幂等)") + lines.append("- 资源关联字段落库(resource_type/resource_id/match_method/source_ip)") + lines.append("- entries 筛选(source_kind/resource_type/resource_id/dispatch_status/log_event_id)") + lines.append("- outbox(入队、worker、状态、手动重试)") + lines.append("- 前端关键入口联调(日志页、告警队列入口)") + lines.append("") + lines.append("## 环境信息") + lines.append(f"- 执行时间: {rfc3339(start)} ~ {rfc3339(end)}") + lines.append(f"- logs API: `{self.cfg.base_url}`") + lines.append(f"- syslog: `{self.cfg.syslog_addr[0]}:{self.cfg.syslog_addr[1]}`") + lines.append(f"- trap: `{self.cfg.trap_addr[0]}:{self.cfg.trap_addr[1]}`") + lines.append(f"- front: `{self.cfg.front_url}`") + lines.append(f"- run_id: `{self.cfg.run_id}`") + lines.append("") + lines.append("## 用例清单(编号、步骤、预期、实际、结论)") + for r in self.results: + lines.append(f"- **{r['id']} {r['title']}**") + lines.append(f" - 步骤: {'; '.join(r['steps'])}") + lines.append(f" - 预期: {r['expected']}") + lines.append(f" - 实际: {r['actual']}") + lines.append(f" - 结论: {r['result']}") + lines.append("") + lines.append("## 问题清单(严重级别)") + if not issues: + lines.append("- 无失败项。") + else: + for i in issues: + lines.append(f"- [{i['severity'].upper()}] {i['id']} {i['title']}:{i['actual']}") + lines.append("") + lines.append("## 链路结论(是否可上线联调)") + if failed == 0: + lines.append(f"- 结论:可上线联调({passed} 通过 / {failed} 失败)。") + else: + lines.append(f"- 结论:暂不建议上线联调({passed} 通过 / {failed} 失败)。") + lines.append("- 建议先修复高优先级失败项后再回归。") + report_path.write_text("\n".join(lines), encoding="utf-8") + print(f"REPORT_PATH={report_path}") + + +def build_config(args: argparse.Namespace) -> Config: + data = yaml.safe_load(Path(args.config).read_text(encoding="utf-8")) + host = args.host + if args.base_url: + base_url = args.base_url.rstrip("/") + else: + base_url = f"http://{host}:{data['Port']}/Logs/v1" + syslog_port = int(str(data["Ingest"]["syslog_listen_addr"]).split(":")[-1]) + trap_port = int(str(data["Ingest"]["trap_listen_addr"]).split(":")[-1]) + token = args.token or load_token(Path("d:/work/ops/scripts/test_alert_dispatch.env")) + run_id = args.run_id or datetime.now().strftime("%Y%m%d%H%M%S") + return Config( + base_url=base_url, + syslog_addr=(args.ingest_host or host, syslog_port), + trap_addr=(args.ingest_host or host, trap_port), + db_dsn=parse_pg_dsn(data["Databases"]["Source"][0]), + hmac_secret=data["ResourceEvent"]["hmac_secret"], + token=token, + run_id=run_id, + front_url=args.front_url, + skip_front=args.skip_front, + skip_resource_event=args.skip_resource_event, + skip_trap=args.skip_trap, + ) + + +def main() -> int: + parser = argparse.ArgumentParser(description="日志管理全链路测试脚本(真实执行)") + parser.add_argument("--config", default="d:/work/ops/logs/etc/logs_dev.yaml") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--ingest-host", default="", help="syslog/trap 发送目标主机,默认与 --host 相同") + parser.add_argument("--base-url", default="", help="完整 API 前缀,例如 https://ops-api.apinb.com/Logs/v1") + parser.add_argument("--token", default="", help="Authorization 值(例如 Bearer xxx)") + parser.add_argument("--run-id", default="") + parser.add_argument("--front-url", default="http://127.0.0.1:5173/log-mgmt/entries") + parser.add_argument("--skip-front", action="store_true", help="跳过前端入口检查") + parser.add_argument("--skip-resource-event", action="store_true", help="跳过 resource-events 用例") + parser.add_argument("--skip-trap", action="store_true", help="跳过 trap 接收用例") + args = parser.parse_args() + + cfg = build_config(args) + runner = Runner(cfg) + return runner.run() + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/send_trap.go b/scripts/send_trap.go new file mode 100644 index 0000000..93ec983 --- /dev/null +++ b/scripts/send_trap.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/gosnmp/gosnmp" +) + +func main() { + host := "127.0.0.1" + port := uint16(9162) + msg := "E2E-TRAP-GO" + if len(os.Args) > 1 && os.Args[1] != "" { + host = os.Args[1] + } + if len(os.Args) > 2 && os.Args[2] != "" { + msg = os.Args[2] + } + g := &gosnmp.GoSNMP{ + Target: host, + Port: port, + Version: gosnmp.Version2c, + Community: "public", + Timeout: 2 * time.Second, + Retries: 1, + } + if err := g.Connect(); err != nil { + panic(err) + } + defer g.Conn.Close() + + trap := gosnmp.SnmpTrap{ + Variables: []gosnmp.SnmpPDU{ + { + Name: "1.3.6.1.2.1.1.1.0", + Type: gosnmp.OctetString, + Value: msg, + }, + }, + } + if _, err := g.SendTrap(trap); err != nil { + panic(err) + } + fmt.Println("trap_sent") +}