From f27bd7527876b291f48769d4968b6b8442a3e187 Mon Sep 17 00:00:00 2001 From: yanweidong Date: Wed, 8 Apr 2026 10:26:22 +0800 Subject: [PATCH] optz --- models/models.go | 4 -- storage/storage.go | 155 ++++++++++++++++++++++++++------------------- 2 files changed, 89 insertions(+), 70 deletions(-) diff --git a/models/models.go b/models/models.go index 678a6b7..e313a06 100644 --- a/models/models.go +++ b/models/models.go @@ -77,10 +77,6 @@ type CollectorTick struct { Volume int64 `json:"volume" gorm:"not null;default:0;comment:成交量(股)"` Amount float64 `json:"amount" gorm:"type:decimal(15,2);not null;default:0;comment:成交额(元)"` PVolume int64 `json:"pvolume" gorm:"not null;default:0;column:pvolume;comment:累积成交量"` - BidPrices []float64 `json:"bid_prices" gorm:"type:decimal(10,4)[];column:bid_prices;comment:买盘价格数组(买一到买五)"` - BidVolumes []int `json:"bid_volumes" gorm:"type:integer[];column:bid_volumes;comment:买盘数量数组(买一到买五)"` - AskPrices []float64 `json:"ask_prices" gorm:"type:decimal(10,4)[];column:ask_prices;comment:卖盘价格数组(卖一到卖五)"` - AskVolumes []int `json:"ask_volumes" gorm:"type:integer[];column:ask_volumes;comment:卖盘数量数组(卖一到卖五)"` Time int64 `json:"time" gorm:"not null;index;comment:行情时间戳"` TimeTag string `json:"timetag" gorm:"type:varchar(50);column:timetag;comment:时间标签(格式化时间字符串)"` StockStatus int `json:"stock_status" gorm:"not null;default:0;column:stock_status;comment:股票状态"` diff --git a/storage/storage.go b/storage/storage.go index 4a8538b..1d83c5d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -9,7 +9,6 @@ import ( "git.apinb.com/quant/collector/types" "gorm.io/driver/postgres" "gorm.io/gorm" - "gorm.io/gorm/clause" "gorm.io/gorm/logger" ) @@ -97,7 +96,10 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { now := time.Now() ymd := now.Year()*10000 + int(now.Month())*100 + now.Day() - // 保存资产快照 (Upsert: 存在则更新,不存在则插入) + // 保存资产快照 (先查询后更新/插入) + var existingAsset models.CollectorAssets + err := tx.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error + asset := models.CollectorAssets{ AccountID: status.Data.Assets.AccountID, Ymd: ymd, @@ -110,19 +112,26 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { CollectedAt: now, } - // 使用GORM的Clauses实现Upsert - if err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "account_id"}, {Name: "ymd"}}, - DoUpdates: clause.AssignmentColumns([]string{"cash", "frozen_cash", "market_value", "profit", "total_asset", "data_hash", "collected_at"}), - }).Create(&asset).Error; err != nil { - return fmt.Errorf("保存资产快照失败: %w", err) + if err == gorm.ErrRecordNotFound { + // 记录不存在,插入新记录 + if err := tx.Create(&asset).Error; err != nil { + return fmt.Errorf("插入资产快照失败: %w", err) + } + } else if err != nil { + // 查询出错 + return fmt.Errorf("查询资产快照失败: %w", err) + } else { + // 记录存在,更新现有记录 + asset.ID = existingAsset.ID + if err := tx.Save(&asset).Error; err != nil { + return fmt.Errorf("更新资产快照失败: %w", err) + } } - // 批量保存订单 (Upsert: 存在则更新,不存在则插入) + // 批量保存订单 (先查询后更新/插入) if len(status.Data.Orders) > 0 { - orders := make([]models.CollectorOrder, 0, len(status.Data.Orders)) for _, order := range status.Data.Orders { - // 验证Upsert必要条件: OrderID和StockCode + // 验证必要条件: OrderID和StockCode if order.OrderID == 0 { continue } @@ -130,7 +139,12 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { continue } - orders = append(orders, models.CollectorOrder{ + // 查询是否存在 + var existingOrder models.CollectorOrder + err := tx.Where("account_id = ? AND order_id = ? AND ymd = ?", + status.Data.Assets.AccountID, order.OrderID, ymd).First(&existingOrder).Error + + orderRecord := models.CollectorOrder{ OrderID: order.OrderID, AccountID: status.Data.Assets.AccountID, StockCode: order.StockCode, @@ -144,33 +158,40 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { OrderRemark: order.OrderRemark, DataHash: dataHash, CollectedAt: now, - }) - } - // 使用Upsert逻辑: 以 account_id, order_id, ymd 为条件 - if len(orders) > 0 { - if err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "account_id"}, {Name: "order_id"}, {Name: "ymd"}}, - DoUpdates: clause.AssignmentColumns([]string{ - "stock_code", "price", "volume", - "traded_price", "traded_volume", "order_status", - "order_time", "order_remark", "data_hash", "collected_at", - }), - }).Create(&orders).Error; err != nil { - return fmt.Errorf("保存订单失败: %w", err) + } + + if err == gorm.ErrRecordNotFound { + // 记录不存在,插入新记录 + if err := tx.Create(&orderRecord).Error; err != nil { + return fmt.Errorf("插入订单失败: %w", err) + } + } else if err != nil { + // 查询出错 + return fmt.Errorf("查询订单失败: %w", err) + } else { + // 记录存在,更新现有记录 + orderRecord.ID = existingOrder.ID + if err := tx.Save(&orderRecord).Error; err != nil { + return fmt.Errorf("更新订单失败: %w", err) + } } } } - // 批量保存持仓 (Upsert: 存在则更新,不存在则插入) + // 批量保存持仓 (先查询后更新/插入) if len(status.Data.Positions) > 0 { - positions := make([]models.CollectorPosition, 0, len(status.Data.Positions)) for _, pos := range status.Data.Positions { - // 验证Upsert必要条件: Code + // 验证必要条件: Code if pos.Code == "" { continue } - positions = append(positions, models.CollectorPosition{ + // 查询是否存在 + var existingPosition models.CollectorPosition + err := tx.Where("account_id = ? AND code = ? AND ymd = ?", + status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error + + positionRecord := models.CollectorPosition{ AccountID: status.Data.Assets.AccountID, Code: pos.Code, Ymd: ymd, @@ -186,34 +207,39 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { MinProfitRate: pos.MinProfitRate, DataHash: dataHash, CollectedAt: now, - }) - } - // 使用Upsert逻辑: 以 account_id, code, ymd 为条件 - if len(positions) > 0 { - if err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "account_id"}, {Name: "code"}, {Name: "ymd"}}, - DoUpdates: clause.AssignmentColumns([]string{ - "volume", "can_use_volume", "frozen_volume", - "avg_price", "open_price", "current_price", - "market_value", "profit", "profit_rate", - "min_profit_rate", "data_hash", "collected_at", - }), - }).Create(&positions).Error; err != nil { - return fmt.Errorf("保存持仓失败: %w", err) + } + + if err == gorm.ErrRecordNotFound { + // 记录不存在,插入新记录 + if err := tx.Create(&positionRecord).Error; err != nil { + return fmt.Errorf("插入持仓失败: %w", err) + } + } else if err != nil { + // 查询出错 + return fmt.Errorf("查询持仓失败: %w", err) + } else { + // 记录存在,更新现有记录 + positionRecord.ID = existingPosition.ID + if err := tx.Save(&positionRecord).Error; err != nil { + return fmt.Errorf("更新持仓失败: %w", err) + } } } } - // 批量保存行情数据 (Upsert: 存在则更新,不存在则插入) + // 批量保存行情数据 (先查询后更新/插入) if len(status.Data.TickData) > 0 { - ticks := make([]models.CollectorTick, 0, len(status.Data.TickData)) for code, tick := range status.Data.TickData { - // 验证Upsert必要条件: StockCode + // 验证必要条件: StockCode if code == "" { continue } - ticks = append(ticks, models.CollectorTick{ + // 查询是否存在 + var existingTick models.CollectorTick + err := tx.Where("stock_code = ? AND ymd = ?", code, ymd).First(&existingTick).Error + + tickRecord := models.CollectorTick{ StockCode: code, Ymd: ymd, LastPrice: tick.LastPrice, @@ -224,30 +250,27 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { Volume: tick.Volume, Amount: tick.Amount, PVolume: tick.PVolume, - BidPrices: tick.BidPrice, - BidVolumes: tick.BidVol, - AskPrices: tick.AskPrice, - AskVolumes: tick.AskVol, Time: tick.Time, TimeTag: tick.TimeTag, StockStatus: tick.StockStatus, DataHash: dataHash, CollectedAt: now, - }) - } - // 使用Upsert逻辑: 以 stock_code, ymd 为条件 - if len(ticks) > 0 { - if err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "stock_code"}, {Name: "ymd"}}, - DoUpdates: clause.AssignmentColumns([]string{ - "last_price", "open", "high", "low", "last_close", - "volume", "amount", "pvolume", - "bid_prices", "bid_volumes", "ask_prices", "ask_volumes", - "time", "timetag", "stock_status", - "data_hash", "collected_at", - }), - }).Create(&ticks).Error; err != nil { - return fmt.Errorf("保存行情数据失败: %w", err) + } + + if err == gorm.ErrRecordNotFound { + // 记录不存在,插入新记录 + if err := tx.Create(&tickRecord).Error; err != nil { + return fmt.Errorf("插入行情数据失败: %w", err) + } + } else if err != nil { + // 查询出错 + return fmt.Errorf("查询行情数据失败: %w", err) + } else { + // 记录存在,更新现有记录 + tickRecord.ID = existingTick.ID + if err := tx.Save(&tickRecord).Error; err != nil { + return fmt.Errorf("更新行情数据失败: %w", err) + } } } }