package storage import ( "fmt" "log" "time" "git.apinb.com/quant/collector/models" "git.apinb.com/quant/collector/types" "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/clause" "gorm.io/gorm/logger" ) // Storage 数据库存储器 type Storage struct { db *gorm.DB } // NewStorage 创建新的数据库连接 func NewStorage(connStr string) (*Storage, error) { // 配置GORM日志 newLogger := logger.New( log.New(log.Writer(), "\r\n", log.LstdFlags), logger.Config{ SlowThreshold: time.Second, LogLevel: logger.Warn, IgnoreRecordNotFoundError: true, Colorful: true, }, ) db, err := gorm.Open(postgres.Open(connStr), &gorm.Config{ Logger: newLogger, }) if err != nil { return nil, fmt.Errorf("打开数据库连接失败: %w", err) } // 获取底层的sql.DB以设置连接池 sqlDB, err := db.DB() if err != nil { return nil, fmt.Errorf("获取数据库实例失败: %w", err) } // 设置连接池参数 sqlDB.SetMaxOpenConns(25) sqlDB.SetMaxIdleConns(5) sqlDB.SetConnMaxLifetime(5 * time.Minute) log.Println("数据库连接成功") return &Storage{db: db}, nil } // Close 关闭数据库连接 func (s *Storage) Close() error { if s.db != nil { sqlDB, err := s.db.DB() if err != nil { return err } return sqlDB.Close() } return nil } // AutoMigrate 自动迁移数据库表结构 func (s *Storage) AutoMigrate() error { log.Println("开始自动迁移数据库表结构...") err := s.db.AutoMigrate( &models.CollectorAssets{}, &models.CollectorOrder{}, &models.CollectorPosition{}, &models.CollectorTick{}, &models.CollectorLog{}, ) if err != nil { return fmt.Errorf("自动迁移失败: %w", err) } log.Println("数据库表结构迁移完成") return nil } // SaveStatus 保存完整状态数据(使用事务) func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { return s.db.Transaction(func(tx *gorm.DB) error { // 计算Ymd (年月日数字格式,如20260407) now := time.Now() ymd := now.Year()*10000 + int(now.Month())*100 + now.Day() // 保存资产快照 (Upsert: 存在则更新,不存在则插入) asset := models.CollectorAssets{ AccountID: status.Data.Assets.AccountID, Ymd: ymd, Cash: status.Data.Assets.Cash, FrozenCash: status.Data.Assets.FrozenCash, MarketValue: status.Data.Assets.MarketValue, Profit: status.Data.Assets.Profit, TotalAsset: status.Data.Assets.TotalAsset, DataHash: dataHash, CollectedAt: now, } // 使用GORM的Clauses实现Upsert if err := tx.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "account_id"}, {Name: "ymd"}}, DoUpdates: clause.AssignmentColumns([]string{"cash", "frozen_cash", "market_value", "profit", "total_asset", "data_hash", "collected_at"}), }).Create(&asset).Error; err != nil { return fmt.Errorf("保存资产快照失败: %w", err) } // 批量保存订单 (Upsert: 存在则更新,不存在则插入) if len(status.Data.Orders) > 0 { orders := make([]models.CollectorOrder, 0, len(status.Data.Orders)) for _, order := range status.Data.Orders { orders = append(orders, models.CollectorOrder{ OrderID: order.OrderID, AccountID: status.Data.Assets.AccountID, StockCode: order.StockCode, Ymd: ymd, Price: order.Price, Volume: order.Volume, TradedPrice: order.TradedPrice, TradedVolume: order.TradedVolume, OrderStatus: order.OrderStatus, OrderTime: order.OrderTime, OrderRemark: order.OrderRemark, DataHash: dataHash, CollectedAt: now, }) } // 使用Upsert逻辑: 以 account_id, order_id, ymd 为条件 if err := tx.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "account_id"}, {Name: "order_id"}, {Name: "ymd"}}, DoUpdates: clause.AssignmentColumns([]string{ "stock_code", "price", "volume", "traded_price", "traded_volume", "order_status", "order_time", "order_remark", "data_hash", "collected_at", }), }).Create(&orders).Error; err != nil { return fmt.Errorf("保存订单失败: %w", err) } } // 批量保存持仓 (Upsert: 存在则更新,不存在则插入) if len(status.Data.Positions) > 0 { positions := make([]models.CollectorPosition, 0, len(status.Data.Positions)) for _, pos := range status.Data.Positions { positions = append(positions, models.CollectorPosition{ AccountID: status.Data.Assets.AccountID, Code: pos.Code, Ymd: ymd, Volume: pos.Volume, CanUseVolume: pos.CanUseVolume, FrozenVolume: pos.FrozenVolume, AvgPrice: pos.AvgPrice, OpenPrice: pos.OpenPrice, CurrentPrice: pos.CurrentPrice, MarketValue: pos.MarketValue, Profit: pos.Profit, ProfitRate: pos.ProfitRate, MinProfitRate: pos.MinProfitRate, DataHash: dataHash, CollectedAt: now, }) } // 使用Upsert逻辑: 以 account_id, code, ymd 为条件 if err := tx.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "account_id"}, {Name: "code"}, {Name: "ymd"}}, DoUpdates: clause.AssignmentColumns([]string{ "volume", "can_use_volume", "frozen_volume", "avg_price", "open_price", "current_price", "market_value", "profit", "profit_rate", "min_profit_rate", "data_hash", "collected_at", }), }).Create(&positions).Error; err != nil { return fmt.Errorf("保存持仓失败: %w", err) } } // 批量保存行情数据 (Upsert: 存在则更新,不存在则插入) if len(status.Data.TickData) > 0 { ticks := make([]models.CollectorTick, 0, len(status.Data.TickData)) for code, tick := range status.Data.TickData { ticks = append(ticks, models.CollectorTick{ StockCode: code, Ymd: ymd, LastPrice: tick.LastPrice, Open: tick.Open, High: tick.High, Low: tick.Low, LastClose: tick.LastClose, Volume: tick.Volume, Amount: tick.Amount, PVolume: tick.PVolume, BidPrices: tick.BidPrice, BidVolumes: tick.BidVol, AskPrices: tick.AskPrice, AskVolumes: tick.AskVol, Time: tick.Time, TimeTag: tick.TimeTag, StockStatus: tick.StockStatus, DataHash: dataHash, CollectedAt: now, }) } // 使用Upsert逻辑: 以 stock_code, ymd 为条件 if err := tx.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "stock_code"}, {Name: "ymd"}}, DoUpdates: clause.AssignmentColumns([]string{ "last_price", "open", "high", "low", "last_close", "volume", "amount", "pvolume", "bid_prices", "bid_volumes", "ask_prices", "ask_volumes", "time", "timetag", "stock_status", "data_hash", "collected_at", }), }).Create(&ticks).Error; err != nil { return fmt.Errorf("保存行情数据失败: %w", err) } } return nil }) } // SaveCollectionLog 保存采集日志 func (s *Storage) SaveCollectionLog(dataHash string, ymd int, hasChanged bool, statusMessage string) error { log := models.CollectorLog{ DataHash: dataHash, Ymd: ymd, HasChanged: hasChanged, StatusMessage: statusMessage, CollectedAt: time.Now(), } if err := s.db.Create(&log).Error; err != nil { return fmt.Errorf("保存采集日志失败: %w", err) } return nil } // GetDB 获取GORM DB实例(用于高级查询) func (s *Storage) GetDB() *gorm.DB { return s.db }