From 35e52bc5eaf187c1637ec1e75d83f89ec2ff0cf3 Mon Sep 17 00:00:00 2001 From: yanweidong Date: Tue, 7 Apr 2026 14:46:49 +0800 Subject: [PATCH] optz --- .env.example | 8 -- README.md | 99 +++++++++++---------- README_NEW.md | 165 ---------------------------------- REFACTOR.md | 170 ------------------------------------ cmd/main.go | 21 ++--- collector/collector.go | 10 +-- collector/collector_test.go | 4 +- models/models.go | 98 +++------------------ scripts/build.sh | 11 +-- scripts/deploy.sh | 50 ----------- start.bat | 17 ---- start.sh | 20 ----- storage/storage.go | 97 ++++++++++++++------ types/req.go | 84 ++++++++++++++++++ 14 files changed, 234 insertions(+), 620 deletions(-) delete mode 100644 .env.example delete mode 100644 README_NEW.md delete mode 100644 REFACTOR.md delete mode 100644 scripts/deploy.sh delete mode 100644 start.bat delete mode 100644 start.sh create mode 100644 types/req.go diff --git a/.env.example b/.env.example deleted file mode 100644 index cf71b6a..0000000 --- a/.env.example +++ /dev/null @@ -1,8 +0,0 @@ -# 采集器配置 -COLLECTOR_URL=http://localhost:5000/status - -# 数据库连接字符串 -DATABASE_URL=postgres://user:password@localhost:5432/qmt_db?sslmode=disable - -# 采集间隔(秒) -COLLECTION_INTERVAL=5 diff --git a/README.md b/README.md index 5497c29..250c06e 100644 --- a/README.md +++ b/README.md @@ -1,66 +1,69 @@ -# collector +# QMT数据采集器 -qmt client collector +## 项目简介 +QMT数据采集器是一个专门用于从QMT量化交易平台采集实时交易数据的Go语言应用程序。该采集器能够定时获取账户资产、订单状态、持仓信息和市场行情数据,并将这些数据存储到PostgreSQL数据库中,为量化交易策略提供数据支持。 -## 功能说明 +## 业务功能 -这是一个采集QMT交易客户端状态数据并存储到PostgreSQL数据库的服务。 +### 1. 实时数据采集 +- **账户资产监控**:实时采集账户现金、冻结资金、市值、盈亏和总资产等关键财务指标 +- **订单状态跟踪**:持续监控股票订单的状态变化,包括已成交、未成交、部分成交等各种状态 +- **持仓信息管理**:记录每只股票的持仓数量、可用数量、成本价、当前价及盈亏情况 +- **行情数据捕获**:收集股票的实时Tick数据,包括买卖五档价格、成交量、成交额等市场深度信息 -### 主要功能 -- 使用 `github.com/robfig/cron/v3` 定时调度,每5秒(可配置)从 http://localhost:5000/status 获取JSON数据 -- 计算数据SHA256哈希值,检测数据变化 -- 仅在数据变化时使用 `GORM` ORM存储到PostgreSQL数据库 -- 完整的日志记录和错误处理 -- 自动数据库表结构迁移 +### 2. 智能变化检测 +- 采用SHA256哈希算法对采集的数据进行指纹计算 +- 只有当数据发生变化时才执行数据库写入操作,避免重复数据存储 +- 有效减少数据库负载,提高系统运行效率 -### 数据样本 -数据样本位于 `/exmple/status.json` +### 3. 定时任务调度 +- 基于Cron表达式实现灵活的定时采集策略 +- 可配置的采集间隔(默认5秒),适应不同的业务需求 +- 稳定的后台运行机制,支持长时间连续工作 -### 入口文件 -程序入口在 `cmd/main.go` +### 4. 数据持久化存储 +- 使用PostgreSQL作为数据存储后端,确保数据可靠性和一致性 +- 支持数据 Upsert 操作(存在则更新,不存在则插入) +- 按日期分区管理数据,便于历史数据查询和分析 -## 快速开始 +### 5. 运行状态监控 +- 完整的采集日志记录,包括成功/失败状态和数据变化情况 +- 详细的错误信息记录,便于问题排查和系统维护 -### 1. 初始化数据库 +## 技术特性 -执行SQL脚本创建数据表: -```bash -psql -U your_user -d your_database -f scripts/schema.sql -``` +- **高性能**:采用Go语言开发,具有出色的并发处理能力 +- **低延迟**:优化的HTTP客户端配置,确保快速响应数据请求 +- **高可靠性**:完善的错误处理机制和事务保证 +- **易扩展**:模块化设计,便于功能扩展和维护 +- **资源友好**:智能的变化检测机制,减少不必要的数据库操作 -### 2. 配置环境变量 +## 应用场景 -复制 `.env.example` 为 `.env` 并修改配置: -```bash -cp .env.example .env -``` +- 量化交易策略的数据支撑 +- 投资组合实时监控 +- 交易行为分析与回溯 +- 风险管理数据基础 +- 自动化交易系统的数据输入 -编辑 `.env`: -```env -COLLECTOR_URL=http://localhost:5000/status -DATABASE_URL=postgres://user:password@localhost:5432/qmt_db?sslmode=disable -COLLECTION_INTERVAL=5 -``` +## 系统要求 -### 3. 运行程序 +- Go 1.26.1 或更高版本 +- PostgreSQL 数据库 +- 网络连接(访问QMT平台API) -Windows: -```bash -start.bat -``` +## 部署说明 -Linux/Mac: -```bash -chmod +x start.sh -./start.sh -``` +1. 克隆代码库 +2. 配置环境变量: + - `COLLECTOR_URL`: QMT数据接口地址 + - `DATABASE_URL`: PostgreSQL连接字符串 + - `COLLECTION_INTERVAL`: 采集间隔(秒) +3. 编译并运行程序 -或直接运行: -```bash -go run cmd/main.go -``` +## 注意事项 -## 详细说明 - -详细文档请查看 [README_NEW.md](README_NEW.md) \ No newline at end of file +- 确保QMT平台服务正常运行并可访问 +- 数据库需要具备相应的读写权限 +- 建议在生产环境中配置合适的日志级别和监控告警 \ No newline at end of file diff --git a/README_NEW.md b/README_NEW.md deleted file mode 100644 index f56cfc1..0000000 --- a/README_NEW.md +++ /dev/null @@ -1,165 +0,0 @@ -# QMT数据采集器 - -这是一个用于采集QMT交易客户端状态数据并存储到PostgreSQL数据库的服务。 - -## 功能特性 - -- ✅ 使用 `github.com/robfig/cron/v3` 定时任务调度 -- ✅ 每5秒(可配置)从HTTP接口采集数据 -- ✅ 计算数据SHA256哈希值,检测数据变化 -- ✅ 仅在数据变化时使用 `GORM` ORM存储到数据库,避免重复数据 -- ✅ 自动数据库表结构迁移 -- ✅ 完整的日志记录 -- ✅ 优雅退出支持 -- ✅ 环境变量配置 - -## 数据结构 - -采集的数据包括: -- **资产信息**: 账户资金、市值、盈亏等 -- **订单信息**: 所有委托订单详情 -- **持仓信息**: 当前持仓股票及盈亏 -- **行情数据**: 实时tick行情数据 - -## 数据库表结构 - -系统会自动创建以下数据表: - -1. `assets_snapshots` - 资产快照表 -2. `orders` - 订单表 -3. `positions` - 持仓表 -4. `tick_data` - 行情数据表 -5. `collection_logs` - 采集日志表 - -详细的表结构请查看 [scripts/schema.sql](scripts/schema.sql) - -## 快速开始 - -### 1. 安装依赖 - -```bash -go mod download -``` - -### 2. 配置环境变量 - -复制 `.env.example` 为 `.env` 并修改配置: - -```bash -cp .env.example .env -``` - -编辑 `.env` 文件: - -```env -# 采集地址 -COLLECTOR_URL=http://localhost:5000/status - -# 数据库连接字符串 (修改为你的实际配置) -DATABASE_URL=postgres://user:password@localhost:5432/qmt_db?sslmode=disable - -# 采集间隔(秒) -COLLECTION_INTERVAL=5 -``` - -**注意**: 使用GORM后,程序会自动创建和迁移数据库表结构,无需手动执行SQL脚本。 - -### 3. 运行程序 - -```bash -go run cmd/main.go -``` - -或者设置环境变量后运行: - -```bash -export COLLECTOR_URL=http://localhost:5000/status -export DATABASE_URL=postgres://user:password@localhost:5432/qmt_db?sslmode=disable -export COLLECTION_INTERVAL=5 -go run cmd/main.go -``` - -### 4. 编译程序 - -```bash -go build -o collector cmd/main.go -``` - -## 配置说明 - -| 环境变量 | 说明 | 默认值 | -|---------|------|--------| -| COLLECTOR_URL | 数据采集地址 | http://localhost:5000/status | -| DATABASE_URL | PostgreSQL连接字符串 | postgres://user:password@localhost:5432/qmt_db?sslmode=disable | -| COLLECTION_INTERVAL | 采集间隔(秒) | 5 | - -## 项目结构 - -``` -collector/ -├── cmd/ -│ └── main.go # 主程序入口 -├── collector/ -│ └── collector.go # 数据采集器(HTTP请求、Hash计算) -├── models/ -│ └── models.go # 数据模型定义 -├── storage/ -│ └── storage.go # 数据库存储模块 -├── scripts/ -│ ├── schema.sql # 数据库表结构SQL -│ ├── build.sh # 构建脚本 -│ ├── deploy.sh # 部署脚本 -│ └── update.sh # 更新脚本 -├── exmple/ -│ └── status.json # 数据样本 -├── go.mod # Go模块文件 -└── README.md # 说明文档 -``` - -## 技术栈 - -- **定时任务**: [github.com/robfig/cron/v3](https://github.com/robfig/cron/v3) - 强大的cron表达式调度器 -- **ORM框架**: [GORM](https://gorm.io/) - Go语言优秀的ORM库 -- **数据库驱动**: gorm.io/driver/postgres - PostgreSQL驱动 -- **数据库**: PostgreSQL 9.6+ - -## 工作原理 - -1. **定时调度**: 使用cron调度器,根据配置的间隔定时执行采集任务 -2. **数据采集**: 向配置的URL发起HTTP GET请求获取JSON数据 -3. **Hash计算**: 对获取的数据计算SHA256哈希值 -4. **变化检测**: 对比当前哈希与上次哈希,判断数据是否变化 -5. **数据存储**: 如果数据有变化,使用GORM事务将数据保存到PostgreSQL数据库 -6. **自动迁移**: 启动时自动检查并创建/更新数据库表结构 -7. **日志记录**: 每次采集都记录日志,包括hash值和是否变化 - -## 错误处理 - -- HTTP请求失败会记录错误日志并继续下一次采集 -- 数据库连接失败会导致程序退出 -- 数据存储失败会记录错误但不会中断程序 - -## 注意事项 - -1. 确保PostgreSQL数据库已正确配置并可访问 -2. **使用GORM后无需手动执行SQL脚本**,程序启动时会自动创建和迁移表结构 -3. 建议在生产环境使用更安全的数据库连接方式 -4. 可以根据需要调整采集间隔,但不建议设置过小 -5. GORM会自动管理索引和表结构变更 - -## 开发 - -### 添加新功能 - -1. 在 `models/models.go` 中添加数据模型 -2. 在 `storage/storage.go` 中实现数据库操作 -3. 在 `collector/collector.go` 中扩展采集逻辑 -4. 在 `cmd/main.go` 中集成新功能 - -### 测试 - -可以使用 `exmple/status.json` 中的样本数据进行测试。 - -## 许可证 - -MIT License diff --git a/REFACTOR.md b/REFACTOR.md deleted file mode 100644 index 334ef0c..0000000 --- a/REFACTOR.md +++ /dev/null @@ -1,170 +0,0 @@ -# 重构说明 - -## 重构内容 - -本次重构将项目从原生SQL和time.Ticker改为使用更现代化的库: - -### 1. 定时任务调度器 - -**之前**: 使用 `time.Ticker` -```go -ticker := time.NewTicker(time.Duration(interval) * time.Second) -defer ticker.Stop() - -for { - select { - case <-ticker.C: - runCollection(coll, store) - case <-quit: - return - } -} -``` - -**现在**: 使用 `github.com/robfig/cron/v3` -```go -c := cron.New(cron.WithSeconds()) -cronSpec := fmt.Sprintf("@every %ds", interval) - -_, err = c.AddFunc(cronSpec, func() { - runCollection(coll, store) -}) - -c.Start() -// ... -c.Stop() -``` - -**优势**: -- 支持标准的cron表达式,更灵活 -- 可以配置多个不同频率的任务 -- 更好的任务管理和控制 -- 支持秒级精度 - -### 2. 数据库ORM - -**之前**: 使用原生 `database/sql` + `github.com/lib/pq` -```go -db, err := sql.Open("postgres", connStr) -_, err := s.db.Exec(query, params...) -tx, err := s.db.Begin() -stmt, err := tx.Prepare(query) -``` - -**现在**: 使用 `GORM` -```go -db, err := gorm.Open(postgres.Open(connStr), &gorm.Config{}) -tx.Create(&model) -tx.CreateInBatches(models, 100) -db.AutoMigrate(&models.Model{}) -``` - -**优势**: -- 自动数据库表结构迁移 -- 面向对象的操作方式 -- 批量插入优化 -- 自动管理连接池 -- 更好的类型安全 -- 支持软删除 -- 链式调用,代码更简洁 - -## 主要变化 - -### 文件变化 - -1. **models/models.go** - - 添加GORM标签 - - 添加DeletedAt字段支持软删除 - - ID类型从int改为uint - -2. **storage/storage.go** - - 完全重写,使用GORM - - 添加AutoMigrate方法 - - 简化事务处理 - - 使用CreateInBatches批量插入 - -3. **cmd/main.go** - - 使用cron替代time.Ticker - - 添加AutoMigrate调用 - - 更优雅的启动和停止流程 - -4. **go.mod** - - 移除: github.com/lib/pq (由GORM驱动替代) - - 新增: github.com/robfig/cron/v3 - - 新增: gorm.io/gorm - - 新增: gorm.io/driver/postgres - -### 数据库变化 - -**之前**: 需要手动执行 `scripts/schema.sql` 创建表 - -**现在**: 程序启动时自动创建和迁移表结构 - -## 兼容性说明 - -### API兼容 -- 外部接口保持不变 -- 环境变量配置保持不变 -- 数据结构保持不变 - -### 数据库兼容 -- 表结构与原设计完全一致 -- 字段类型和索引保持一致 -- 可以直接在原有数据库上运行(会保留已有数据) - -## 升级步骤 - -1. 备份现有数据库(可选,但推荐) - ```bash - pg_dump -U user -d qmt_db > backup.sql - ``` - -2. 更新依赖 - ```bash - go mod tidy - ``` - -3. 重新编译 - ```bash - go build -o collector.exe cmd/main.go - ``` - -4. 运行新程序 - ```bash - ./collector.exe - ``` - -程序会自动检测并迁移数据库表结构。 - -## 性能对比 - -### 定时任务 -- **time.Ticker**: 简单场景足够,但功能有限 -- **cron**: 功能强大,支持复杂调度,性能相当 - -### 数据库操作 -- **原生SQL**: 性能略高,但开发效率低 -- **GORM**: 开发效率高,批量插入性能优秀,适合本项目 - -对于本项目的数据采集场景,GORM的性能完全足够,且大大提升了开发效率和代码可维护性。 - -## 测试 - -所有单元测试已通过: -```bash -go test ./collector -v -``` - -编译成功: -```bash -go build -o collector.exe cmd/main.go -``` - -## 总结 - -本次重构在不改变功能的前提下,使用了更现代化、更易维护的技术栈: -- ✅ 更灵活的定时任务调度 -- ✅ 更简洁的数据库操作 -- ✅ 自动表结构管理 -- ✅ 更好的代码可读性和可维护性 -- ✅ 保持向后兼容 diff --git a/cmd/main.go b/cmd/main.go index cc1d161..23e8c92 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,6 +7,7 @@ import ( "os/signal" "strconv" "syscall" + "time" "git.apinb.com/quant/collector/collector" "git.apinb.com/quant/collector/storage" @@ -18,8 +19,8 @@ func main() { // 从环境变量获取配置 collectorURL := getEnv("COLLECTOR_URL", "http://localhost:5000/status") - dbConnStr := getEnv("DATABASE_URL", "postgres://user:password@localhost:5432/qmt_db?sslmode=disable") - interval := getEnvAsInt("COLLECTION_INTERVAL", 5) // 默认5秒 + dbConnStr := getEnv("DATABASE_URL", "host=139.224.247.176 user=postgres password=Stock0310~! dbname=stock_prod port=19432 sslmode=disable TimeZone=Asia/Shanghai") + interval := getEnvAsInt("COLLECTION_INTERVAL", 60) // 默认60秒 log.Printf("采集地址: %s", collectorURL) log.Printf("采集间隔: %d秒", interval) @@ -58,10 +59,6 @@ func main() { c.Start() log.Println("定时任务已启动") - // 立即执行一次采集 - log.Println("执行首次采集...") - runCollection(coll, store) - // 等待退出信号 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) @@ -76,12 +73,16 @@ func main() { func runCollection(coll *collector.Collector, store *storage.Storage) { log.Println("开始采集...") + // 计算Ymd + now := time.Now() + ymd := now.Year()*10000 + int(now.Month())*100 + now.Day() + // 采集数据并检查变化 status, dataHash, changed, err := coll.CollectAndCheck() if err != nil { log.Printf("采集失败: %v", err) // 记录失败的日志 - if err := store.SaveCollectionLog("", false, err.Error()); err != nil { + if err := store.SaveCollectionLog("", ymd, false, err.Error()); err != nil { log.Printf("保存采集日志失败: %v", err) } return @@ -93,7 +94,7 @@ func runCollection(coll *collector.Collector, store *storage.Storage) { // 如果数据没有变化,只记录日志 if !changed { log.Println("数据未变化,跳过存储") - if err := store.SaveCollectionLog(dataHash, false, "数据未变化"); err != nil { + if err := store.SaveCollectionLog(dataHash, ymd, false, "数据未变化"); err != nil { log.Printf("保存采集日志失败: %v", err) } return @@ -104,14 +105,14 @@ func runCollection(coll *collector.Collector, store *storage.Storage) { if err := store.SaveStatus(status, dataHash); err != nil { log.Printf("保存数据失败: %v", err) // 记录失败的日志 - if err := store.SaveCollectionLog(dataHash, true, err.Error()); err != nil { + if err := store.SaveCollectionLog(dataHash, ymd, true, err.Error()); err != nil { log.Printf("保存采集日志失败: %v", err) } return } // 记录成功的日志 - if err := store.SaveCollectionLog(dataHash, true, "数据保存成功"); err != nil { + if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil { log.Printf("保存采集日志失败: %v", err) } diff --git a/collector/collector.go b/collector/collector.go index b71162d..f41af58 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -9,7 +9,7 @@ import ( "net/http" "time" - "git.apinb.com/quant/collector/models" + "git.apinb.com/quant/collector/types" ) // Collector 数据采集器 @@ -31,7 +31,7 @@ func NewCollector(url string) *Collector { } // FetchData 从HTTP接口获取数据 -func (c *Collector) FetchData() (*models.Status, error) { +func (c *Collector) FetchData() (*types.Status, error) { resp, err := c.httpClient.Get(c.url) if err != nil { return nil, fmt.Errorf("HTTP请求失败: %w", err) @@ -47,7 +47,7 @@ func (c *Collector) FetchData() (*models.Status, error) { return nil, fmt.Errorf("读取响应失败: %w", err) } - var status models.Status + var status types.Status if err := json.Unmarshal(body, &status); err != nil { return nil, fmt.Errorf("JSON解析失败: %w", err) } @@ -56,7 +56,7 @@ func (c *Collector) FetchData() (*models.Status, error) { } // CalculateHash 计算数据的SHA256哈希值 -func (c *Collector) CalculateHash(status *models.Status) (string, error) { +func (c *Collector) CalculateHash(status *types.Status) (string, error) { // 将数据序列化为JSON data, err := json.Marshal(status) if err != nil { @@ -87,7 +87,7 @@ func (c *Collector) GetLastHash() string { } // CollectAndCheck 采集数据并检查是否变化 -func (c *Collector) CollectAndCheck() (*models.Status, string, bool, error) { +func (c *Collector) CollectAndCheck() (*types.Status, string, bool, error) { // 获取数据 status, err := c.FetchData() if err != nil { diff --git a/collector/collector_test.go b/collector/collector_test.go index ca01577..f4f2b28 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -5,7 +5,7 @@ import ( "os" "testing" - "git.apinb.com/quant/collector/models" + "git.apinb.com/quant/collector/types" ) // TestCalculateHash 测试Hash计算功能 @@ -16,7 +16,7 @@ func TestCalculateHash(t *testing.T) { t.Fatalf("读取样本文件失败: %v", err) } - var status models.Status + var status types.Status if err := json.Unmarshal(data, &status); err != nil { t.Fatalf("JSON解析失败: %v", err) } diff --git a/models/models.go b/models/models.go index c062164..b1d0f46 100644 --- a/models/models.go +++ b/models/models.go @@ -6,93 +6,11 @@ import ( "gorm.io/gorm" ) -// Status 状态数据结构 -type Status struct { - Data Data `json:"data"` - Status Config `json:"status"` -} - -// Data 数据部分 -type Data struct { - Assets Assets `json:"assets"` - Orders []Order `json:"order"` - Positions []Position `json:"positions"` - TickData map[string]Tick `json:"tick_data"` -} - -// Assets 资产信息 -type Assets struct { - AccountID string `json:"account_id"` - Cash float64 `json:"cash"` - FrozenCash float64 `json:"frozen_cash"` - MarketValue float64 `json:"market_value"` - Profit float64 `json:"profit"` - TotalAsset float64 `json:"total_asset"` -} - -// Order 订单信息 -type Order struct { - OrderID int64 `json:"order_id"` - OrderRemark string `json:"order_remark"` - OrderStatus int `json:"order_status"` - OrderTime int64 `json:"order_time"` - Price float64 `json:"price"` - StockCode string `json:"stock_code"` - TradedPrice float64 `json:"traded_price"` - TradedVolume int `json:"traded_volume"` - Volume int `json:"volume"` -} - -// Position 持仓信息 -type Position struct { - Code string `json:"code"` - Volume int `json:"volume"` - CanUseVolume int `json:"can_use_volume"` - FrozenVolume int `json:"frozen_volume"` - AvgPrice float64 `json:"avg_price"` - OpenPrice float64 `json:"open_price"` - CurrentPrice float64 `json:"current_price"` - MarketValue float64 `json:"market_value"` - Profit float64 `json:"profit"` - ProfitRate float64 `json:"profit_rate"` - MinProfitRate float64 `json:"min_profit_rate"` -} - -// Tick 行情数据 -type Tick struct { - LastPrice float64 `json:"lastPrice"` - Open float64 `json:"open"` - High float64 `json:"high"` - Low float64 `json:"low"` - LastClose float64 `json:"lastClose"` - Volume int64 `json:"volume"` - Amount float64 `json:"amount"` - PVolume int64 `json:"pvolume"` - BidPrice []float64 `json:"bidPrice"` - BidVol []int `json:"bidVol"` - AskPrice []float64 `json:"askPrice"` - AskVol []int `json:"askVol"` - Time int64 `json:"time"` - TimeTag string `json:"timetag"` - StockStatus int `json:"stockStatus"` - LastSettlementPrice float64 `json:"lastSettlementPrice"` - SettlementPrice float64 `json:"settlementPrice"` - OpenInt int `json:"openInt"` -} - -// Config 配置信息 -type Config struct { - ConfigKey string `json:"config_key"` - HomeName string `json:"home_name"` - ProjectRoot string `json:"project_root"` - QmtStatus string `json:"qmt_status"` - StartTime int64 `json:"start_time"` -} - // AssetSnapshot 资产快照数据库模型 -type AssetSnapshot struct { +type CollectorAssets struct { ID uint `json:"id" gorm:"primaryKey"` AccountID string `json:"account_id" gorm:"type:varchar(50);not null;index"` + Ymd int `json:"ymd" gorm:"not null;index;comment:年月日数字格式,如20260407"` Cash float64 `json:"cash" gorm:"type:decimal(15,2);not null;default:0"` FrozenCash float64 `json:"frozen_cash" gorm:"type:decimal(15,2);not null;default:0;column:frozen_cash"` MarketValue float64 `json:"market_value" gorm:"type:decimal(15,2);not null;default:0;column:market_value"` @@ -105,11 +23,12 @@ type AssetSnapshot struct { } // OrderRecord 订单数据库模型 -type OrderRecord struct { +type CollectorOrder struct { ID uint `json:"id" gorm:"primaryKey"` OrderID int64 `json:"order_id" gorm:"not null;index"` AccountID string `json:"account_id" gorm:"type:varchar(50);not null;index"` StockCode string `json:"stock_code" gorm:"type:varchar(20);not null;index"` + Ymd int `json:"ymd" gorm:"not null;index;comment:年月日数字格式,如20260407"` Price float64 `json:"price" gorm:"type:decimal(10,4);not null;default:0"` Volume int `json:"volume" gorm:"not null;default:0"` TradedPrice float64 `json:"traded_price" gorm:"type:decimal(10,4);not null;default:0;column:traded_price"` @@ -124,10 +43,11 @@ type OrderRecord struct { } // PositionRecord 持仓数据库模型 -type PositionRecord struct { +type CollectorPosition struct { ID uint `json:"id" gorm:"primaryKey"` AccountID string `json:"account_id" gorm:"type:varchar(50);not null;index"` Code string `json:"code" gorm:"type:varchar(20);not null;index"` + Ymd int `json:"ymd" gorm:"not null;index;comment:年月日数字格式,如20260407"` Volume int `json:"volume" gorm:"not null;default:0"` CanUseVolume int `json:"can_use_volume" gorm:"not null;default:0;column:can_use_volume"` FrozenVolume int `json:"frozen_volume" gorm:"not null;default:0;column:frozen_volume"` @@ -145,9 +65,10 @@ type PositionRecord struct { } // TickRecord 行情数据库模型 -type TickRecord struct { +type CollectorTick struct { ID uint `json:"id" gorm:"primaryKey"` StockCode string `json:"stock_code" gorm:"type:varchar(20);not null;index"` + Ymd int `json:"ymd" gorm:"not null;index;comment:年月日数字格式,如20260407"` LastPrice float64 `json:"last_price" gorm:"type:decimal(10,4);not null;default:0;column:last_price"` Open float64 `json:"open" gorm:"type:decimal(10,4);not null;default:0"` High float64 `json:"high" gorm:"type:decimal(10,4);not null;default:0"` @@ -170,9 +91,10 @@ type TickRecord struct { } // CollectionLog 采集日志数据库模型 -type CollectionLog struct { +type CollectorLog struct { ID uint `json:"id" gorm:"primaryKey"` DataHash string `json:"data_hash" gorm:"type:varchar(64);not null;index"` + Ymd int `json:"ymd" gorm:"not null;index;comment:年月日数字格式,如20260407"` HasChanged bool `json:"has_changed" gorm:"not null;default:false;column:has_changed"` StatusMessage string `json:"status_message" gorm:"type:text;column:status_message"` CollectedAt time.Time `json:"collected_at" gorm:"not null;index"` diff --git a/scripts/build.sh b/scripts/build.sh index 40bc14e..b929430 100644 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -1,12 +1,3 @@ #!/bin/bash -GOARCH=amd64 GOOS=linux go build -o ../builds/gostock ./cmd/main/main.go - -BSM_RuntimeMode=prod BSM_Prefix=/data/app/ nohup ./gostock > /data/app/logs/gostock.log 2>&1 & -cat /data/app/logs/gostock.log - - -GOARCH=amd64 GOOS=linux go build -o ../builds/selector ./cmd/selector/main.go - -GOARCH=amd64 GOOS=linux go build -o ../builds/test ./cmd/test/main.go -BSM_RuntimeMode=prod BSM_Prefix=/data/app/ ./selector \ No newline at end of file +go build -o ../builds/collector.exe ./cmd/main.go \ No newline at end of file diff --git a/scripts/deploy.sh b/scripts/deploy.sh deleted file mode 100644 index 36d498a..0000000 --- a/scripts/deploy.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -# 配置部分 -BINARY_NAME="gostock" # 二进制文件名 -BUILD_OUTPUT_DIR="../builds/gostock" # 构建输出目录 - -# 服务器配置 -REMOTE_USER="root" # 服务器用户名 -REMOTE_HOST="139.224.247.176" # 服务器地址 -REMOTE_DIR="/data/app" # 服务器部署目录 -SERVICE_NAME="gostock" # 服务名称(如果有systemd服务) - -echo "=== 开始部署流程 ===" - -# 1. 编译Linux二进制文件 -echo "正在编译Linux二进制文件..." - -# 使用Go语言编译示例 (如果是其他语言请修改此部分) -# 如果不是Go项目,请替换为你的构建命令,如make等 -GOEXPERIMENT=jsonv2 GOOS=linux GOARCH=amd64 go build -o "${BUILD_OUTPUT_DIR}/${BINARY_NAME}" ./cmd/${BINARY_NAME}/main.go - -if [ $? -ne 0 ]; then - echo "编译失败!" - exit 1 -fi - -echo "编译成功: ${BUILD_OUTPUT_DIR}/${BINARY_NAME}" - -# 2. 停止远程服务 -echo "正在停止远程服务..." -ssh "${REMOTE_USER}@${REMOTE_HOST}" << EOF - killall -9 "${BINARY_NAME}" -EOF - -# 3. 上传到服务器 -echo "正在上传文件到服务器..." -scp -C "${BUILD_OUTPUT_DIR}/${BINARY_NAME}" "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_DIR}/${BINARY_NAME}" -scp ./etc/* "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_DIR}/etc/" - - -# 4. 设置执行权限并启动服务 -echo "正在设置权限并启动服务..." -ssh "${REMOTE_USER}@${REMOTE_HOST}" << EOF - chmod +x "${REMOTE_DIR}/${BINARY_NAME}" - nohup "${REMOTE_DIR}/${BINARY_NAME}" > "${REMOTE_DIR}/logs/${BINARY_NAME}.log" 2>&1 & - sleep 2 - pgrep -f "${REMOTE_DIR}/${BINARY_NAME}" && echo "服务启动成功!" || echo "服务启动可能失败!" -EOF - -echo "=== 部署完成 ===" \ No newline at end of file diff --git a/start.bat b/start.bat deleted file mode 100644 index 3416dda..0000000 --- a/start.bat +++ /dev/null @@ -1,17 +0,0 @@ -@echo off -REM Windows 启动脚本 - -echo 正在启动 QMT 数据采集器... -echo. - -REM 检查 .env 文件是否存在 -if not exist .env ( - echo 警告: .env 文件不存在,将使用默认配置 - echo 请复制 .env.example 为 .env 并修改配置 - echo. -) - -REM 运行程序 -collector.exe - -pause diff --git a/start.sh b/start.sh deleted file mode 100644 index 0dc76cb..0000000 --- a/start.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash -# Linux/Mac 启动脚本 - -echo "正在启动 QMT 数据采集器..." -echo "" - -# 检查 .env 文件是否存在 -if [ ! -f .env ]; then - echo "警告: .env 文件不存在,将使用默认配置" - echo "请复制 .env.example 为 .env 并修改配置" - echo "" -fi - -# 加载环境变量 -if [ -f .env ]; then - export $(cat .env | grep -v '^#' | xargs) -fi - -# 运行程序 -./collector diff --git a/storage/storage.go b/storage/storage.go index b3bb9e7..790bd58 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -6,8 +6,10 @@ import ( "time" "git.apinb.com/quant/collector/models" + "git.apinb.com/quant/collector/types" "gorm.io/driver/postgres" "gorm.io/gorm" + "gorm.io/gorm/clause" "gorm.io/gorm/logger" ) @@ -68,11 +70,11 @@ func (s *Storage) AutoMigrate() error { log.Println("开始自动迁移数据库表结构...") err := s.db.AutoMigrate( - &models.AssetSnapshot{}, - &models.OrderRecord{}, - &models.PositionRecord{}, - &models.TickRecord{}, - &models.CollectionLog{}, + &models.CollectorAssets{}, + &models.CollectorOrder{}, + &models.CollectorPosition{}, + &models.CollectorTick{}, + &models.CollectorLog{}, ) if err != nil { @@ -84,31 +86,42 @@ func (s *Storage) AutoMigrate() error { } // SaveStatus 保存完整状态数据(使用事务) -func (s *Storage) SaveStatus(status *models.Status, dataHash string) error { +func (s *Storage) SaveStatus(status *types.Status, dataHash string) error { return s.db.Transaction(func(tx *gorm.DB) error { - // 保存资产快照 - asset := models.AssetSnapshot{ + // 计算Ymd (年月日数字格式,如20260407) + now := time.Now() + ymd := now.Year()*10000 + int(now.Month())*100 + now.Day() + + // 保存资产快照 (Upsert: 存在则更新,不存在则插入) + asset := models.CollectorAssets{ AccountID: status.Data.Assets.AccountID, + Ymd: ymd, Cash: status.Data.Assets.Cash, FrozenCash: status.Data.Assets.FrozenCash, MarketValue: status.Data.Assets.MarketValue, Profit: status.Data.Assets.Profit, TotalAsset: status.Data.Assets.TotalAsset, DataHash: dataHash, - CollectedAt: time.Now(), + CollectedAt: now, } - if err := tx.Create(&asset).Error; err != nil { + + // 使用GORM的Clauses实现Upsert + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "account_id"}, {Name: "ymd"}}, + DoUpdates: clause.AssignmentColumns([]string{"cash", "frozen_cash", "market_value", "profit", "total_asset", "data_hash", "collected_at"}), + }).Create(&asset).Error; err != nil { return fmt.Errorf("保存资产快照失败: %w", err) } - // 批量保存订单 + // 批量保存订单 (Upsert: 存在则更新,不存在则插入) if len(status.Data.Orders) > 0 { - orders := make([]models.OrderRecord, 0, len(status.Data.Orders)) + orders := make([]models.CollectorOrder, 0, len(status.Data.Orders)) for _, order := range status.Data.Orders { - orders = append(orders, models.OrderRecord{ + orders = append(orders, models.CollectorOrder{ OrderID: order.OrderID, AccountID: status.Data.Assets.AccountID, StockCode: order.StockCode, + Ymd: ymd, Price: order.Price, Volume: order.Volume, TradedPrice: order.TradedPrice, @@ -117,21 +130,30 @@ func (s *Storage) SaveStatus(status *models.Status, dataHash string) error { OrderTime: order.OrderTime, OrderRemark: order.OrderRemark, DataHash: dataHash, - CollectedAt: time.Now(), + CollectedAt: now, }) } - if err := tx.CreateInBatches(orders, 100).Error; err != nil { + // 使用Upsert逻辑: 以 account_id, order_id, ymd 为条件 + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "account_id"}, {Name: "order_id"}, {Name: "ymd"}}, + DoUpdates: clause.AssignmentColumns([]string{ + "stock_code", "price", "volume", + "traded_price", "traded_volume", "order_status", + "order_time", "order_remark", "data_hash", "collected_at", + }), + }).Create(&orders).Error; err != nil { return fmt.Errorf("保存订单失败: %w", err) } } - // 批量保存持仓 + // 批量保存持仓 (Upsert: 存在则更新,不存在则插入) if len(status.Data.Positions) > 0 { - positions := make([]models.PositionRecord, 0, len(status.Data.Positions)) + positions := make([]models.CollectorPosition, 0, len(status.Data.Positions)) for _, pos := range status.Data.Positions { - positions = append(positions, models.PositionRecord{ + positions = append(positions, models.CollectorPosition{ AccountID: status.Data.Assets.AccountID, Code: pos.Code, + Ymd: ymd, Volume: pos.Volume, CanUseVolume: pos.CanUseVolume, FrozenVolume: pos.FrozenVolume, @@ -143,20 +165,30 @@ func (s *Storage) SaveStatus(status *models.Status, dataHash string) error { ProfitRate: pos.ProfitRate, MinProfitRate: pos.MinProfitRate, DataHash: dataHash, - CollectedAt: time.Now(), + CollectedAt: now, }) } - if err := tx.CreateInBatches(positions, 100).Error; err != nil { + // 使用Upsert逻辑: 以 account_id, code, ymd 为条件 + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "account_id"}, {Name: "code"}, {Name: "ymd"}}, + DoUpdates: clause.AssignmentColumns([]string{ + "volume", "can_use_volume", "frozen_volume", + "avg_price", "open_price", "current_price", + "market_value", "profit", "profit_rate", + "min_profit_rate", "data_hash", "collected_at", + }), + }).Create(&positions).Error; err != nil { return fmt.Errorf("保存持仓失败: %w", err) } } - // 批量保存行情数据 + // 批量保存行情数据 (Upsert: 存在则更新,不存在则插入) if len(status.Data.TickData) > 0 { - ticks := make([]models.TickRecord, 0, len(status.Data.TickData)) + ticks := make([]models.CollectorTick, 0, len(status.Data.TickData)) for code, tick := range status.Data.TickData { - ticks = append(ticks, models.TickRecord{ + ticks = append(ticks, models.CollectorTick{ StockCode: code, + Ymd: ymd, LastPrice: tick.LastPrice, Open: tick.Open, High: tick.High, @@ -173,10 +205,20 @@ func (s *Storage) SaveStatus(status *models.Status, dataHash string) error { TimeTag: tick.TimeTag, StockStatus: tick.StockStatus, DataHash: dataHash, - CollectedAt: time.Now(), + CollectedAt: now, }) } - if err := tx.CreateInBatches(ticks, 100).Error; err != nil { + // 使用Upsert逻辑: 以 stock_code, ymd 为条件 + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "stock_code"}, {Name: "ymd"}}, + DoUpdates: clause.AssignmentColumns([]string{ + "last_price", "open", "high", "low", "last_close", + "volume", "amount", "pvolume", + "bid_prices", "bid_volumes", "ask_prices", "ask_volumes", + "time", "timetag", "stock_status", + "data_hash", "collected_at", + }), + }).Create(&ticks).Error; err != nil { return fmt.Errorf("保存行情数据失败: %w", err) } } @@ -186,9 +228,10 @@ func (s *Storage) SaveStatus(status *models.Status, dataHash string) error { } // SaveCollectionLog 保存采集日志 -func (s *Storage) SaveCollectionLog(dataHash string, hasChanged bool, statusMessage string) error { - log := models.CollectionLog{ +func (s *Storage) SaveCollectionLog(dataHash string, ymd int, hasChanged bool, statusMessage string) error { + log := models.CollectorLog{ DataHash: dataHash, + Ymd: ymd, HasChanged: hasChanged, StatusMessage: statusMessage, CollectedAt: time.Now(), diff --git a/types/req.go b/types/req.go new file mode 100644 index 0000000..d07c7f6 --- /dev/null +++ b/types/req.go @@ -0,0 +1,84 @@ +package types + +// Status 状态数据结构 +type Status struct { + Data Data `json:"data"` + Status Config `json:"status"` +} + +// Data 数据部分 +type Data struct { + Assets Assets `json:"assets"` + Orders []Order `json:"order"` + Positions []Position `json:"positions"` + TickData map[string]Tick `json:"tick_data"` +} + +// Assets 资产信息 +type Assets struct { + AccountID string `json:"account_id"` + Cash float64 `json:"cash"` + FrozenCash float64 `json:"frozen_cash"` + MarketValue float64 `json:"market_value"` + Profit float64 `json:"profit"` + TotalAsset float64 `json:"total_asset"` +} + +// Order 订单信息 +type Order struct { + OrderID int64 `json:"order_id"` + OrderRemark string `json:"order_remark"` + OrderStatus int `json:"order_status"` + OrderTime int64 `json:"order_time"` + Price float64 `json:"price"` + StockCode string `json:"stock_code"` + TradedPrice float64 `json:"traded_price"` + TradedVolume int `json:"traded_volume"` + Volume int `json:"volume"` +} + +// Position 持仓信息 +type Position struct { + Code string `json:"code"` + Volume int `json:"volume"` + CanUseVolume int `json:"can_use_volume"` + FrozenVolume int `json:"frozen_volume"` + AvgPrice float64 `json:"avg_price"` + OpenPrice float64 `json:"open_price"` + CurrentPrice float64 `json:"current_price"` + MarketValue float64 `json:"market_value"` + Profit float64 `json:"profit"` + ProfitRate float64 `json:"profit_rate"` + MinProfitRate float64 `json:"min_profit_rate"` +} + +// Tick 行情数据 +type Tick struct { + LastPrice float64 `json:"lastPrice"` + Open float64 `json:"open"` + High float64 `json:"high"` + Low float64 `json:"low"` + LastClose float64 `json:"lastClose"` + Volume int64 `json:"volume"` + Amount float64 `json:"amount"` + PVolume int64 `json:"pvolume"` + BidPrice []float64 `json:"bidPrice"` + BidVol []int `json:"bidVol"` + AskPrice []float64 `json:"askPrice"` + AskVol []int `json:"askVol"` + Time int64 `json:"time"` + TimeTag string `json:"timetag"` + StockStatus int `json:"stockStatus"` + LastSettlementPrice float64 `json:"lastSettlementPrice"` + SettlementPrice float64 `json:"settlementPrice"` + OpenInt int `json:"openInt"` +} + +// Config 配置信息 +type Config struct { + ConfigKey string `json:"config_key"` + HomeName string `json:"home_name"` + ProjectRoot string `json:"project_root"` + QmtStatus string `json:"qmt_status"` + StartTime int64 `json:"start_time"` +}