optz
This commit is contained in:
@@ -77,10 +77,6 @@ type CollectorTick struct {
|
||||
Volume int64 `json:"volume" gorm:"not null;default:0;comment:成交量(股)"`
|
||||
Amount float64 `json:"amount" gorm:"type:decimal(15,2);not null;default:0;comment:成交额(元)"`
|
||||
PVolume int64 `json:"pvolume" gorm:"not null;default:0;column:pvolume;comment:累积成交量"`
|
||||
BidPrices []float64 `json:"bid_prices" gorm:"type:decimal(10,4)[];column:bid_prices;comment:买盘价格数组(买一到买五)"`
|
||||
BidVolumes []int `json:"bid_volumes" gorm:"type:integer[];column:bid_volumes;comment:买盘数量数组(买一到买五)"`
|
||||
AskPrices []float64 `json:"ask_prices" gorm:"type:decimal(10,4)[];column:ask_prices;comment:卖盘价格数组(卖一到卖五)"`
|
||||
AskVolumes []int `json:"ask_volumes" gorm:"type:integer[];column:ask_volumes;comment:卖盘数量数组(卖一到卖五)"`
|
||||
Time int64 `json:"time" gorm:"not null;index;comment:行情时间戳"`
|
||||
TimeTag string `json:"timetag" gorm:"type:varchar(50);column:timetag;comment:时间标签(格式化时间字符串)"`
|
||||
StockStatus int `json:"stock_status" gorm:"not null;default:0;column:stock_status;comment:股票状态"`
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"git.apinb.com/quant/collector/types"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
"gorm.io/gorm/logger"
|
||||
)
|
||||
|
||||
@@ -97,7 +96,10 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
|
||||
now := time.Now()
|
||||
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
|
||||
|
||||
// 保存资产快照 (Upsert: 存在则更新,不存在则插入)
|
||||
// 保存资产快照 (先查询后更新/插入)
|
||||
var existingAsset models.CollectorAssets
|
||||
err := tx.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error
|
||||
|
||||
asset := models.CollectorAssets{
|
||||
AccountID: status.Data.Assets.AccountID,
|
||||
Ymd: ymd,
|
||||
@@ -110,19 +112,26 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
|
||||
CollectedAt: now,
|
||||
}
|
||||
|
||||
// 使用GORM的Clauses实现Upsert
|
||||
if err := tx.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "account_id"}, {Name: "ymd"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"cash", "frozen_cash", "market_value", "profit", "total_asset", "data_hash", "collected_at"}),
|
||||
}).Create(&asset).Error; err != nil {
|
||||
return fmt.Errorf("保存资产快照失败: %w", err)
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 记录不存在,插入新记录
|
||||
if err := tx.Create(&asset).Error; err != nil {
|
||||
return fmt.Errorf("插入资产快照失败: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
// 查询出错
|
||||
return fmt.Errorf("查询资产快照失败: %w", err)
|
||||
} else {
|
||||
// 记录存在,更新现有记录
|
||||
asset.ID = existingAsset.ID
|
||||
if err := tx.Save(&asset).Error; err != nil {
|
||||
return fmt.Errorf("更新资产快照失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 批量保存订单 (Upsert: 存在则更新,不存在则插入)
|
||||
// 批量保存订单 (先查询后更新/插入)
|
||||
if len(status.Data.Orders) > 0 {
|
||||
orders := make([]models.CollectorOrder, 0, len(status.Data.Orders))
|
||||
for _, order := range status.Data.Orders {
|
||||
// 验证Upsert必要条件: OrderID和StockCode
|
||||
// 验证必要条件: OrderID和StockCode
|
||||
if order.OrderID == 0 {
|
||||
continue
|
||||
}
|
||||
@@ -130,7 +139,12 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
|
||||
continue
|
||||
}
|
||||
|
||||
orders = append(orders, models.CollectorOrder{
|
||||
// 查询是否存在
|
||||
var existingOrder models.CollectorOrder
|
||||
err := tx.Where("account_id = ? AND order_id = ? AND ymd = ?",
|
||||
status.Data.Assets.AccountID, order.OrderID, ymd).First(&existingOrder).Error
|
||||
|
||||
orderRecord := models.CollectorOrder{
|
||||
OrderID: order.OrderID,
|
||||
AccountID: status.Data.Assets.AccountID,
|
||||
StockCode: order.StockCode,
|
||||
@@ -144,33 +158,40 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
|
||||
OrderRemark: order.OrderRemark,
|
||||
DataHash: dataHash,
|
||||
CollectedAt: now,
|
||||
})
|
||||
}
|
||||
// 使用Upsert逻辑: 以 account_id, order_id, ymd 为条件
|
||||
if len(orders) > 0 {
|
||||
if err := tx.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "account_id"}, {Name: "order_id"}, {Name: "ymd"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{
|
||||
"stock_code", "price", "volume",
|
||||
"traded_price", "traded_volume", "order_status",
|
||||
"order_time", "order_remark", "data_hash", "collected_at",
|
||||
}),
|
||||
}).Create(&orders).Error; err != nil {
|
||||
return fmt.Errorf("保存订单失败: %w", err)
|
||||
}
|
||||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 记录不存在,插入新记录
|
||||
if err := tx.Create(&orderRecord).Error; err != nil {
|
||||
return fmt.Errorf("插入订单失败: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
// 查询出错
|
||||
return fmt.Errorf("查询订单失败: %w", err)
|
||||
} else {
|
||||
// 记录存在,更新现有记录
|
||||
orderRecord.ID = existingOrder.ID
|
||||
if err := tx.Save(&orderRecord).Error; err != nil {
|
||||
return fmt.Errorf("更新订单失败: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 批量保存持仓 (Upsert: 存在则更新,不存在则插入)
|
||||
// 批量保存持仓 (先查询后更新/插入)
|
||||
if len(status.Data.Positions) > 0 {
|
||||
positions := make([]models.CollectorPosition, 0, len(status.Data.Positions))
|
||||
for _, pos := range status.Data.Positions {
|
||||
// 验证Upsert必要条件: Code
|
||||
// 验证必要条件: Code
|
||||
if pos.Code == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
positions = append(positions, models.CollectorPosition{
|
||||
// 查询是否存在
|
||||
var existingPosition models.CollectorPosition
|
||||
err := tx.Where("account_id = ? AND code = ? AND ymd = ?",
|
||||
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
|
||||
|
||||
positionRecord := models.CollectorPosition{
|
||||
AccountID: status.Data.Assets.AccountID,
|
||||
Code: pos.Code,
|
||||
Ymd: ymd,
|
||||
@@ -186,34 +207,39 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
|
||||
MinProfitRate: pos.MinProfitRate,
|
||||
DataHash: dataHash,
|
||||
CollectedAt: now,
|
||||
})
|
||||
}
|
||||
// 使用Upsert逻辑: 以 account_id, code, ymd 为条件
|
||||
if len(positions) > 0 {
|
||||
if err := tx.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "account_id"}, {Name: "code"}, {Name: "ymd"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{
|
||||
"volume", "can_use_volume", "frozen_volume",
|
||||
"avg_price", "open_price", "current_price",
|
||||
"market_value", "profit", "profit_rate",
|
||||
"min_profit_rate", "data_hash", "collected_at",
|
||||
}),
|
||||
}).Create(&positions).Error; err != nil {
|
||||
return fmt.Errorf("保存持仓失败: %w", err)
|
||||
}
|
||||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 记录不存在,插入新记录
|
||||
if err := tx.Create(&positionRecord).Error; err != nil {
|
||||
return fmt.Errorf("插入持仓失败: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
// 查询出错
|
||||
return fmt.Errorf("查询持仓失败: %w", err)
|
||||
} else {
|
||||
// 记录存在,更新现有记录
|
||||
positionRecord.ID = existingPosition.ID
|
||||
if err := tx.Save(&positionRecord).Error; err != nil {
|
||||
return fmt.Errorf("更新持仓失败: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 批量保存行情数据 (Upsert: 存在则更新,不存在则插入)
|
||||
// 批量保存行情数据 (先查询后更新/插入)
|
||||
if len(status.Data.TickData) > 0 {
|
||||
ticks := make([]models.CollectorTick, 0, len(status.Data.TickData))
|
||||
for code, tick := range status.Data.TickData {
|
||||
// 验证Upsert必要条件: StockCode
|
||||
// 验证必要条件: StockCode
|
||||
if code == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
ticks = append(ticks, models.CollectorTick{
|
||||
// 查询是否存在
|
||||
var existingTick models.CollectorTick
|
||||
err := tx.Where("stock_code = ? AND ymd = ?", code, ymd).First(&existingTick).Error
|
||||
|
||||
tickRecord := models.CollectorTick{
|
||||
StockCode: code,
|
||||
Ymd: ymd,
|
||||
LastPrice: tick.LastPrice,
|
||||
@@ -224,30 +250,27 @@ func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
|
||||
Volume: tick.Volume,
|
||||
Amount: tick.Amount,
|
||||
PVolume: tick.PVolume,
|
||||
BidPrices: tick.BidPrice,
|
||||
BidVolumes: tick.BidVol,
|
||||
AskPrices: tick.AskPrice,
|
||||
AskVolumes: tick.AskVol,
|
||||
Time: tick.Time,
|
||||
TimeTag: tick.TimeTag,
|
||||
StockStatus: tick.StockStatus,
|
||||
DataHash: dataHash,
|
||||
CollectedAt: now,
|
||||
})
|
||||
}
|
||||
// 使用Upsert逻辑: 以 stock_code, ymd 为条件
|
||||
if len(ticks) > 0 {
|
||||
if err := tx.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "stock_code"}, {Name: "ymd"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{
|
||||
"last_price", "open", "high", "low", "last_close",
|
||||
"volume", "amount", "pvolume",
|
||||
"bid_prices", "bid_volumes", "ask_prices", "ask_volumes",
|
||||
"time", "timetag", "stock_status",
|
||||
"data_hash", "collected_at",
|
||||
}),
|
||||
}).Create(&ticks).Error; err != nil {
|
||||
return fmt.Errorf("保存行情数据失败: %w", err)
|
||||
}
|
||||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 记录不存在,插入新记录
|
||||
if err := tx.Create(&tickRecord).Error; err != nil {
|
||||
return fmt.Errorf("插入行情数据失败: %w", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
// 查询出错
|
||||
return fmt.Errorf("查询行情数据失败: %w", err)
|
||||
} else {
|
||||
// 记录存在,更新现有记录
|
||||
tickRecord.ID = existingTick.ID
|
||||
if err := tx.Save(&tickRecord).Error; err != nil {
|
||||
return fmt.Errorf("更新行情数据失败: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user