Files
collector/cmd/main.go
2026-04-07 14:46:49 +08:00

149 lines
3.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"git.apinb.com/quant/collector/collector"
"git.apinb.com/quant/collector/storage"
"github.com/robfig/cron/v3"
)
func main() {
log.Println("=== QMT数据采集器启动 ===")
// 从环境变量获取配置
collectorURL := getEnv("COLLECTOR_URL", "http://localhost:5000/status")
dbConnStr := getEnv("DATABASE_URL", "host=139.224.247.176 user=postgres password=Stock0310~! dbname=stock_prod port=19432 sslmode=disable TimeZone=Asia/Shanghai")
interval := getEnvAsInt("COLLECTION_INTERVAL", 60) // 默认60秒
log.Printf("采集地址: %s", collectorURL)
log.Printf("采集间隔: %d秒", interval)
// 创建采集器
coll := collector.NewCollector(collectorURL)
// 创建数据库存储
store, err := storage.NewStorage(dbConnStr)
if err != nil {
log.Fatalf("数据库连接失败: %v", err)
}
defer store.Close()
// 自动迁移数据库表结构
if err := store.AutoMigrate(); err != nil {
log.Fatalf("数据库迁移失败: %v", err)
}
// 创建cron调度器
c := cron.New(cron.WithSeconds())
// 构建cron表达式 (每N秒执行一次)
cronSpec := fmt.Sprintf("@every %ds", interval)
log.Printf("定时任务表达式: %s", cronSpec)
// 添加定时任务
_, err = c.AddFunc(cronSpec, func() {
runCollection(coll, store)
})
if err != nil {
log.Fatalf("添加定时任务失败: %v", err)
}
// 启动调度器
c.Start()
log.Println("定时任务已启动")
// 等待退出信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("收到退出信号,正在关闭...")
c.Stop()
log.Println("采集器已停止")
}
// runCollection 执行一次数据采集和存储
func runCollection(coll *collector.Collector, store *storage.Storage) {
log.Println("开始采集...")
// 计算Ymd
now := time.Now()
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
// 采集数据并检查变化
status, dataHash, changed, err := coll.CollectAndCheck()
if err != nil {
log.Printf("采集失败: %v", err)
// 记录失败的日志
if err := store.SaveCollectionLog("", ymd, false, err.Error()); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
return
}
log.Printf("数据哈希: %s", dataHash)
log.Printf("数据是否变化: %v", changed)
// 如果数据没有变化,只记录日志
if !changed {
log.Println("数据未变化,跳过存储")
if err := store.SaveCollectionLog(dataHash, ymd, false, "数据未变化"); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
return
}
// 数据有变化,保存到数据库
log.Println("数据已变化,开始存储到数据库...")
if err := store.SaveStatus(status, dataHash); err != nil {
log.Printf("保存数据失败: %v", err)
// 记录失败的日志
if err := store.SaveCollectionLog(dataHash, ymd, true, err.Error()); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
return
}
// 记录成功的日志
if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
log.Printf("数据存储成功 - 资产账户: %s, 订单数: %d, 持仓数: %d, 行情数: %d",
status.Data.Assets.AccountID,
len(status.Data.Orders),
len(status.Data.Positions),
len(status.Data.TickData))
}
// getEnv 获取环境变量,如果不存在则返回默认值
func getEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}
// getEnvAsInt 获取环境变量并转换为整数
func getEnvAsInt(key string, defaultValue int) int {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
result, err := strconv.Atoi(value)
if err != nil {
log.Printf("环境变量 %s 转换失败: %v使用默认值 %d", key, err, defaultValue)
return defaultValue
}
return result
}