Files
collector/storage/storage.go

280 lines
8.0 KiB
Go
Raw Normal View History

2026-04-07 12:22:57 +08:00
package storage
import (
"fmt"
"log"
"time"
"git.apinb.com/quant/collector/models"
2026-04-07 14:46:49 +08:00
"git.apinb.com/quant/collector/types"
2026-04-07 12:22:57 +08:00
"gorm.io/driver/postgres"
"gorm.io/gorm"
2026-04-07 14:46:49 +08:00
"gorm.io/gorm/clause"
2026-04-07 12:22:57 +08:00
"gorm.io/gorm/logger"
)
// Storage is the database storage layer. It wraps the GORM handle through
// which every method in this file talks to the database.
type Storage struct {
	db *gorm.DB // GORM database handle; set by NewStorage
}
// NewStorage opens a PostgreSQL connection through GORM using connStr and
// returns a Storage wrapping it.
//
// The GORM logger is configured at Warn level with a slow-query threshold of
// one second, record-not-found errors ignored, and colored output enabled.
// The connection pool is limited to 25 open and 5 idle connections, with a
// maximum connection lifetime of 5 minutes.
//
// An error is returned when the connection cannot be opened or when the
// underlying sql.DB handle cannot be obtained.
func NewStorage(connStr string) (*Storage, error) {
	logCfg := logger.Config{
		SlowThreshold:             time.Second,
		LogLevel:                  logger.Warn,
		IgnoreRecordNotFoundError: true,
		Colorful:                  true,
	}
	gormLogger := logger.New(log.New(log.Writer(), "\r\n", log.LstdFlags), logCfg)

	conn, err := gorm.Open(postgres.Open(connStr), &gorm.Config{Logger: gormLogger})
	if err != nil {
		return nil, fmt.Errorf("打开数据库连接失败: %w", err)
	}

	// Pool settings are applied on the underlying sql.DB, not on the GORM handle.
	pool, err := conn.DB()
	if err != nil {
		return nil, fmt.Errorf("获取数据库实例失败: %w", err)
	}
	pool.SetMaxOpenConns(25)
	pool.SetMaxIdleConns(5)
	pool.SetConnMaxLifetime(5 * time.Minute)

	log.Println("数据库连接成功")
	return &Storage{db: conn}, nil
}
// Close closes the underlying database connection.
// It returns nil without doing anything when the Storage holds no GORM
// handle, and otherwise returns any error from obtaining or closing the
// underlying sql.DB.
func (s *Storage) Close() error {
	if s.db == nil {
		return nil
	}

	pool, err := s.db.DB()
	if err != nil {
		return err
	}
	return pool.Close()
}
// AutoMigrate 自动迁移数据库表结构
func (s *Storage) AutoMigrate() error {
log.Println("开始自动迁移数据库表结构...")
err := s.db.AutoMigrate(
2026-04-07 14:46:49 +08:00
&models.CollectorAssets{},
&models.CollectorOrder{},
&models.CollectorPosition{},
&models.CollectorTick{},
&models.CollectorLog{},
2026-04-07 12:22:57 +08:00
)
if err != nil {
return fmt.Errorf("自动迁移失败: %w", err)
}
log.Println("数据库表结构迁移完成")
return nil
}
// SaveStatus 保存完整状态数据(使用事务)
2026-04-07 14:46:49 +08:00
func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
2026-04-07 21:40:29 +08:00
// 验证必要字段 - AccountID是所有Upsert的共同条件
if status.Data.Assets.AccountID == "" {
return fmt.Errorf("账户ID不能为空")
}
2026-04-07 12:22:57 +08:00
return s.db.Transaction(func(tx *gorm.DB) error {
2026-04-07 14:46:49 +08:00
// 计算Ymd (年月日数字格式,如20260407)
now := time.Now()
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
// 保存资产快照 (Upsert: 存在则更新,不存在则插入)
asset := models.CollectorAssets{
2026-04-07 12:22:57 +08:00
AccountID: status.Data.Assets.AccountID,
2026-04-07 14:46:49 +08:00
Ymd: ymd,
2026-04-07 12:22:57 +08:00
Cash: status.Data.Assets.Cash,
FrozenCash: status.Data.Assets.FrozenCash,
MarketValue: status.Data.Assets.MarketValue,
Profit: status.Data.Assets.Profit,
TotalAsset: status.Data.Assets.TotalAsset,
DataHash: dataHash,
2026-04-07 14:46:49 +08:00
CollectedAt: now,
2026-04-07 12:22:57 +08:00
}
2026-04-07 14:46:49 +08:00
// 使用GORM的Clauses实现Upsert
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "account_id"}, {Name: "ymd"}},
DoUpdates: clause.AssignmentColumns([]string{"cash", "frozen_cash", "market_value", "profit", "total_asset", "data_hash", "collected_at"}),
}).Create(&asset).Error; err != nil {
2026-04-07 12:22:57 +08:00
return fmt.Errorf("保存资产快照失败: %w", err)
}
2026-04-07 14:46:49 +08:00
// 批量保存订单 (Upsert: 存在则更新,不存在则插入)
2026-04-07 12:22:57 +08:00
if len(status.Data.Orders) > 0 {
2026-04-07 14:46:49 +08:00
orders := make([]models.CollectorOrder, 0, len(status.Data.Orders))
2026-04-07 12:22:57 +08:00
for _, order := range status.Data.Orders {
2026-04-07 21:40:29 +08:00
// 验证Upsert必要条件: OrderID和StockCode
if order.OrderID == 0 {
continue
}
if order.StockCode == "" {
continue
}
2026-04-07 14:46:49 +08:00
orders = append(orders, models.CollectorOrder{
2026-04-07 12:22:57 +08:00
OrderID: order.OrderID,
AccountID: status.Data.Assets.AccountID,
StockCode: order.StockCode,
2026-04-07 14:46:49 +08:00
Ymd: ymd,
2026-04-07 12:22:57 +08:00
Price: order.Price,
Volume: order.Volume,
TradedPrice: order.TradedPrice,
TradedVolume: order.TradedVolume,
OrderStatus: order.OrderStatus,
OrderTime: order.OrderTime,
OrderRemark: order.OrderRemark,
DataHash: dataHash,
2026-04-07 14:46:49 +08:00
CollectedAt: now,
2026-04-07 12:22:57 +08:00
})
}
2026-04-07 14:46:49 +08:00
// 使用Upsert逻辑: 以 account_id, order_id, ymd 为条件
2026-04-07 21:40:29 +08:00
if len(orders) > 0 {
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "account_id"}, {Name: "order_id"}, {Name: "ymd"}},
DoUpdates: clause.AssignmentColumns([]string{
"stock_code", "price", "volume",
"traded_price", "traded_volume", "order_status",
"order_time", "order_remark", "data_hash", "collected_at",
}),
}).Create(&orders).Error; err != nil {
return fmt.Errorf("保存订单失败: %w", err)
}
2026-04-07 12:22:57 +08:00
}
}
2026-04-07 14:46:49 +08:00
// 批量保存持仓 (Upsert: 存在则更新,不存在则插入)
2026-04-07 12:22:57 +08:00
if len(status.Data.Positions) > 0 {
2026-04-07 14:46:49 +08:00
positions := make([]models.CollectorPosition, 0, len(status.Data.Positions))
2026-04-07 12:22:57 +08:00
for _, pos := range status.Data.Positions {
2026-04-07 21:40:29 +08:00
// 验证Upsert必要条件: Code
if pos.Code == "" {
continue
}
2026-04-07 14:46:49 +08:00
positions = append(positions, models.CollectorPosition{
2026-04-07 12:22:57 +08:00
AccountID: status.Data.Assets.AccountID,
Code: pos.Code,
2026-04-07 14:46:49 +08:00
Ymd: ymd,
2026-04-07 12:22:57 +08:00
Volume: pos.Volume,
CanUseVolume: pos.CanUseVolume,
FrozenVolume: pos.FrozenVolume,
AvgPrice: pos.AvgPrice,
OpenPrice: pos.OpenPrice,
CurrentPrice: pos.CurrentPrice,
MarketValue: pos.MarketValue,
Profit: pos.Profit,
ProfitRate: pos.ProfitRate,
MinProfitRate: pos.MinProfitRate,
DataHash: dataHash,
2026-04-07 14:46:49 +08:00
CollectedAt: now,
2026-04-07 12:22:57 +08:00
})
}
2026-04-07 14:46:49 +08:00
// 使用Upsert逻辑: 以 account_id, code, ymd 为条件
2026-04-07 21:40:29 +08:00
if len(positions) > 0 {
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "account_id"}, {Name: "code"}, {Name: "ymd"}},
DoUpdates: clause.AssignmentColumns([]string{
"volume", "can_use_volume", "frozen_volume",
"avg_price", "open_price", "current_price",
"market_value", "profit", "profit_rate",
"min_profit_rate", "data_hash", "collected_at",
}),
}).Create(&positions).Error; err != nil {
return fmt.Errorf("保存持仓失败: %w", err)
}
2026-04-07 12:22:57 +08:00
}
}
2026-04-07 14:46:49 +08:00
// 批量保存行情数据 (Upsert: 存在则更新,不存在则插入)
2026-04-07 12:22:57 +08:00
if len(status.Data.TickData) > 0 {
2026-04-07 14:46:49 +08:00
ticks := make([]models.CollectorTick, 0, len(status.Data.TickData))
2026-04-07 12:22:57 +08:00
for code, tick := range status.Data.TickData {
2026-04-07 21:40:29 +08:00
// 验证Upsert必要条件: StockCode
if code == "" {
continue
}
2026-04-07 14:46:49 +08:00
ticks = append(ticks, models.CollectorTick{
2026-04-07 12:22:57 +08:00
StockCode: code,
2026-04-07 14:46:49 +08:00
Ymd: ymd,
2026-04-07 12:22:57 +08:00
LastPrice: tick.LastPrice,
Open: tick.Open,
High: tick.High,
Low: tick.Low,
LastClose: tick.LastClose,
Volume: tick.Volume,
Amount: tick.Amount,
PVolume: tick.PVolume,
BidPrices: tick.BidPrice,
BidVolumes: tick.BidVol,
AskPrices: tick.AskPrice,
AskVolumes: tick.AskVol,
Time: tick.Time,
TimeTag: tick.TimeTag,
StockStatus: tick.StockStatus,
DataHash: dataHash,
2026-04-07 14:46:49 +08:00
CollectedAt: now,
2026-04-07 12:22:57 +08:00
})
}
2026-04-07 14:46:49 +08:00
// 使用Upsert逻辑: 以 stock_code, ymd 为条件
2026-04-07 21:40:29 +08:00
if len(ticks) > 0 {
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "stock_code"}, {Name: "ymd"}},
DoUpdates: clause.AssignmentColumns([]string{
"last_price", "open", "high", "low", "last_close",
"volume", "amount", "pvolume",
"bid_prices", "bid_volumes", "ask_prices", "ask_volumes",
"time", "timetag", "stock_status",
"data_hash", "collected_at",
}),
}).Create(&ticks).Error; err != nil {
return fmt.Errorf("保存行情数据失败: %w", err)
}
2026-04-07 12:22:57 +08:00
}
}
return nil
})
}
// SaveCollectionLog 保存采集日志
2026-04-07 14:46:49 +08:00
func (s *Storage) SaveCollectionLog(dataHash string, ymd int, hasChanged bool, statusMessage string) error {
log := models.CollectorLog{
2026-04-07 12:22:57 +08:00
DataHash: dataHash,
2026-04-07 14:46:49 +08:00
Ymd: ymd,
2026-04-07 12:22:57 +08:00
HasChanged: hasChanged,
StatusMessage: statusMessage,
CollectedAt: time.Now(),
}
if err := s.db.Create(&log).Error; err != nil {
return fmt.Errorf("保存采集日志失败: %w", err)
}
return nil
}
// GetDB returns the GORM DB handle held by this Storage, for callers that
// need to run advanced queries directly.
func (s *Storage) GetDB() *gorm.DB {
	return s.db
}