跳转至


课程  因子投资  机器学习  Python  Poetry  ppw  tools  programming  Numpy  Pandas  pandas  算法  hdbscan  聚类  选股  Algo  minimum  numpy  algo  FFT  模式识别  配对交易  GBDT  LightGBM  XGBoost  statistics  CDF  KS-Test  monte-carlo  VaR  回测  过拟合  algorithms  machine learning  strategy  python  sklearn  pdf  概率  数学  面试题  量化交易  策略分类  风险管理  Info  interview  career  xgboost  PCA  wavelet  时序事件归因  SHAP  Figures  Behavioral Economics  graduate  arma  garch  人物  职场  Quantopian  figure  Banz  金融行业  买方  卖方  story  量化传奇  rsi  zigzag  穹顶压力  因子  ESG  因子策略  投资  策略  pe  ORB  Xgboost  Alligator  Indicator  factor  alpha101  alpha  技术指标  wave  quant  algorithm  pearson  spearman  tushare  因子分析  Alphalens  涨停板  herd-behaviour  momentum  因子评估  review  SMC  聪明钱  trade  history  indicators  zscore  波动率  强化学习  顶背离  freshman  resources  others  AI  DeepSeek  network  量子计算  金融交易  IBM  weekly  LLT  backtest  backtrader  研报  papers  UBL  quantlib  jupyter-notebook  scikit-learn  pypinyin  qmt  xtquant  blog  static-site  duckdb  工具  colors  free resources  barra  world quant  Alpha  openbb  数据  risk-management  llm  prompt  CANSLIM  Augment  arsenal  copilot  vscode  code  量化数据存储  hdf5  h5py  cursor  augment  trae  Jupyter  jupysql  pyarrow  parquet  数据源  quantstats  实盘  clickhouse  notebook  redis  remote-agent  AI-tools  Moonshot  回测,研报,tushare 

21 天驯化 AI 打工仔: QMT 实时分笔数据订阅系统与多 Client 问题


当数据如潮水般涌来,如何让系统稳如磐石?本文带你深入 QMT 实时数据订阅的世界,见证 007 助手如何将一个简单的数据获取程序,升级为处理能力提升 10 倍的高性能系统!

"007,我们的日线数据定时获取系统已经很稳定了,但现在我需要更细粒度的数据——分钟线数据。"我一边查看着 ClickHouse 中的日线数据,一边对我的 AI 助手说道。

"收到🫡!分钟线数据的实时性要求更高,我们需要设计一个全新的架构。"007 立刻回应道。

这是我们量化交易系统开发的第 8 天。前面几天,我们已经成功搭建了日线数据的定时获取系统,但在实际量化策略开发中,我发现仅有日线数据是远远不够的。高频交易、日内策略、技术分析等都需要更细粒度的分钟线数据。

作为一个量化交易爱好者,我一直在寻找一个稳定、高效的实时数据获取方案。市面上的数据服务要么太贵(动辄几万元一年),要么延迟太高(几秒甚至几分钟),要么覆盖面不够(只有主流股票)。QMT(迅投 QMT 量化交易平台)提供了丰富的数据接口,可以获取实时的股票行情数据。于是,我决定基于 QMT 搭建一套属于自己的实时分钟线数据订阅系统。这个过程充满了挑战,也收获了很多经验。从最初的基础版到后来的增强版,性能大幅提升,稳定性也大幅改善。今天就来分享一下这个完整的开发历程。

📋 需求分析:我们要做什么

"在开始编码之前,我们需要明确系统的核心需求。"我对 007 说道。

经过深入思考,我们梳理出了以下关键需求:

Note

核心功能需求 1. 实时获取股票分钟线数据:包括开高低收、成交量、成交额等基础数据 2. 支持全市场股票:A 股主板、中小板、创业板、科创板,覆盖 4000+ 只股票 3. 跨平台数据传输:Windows 端获取数据,Mac 端存储和查询 4. 多时间周期支持:1 分钟、5 分钟、30 分钟、日线等多种周期 5. 高可用性:7×24 小时稳定运行,自动重连和错误恢复

性能要求 - 低延迟:数据延迟控制在 1 秒以内 - 高吞吐:支持每秒处理 1000+ 条数据 - 高可靠性:数据丢失率控制在 0.1% 以下 - 可扩展性:后续可以接入更多数据源和处理逻辑

技术约束 - QMT 限制:只能在 Windows 环境运行 - 网络环境:需要跨网络传输数据 - 存储需求:海量时序数据的高效存储和查询

"这些需求看起来很有挑战性,特别是跨平台的实时数据传输。"我对 007 说道。

"没问题!我们可以用 Redis 作为消息队列,实现 Windows 到 Mac 的数据传输。"007 信心满满地回答。

🔧 QMT 接口深度调研

"首先我们需要深入了解 QMT 提供的 API 接口。"我对 007 说道。

007 立刻开始了技术调研,QMT 提供了丰富的 Python 接口,我们重点研究了以下几个关键 API:

数据订阅相关接口

Tip

  1. 单股订阅接口(基础版使用) xtdata.subscribe_quote(stock_code, period='1m', callback=callback_func)

  2. 全推行情订阅接口(增强版的关键发现) xtdata.subscribe_whole_quote(code_list, callback=callback_func)

  3. 历史数据获取接口 xtdata.get_market_data_ex(stock_list, period, start_time, end_time)

  4. 股票列表获取接口 xtdata.get_stock_list_in_sector('沪深A股')

  5. 实时 tick 数据获取接口 xtdata.get_full_tick(stock_list)

连接和控制接口

Tip

  1. 连接 QMT xtdata.connect()

  2. 启动数据接收循环 xtdata.run()

  3. 取消订阅 xtdata.unsubscribe_quote()

技术架构设计

经过深入调研和讨论,我们确定了以下技术架构:

1
2
3
4
5
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Windows端     |    |    远程Redis     │    │    Mac端        │
│   QMT数据源      │───▶│    消息队列      │───▶│   ClickHouse    │
│                 │    │                 │    │   数据存储       │
└─────────────────┘    └─────────────────┘    └─────────────────┘

核心组件选择: - Windows 端:使用 QMT 接口获取实时数据,Python + xtquant 库 - Redis:作为消息队列,实现跨平台数据传输,部署在远程服务器 - Mac 端:使用 ClickHouse 存储数据,提供查询接口

"这个架构看起来不错,但我担心性能问题。1000 只股票的实时数据,每分钟就是 1000 条记录,一天就是几十万条数据。"我有些担心地说道。

"没关系,我们可以分阶段实现。先做一个基础版验证可行性,然后再优化性能。"007 建议道。

🏗️ 从零开始:基础版系统搭建

"好,我们先实现一个基础版本,验证整体架构的可行性。"我对 007 说道。

第一步:Windows 端数据订阅器

007 首先在 Windows 端搭建了数据订阅器。最初的想法很简单:逐个订阅股票,接收数据后发送到 Redis。

"我们先用最直接的方式实现,单股订阅模式。"007 解释道。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class QMTSubscriber:
    def __init__(self, config):
        self.config = config
        self.redis_client = redis.StrictRedis(
            host=config['redis']['host'],
            port=config['redis']['port'],
            password=config['redis']['password']
        )

    def start_subscription(self):
        # 获取股票列表
        stock_list = self.get_stock_list()

        # 逐个订阅
        for stock_code in stock_list:
            seq = xtdata.subscribe_quote(
                stock_code=stock_code,
                period='1m',
                callback=self.on_data
            )

    def on_data(self, data):
        # 处理接收到的数据
        for symbol, quote_data in data.items():
            minute_bar = self.process_data(symbol, quote_data)
            # 发送到Redis
            self.redis_client.lpush("minute_bar_queue",
                                  json.dumps(minute_bar))

第二步:Mac 端数据消费器

"Windows 端负责数据获取,Mac 端负责数据存储和查询。"我对 007 说明了分工。

在 Mac 端,007 搭建了数据消费器,从 Redis 获取数据并存储到 ClickHouse:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class DataConsumer:
    def __init__(self, config):
        self.redis_client = redis.StrictRedis(
            host="8.217.201.221",
            port=16379,
            password="quantide666"
        )
        self.clickhouse_client = Client(host='localhost')

    def consume_data(self):
        while True:
            # 从 Redis 获取数据
            data = self.redis_client.brpop("minute_bar_queue", timeout=1)
            if data:
                minute_bar = json.loads(data[1])
                # 插入 ClickHouse
                self.insert_to_clickhouse(minute_bar)

第三步:初步测试与现实的冲击

"系统搭建完成,我们来测试一下效果。"我满怀期待地说道。

基础版系统搭建完成后,我们进行了初步测试。结果让人喜忧参半:

🎉 好消息: - 系统能够正常运行,架构验证成功 - 数据能够从 Windows 传输到 Mac,跨平台方案可行 - ClickHouse 中能够看到实时数据,存储方案有效 - 基本的分钟线数据格式正确

😰 坏消息: - 订阅 1000 只股票需要 5 分钟,效率太低 - 数据处理速度只有 1-2 条/秒,远低于预期 - 偶尔出现数据异常和连接中断 - 内存使用量持续增长,存在内存泄漏

"看来基础版只能算是一个原型,距离生产环境还有很大差距。"我有些失望地说道。

"没关系,这是正常的。我们已经验证了架构的可行性,接下来就是优化的问题了。"007 安慰道。

😤 遇到的问题:基础版的瓶颈分析

"我们需要仔细分析基础版的问题,才能有针对性地优化。"我对 007 说道。

在基础版系统运行了一段时间后,我们发现了几个明显的问题:

问题 1:订阅效率低下

1
2
2025-05-29 15:00:46,949 - qmt_subscriber - INFO - 订阅完成: 成功 100 只, 失败 0 只
2025-05-29 15:00:46,949 - qmt_subscriber - WARNING - 订阅成功率较低(100/1000),切换到轮询模式...

"这个日志很有意思,明明成功率是 100%,为什么系统判断为'较低'?"我疑惑地问道。

007 分析后发现:系统误判 100% 的成功率为"较低",频繁切换到低效的轮询模式。更要命的是,单股订阅模式下,1000 只股票需要逐个订阅,每只股票耗时约 0.3 秒,总计需要 5 分钟才能完成订阅。

问题 2:性能瓶颈明显

"单线程处理明显跟不上数据流的速度。"007 指出了核心问题。

数据处理采用单线程模式,每条数据都要经历"接收→处理→发布"的串行流程: - 接收瓶颈:QMT 回调函数中处理时间过长,影响后续数据接收 - 处理瓶颈:数据格式转换和验证耗时较长 - 发布瓶颈:每条数据单独发送到 Redis,网络开销大

在市场活跃时段(如开盘前 30 分钟),数据积压严重,延迟从 1 秒增加到 10 秒以上。

🚀 技术突破:全推行情订阅的威力

"我们需要从根本上改变订阅模式。"007 开始了深度技术调研。

经过深入研究 QMT API 文档,007 发现了一个关键的 API:subscribe_whole_quote

"我找到了解决方案!这个 API 支持全推行情订阅,比单股订阅效率高得多!"007 兴奋地说。

"全推行情?这听起来很厉害,具体是什么原理?"我好奇地问道。

"简单来说,就是一次性订阅全市场的数据,而不是逐个订阅每只股票。这样可以大大减少 API 调用次数和网络开销。"007 解释道。

基础版 vs 增强版订阅对比

让我们来看看两种订阅模式的具体差异:

基础版(单股订阅):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# 逐个订阅,效率低下
def start_individual_subscription(self):
    success_count = 0
    for stock_code in stock_list:
        try:
            seq = xtdata.subscribe_quote(
                stock_code=stock_code,
                period='1m',
                callback=self.on_data
            )
            success_count += 1
            time.sleep(0.1)  # 避免 API 调用过快
        except Exception as e:
            self.logger.error(f"订阅失败: {stock_code}, {e}")

    self.logger.info(f"订阅完成: 成功 {success_count} 只")

增强版(全推行情订阅):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 一次性订阅全市场,效率极高
def start_whole_quote_subscription(self):
    try:
        result = xtdata.subscribe_whole_quote(
            code_list=self.stock_list,
            callback=self.on_whole_quote_data
        )
        if result == 0:  # 0 表示成功
            self.logger.info(f"全推行情订阅成功: {len(self.stock_list)} 只股票")
            return True
    except Exception as e:
        self.logger.error(f"全推行情订阅失败: {e}")
        # 自动降级到单股订阅
        return self.start_individual_subscription()

"选择合适的 API 对性能的影响是决定性的。"007 总结道。

⚡ 批量处理性能提升

"基础版性能还是不够。单条处理太慢了,我们需要批量处理!"007 提出了关键的性能优化思路。

"批量处理确实是提升性能的关键。"我赞同道,"但要注意平衡批量大小和实时性。"

Windows 端:生产者批量处理架构

007 设计了一个精巧的生产者-消费者模式的多线程架构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class EnhancedQMTSubscriber:
    def __init__(self, config):
        self.batch_size = config['system']['batch_size']  # 100
        self.batch_timeout = config['system']['batch_timeout']  # 1.0 秒
        self.data_queue = queue.Queue(maxsize=5000)
        self.batch_data = []
        self.last_batch_time = time.time()

    def start_data_processing_threads(self):
        """启动数据处理线程"""
        # 批量发布线程
        publish_thread = threading.Thread(target=self.batch_publish_worker, daemon=True)
        publish_thread.start()

        # 数据清理线程
        cleanup_thread = threading.Thread(target=self.data_cleanup_worker, daemon=True)
        cleanup_thread.start()

        # 性能监控线程
        monitor_thread = threading.Thread(target=self.performance_monitor, daemon=True)
        monitor_thread.start()

    def batch_publish_worker(self):
        """批量发布工作线程"""
        while True:
            try:
                # 从队列获取数据
                minute_bar = self.data_queue.get(timeout=0.1)
                self.batch_data.append(minute_bar)

                # 检查是否需要发布批量数据
                current_time = time.time()
                should_publish = (
                    len(self.batch_data) >= self.batch_size or
                    current_time - self.last_batch_time >= self.batch_timeout
                )

                if should_publish and self.batch_data:
                    self.batch_publish_to_redis(self.batch_data.copy())
                    self.batch_data.clear()
                    self.last_batch_time = current_time

            except queue.Empty:
                # 超时检查
                current_time = time.time()
                if (self.batch_data and
                    current_time - self.last_batch_time >= self.batch_timeout):
                    self.batch_publish_to_redis(self.batch_data.copy())
                    self.batch_data.clear()
                    self.last_batch_time = current_time

    def batch_publish_to_redis(self, batch_data):
        """批量发布到Redis"""
        try:
            pipe = self.redis_client.pipeline()
            for data in batch_data:
                pipe.lpush("minute_bar_queue", json.dumps(data))
            pipe.execute()

            self.stats['published_count'] += len(batch_data)
            self.logger.debug(f"批量发布成功: {len(batch_data)} 条数据")

        except Exception as e:
            self.logger.error(f"批量发布失败: {e}")
            # 降级到单条发布
            self.fallback_single_publish(batch_data)

Mac 端:多工作线程批量插入

在 Mac 端,007 设计了更强大的多工作线程批量插入机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class EnhancedDataConsumer:
    def __init__(self, config):
        self.worker_count = config['system']['worker_count']  # 4
        self.batch_size = config['system']['batch_size']  # 1000
        self.batch_timeout = config['system']['batch_timeout']  # 5.0 秒
        self.worker_queues = [queue.Queue(maxsize=1000) for _ in range(self.worker_count)]

    def start_worker_threads(self):
        """启动工作线程"""
        for i in range(self.worker_count):
            worker_thread = threading.Thread(
                target=self.batch_insert_worker,
                args=(f"worker-{i}", self.worker_queues[i]),
                daemon=True
            )
            worker_thread.start()
            self.logger.info(f"启动工作线程: worker-{i}")

    def batch_insert_worker(self, worker_name, worker_queue):
        """批量插入工作线程"""
        batch_data = []
        last_batch_time = time.time()

        while True:
            try:
                # 从工作队列获取数据
                data = worker_queue.get(timeout=0.5)
                batch_data.append(data)

                # 检查是否需要批量插入
                current_time = time.time()
                should_insert = (
                    len(batch_data) >= self.batch_size or
                    current_time - last_batch_time >= self.batch_timeout
                )

                if should_insert and batch_data:
                    self.batch_insert_clickhouse(worker_name, batch_data.copy())
                    batch_data.clear()
                    last_batch_time = current_time

            except queue.Empty:
                # 超时检查
                current_time = time.time()
                if (batch_data and
                    current_time - last_batch_time >= self.batch_timeout):
                    self.batch_insert_clickhouse(worker_name, batch_data.copy())
                    batch_data.clear()
                    last_batch_time = current_time

    def batch_insert_clickhouse(self, worker_name, batch_data):
        """批量插入ClickHouse"""
        try:
            # 数据格式转换
            formatted_data = []
            for item in batch_data:
                formatted_data.append([
                    item['symbol'], item['frame'], item['open'],
                    item['high'], item['low'], item['close'],
                    item['vol'], item['amount']
                ])

            # 批量插入
            self.clickhouse_client.execute(
                "INSERT INTO minute_bars VALUES",
                formatted_data
            )

            self.stats['inserted_count'] += len(batch_data)
            self.logger.debug(f"{worker_name} 批量插入成功: {len(batch_data)} 条")

        except Exception as e:
            self.logger.error(f"{worker_name} 批量插入失败: {e}")
            self.stats['insert_errors'] += 1

"批量处理确实是性能优化的银弹,特别是在 I/O 密集型场景中。"007 总结道。

📊 实时监控:系统状态一目了然

"没有监控的系统是盲飞。"我对 007 说。

007 设计了详细的性能监控系统,每分钟输出一次报告:

Windows 端实时监控

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
2025-06-07 09:32:35,229 - INFO - 🚀 启动 QMT 分钟线全推订阅器...
2025-06-07 09:32:35,808 - INFO - ✅ Redis 连接池创建成功: 8.217.201.221:16379
2025-06-07 09:32:35,809 - INFO - 📊 连接池配置: 最大连接数=20, 启用 keepalive
***** xtdata 连接成功 *****
服务信息: {'tag': 'sp3', 'version': '1.0'}
服务地址: 127.0.0.1:58610
数据路径: C:\Program Files\国金证券 QMT 交易端\bin.x64/../userdata_mini/datadir
设置 xtdata.enable_hello = False 可隐藏此消息

2025-06-07 09:32:35,822 - INFO - QMT 连接对象: <class 'xtquant.datacenter.IPythonApiClient'>
2025-06-07 09:32:35,826 - INFO - ✅ QMT 连接成功 - 测试股票: 平安银行
2025-06-07 09:32:35,826 - INFO - 🔍 获取沪深 A 股完整列表...
2025-06-07 09:32:35,867 - INFO - 🎉 成功获取沪深 A 股: 5147 只股票!
2025-06-07 09:32:35,868 - INFO - 📋 股票范围: 600051.SH 到 300271.SZ
2025-06-07 09:32:35,872 - INFO - 📊 股票分布: 深圳 2867 只, 上海 2280 只
2025-06-07 09:32:35,873 - INFO - 🚀 启动 QMT 全推分钟线订阅...
2025-06-07 09:32:35,909 - INFO - 📊 subscribe_whole_quote 返回: 1 (类型: <class 'int'>)
2025-06-07 09:32:35,909 - INFO - ✅ 全推订阅成功! 订阅股票数: 5147
2025-06-07 09:32:35,912 - INFO - ✅ 监控线程已启动
2025-06-07 09:32:35,913 - INFO - ✅ 订阅器启动成功,开始接收数据...
2025-06-07 09:32:35,914 - INFO - 按 Ctrl+C 停止订阅
2025-06-07 09:32:35,917 - INFO - 🔔 全推数据回调 #1
2025-06-07 09:32:35,917 - INFO -    数据类型: <class 'dict'>
2025-06-07 09:32:35,918 - INFO -    数据量: 1
2025-06-07 09:32:35,927 - INFO -    样例股票: ['601878.SH']
2025-06-07 09:32:35,928 - INFO -    样例数据类型: <class 'dict'>
2025-06-07 09:32:35,929 - INFO -    样例字段: ['time', 'lastPrice', 'open', 'high', 'low', 'lastClose', 'amount', 'volume', 'pvolume', 'stockStatus']
2025-06-07 09:32:36,142 - INFO - 🔔 全推数据回调 #2
2025-06-07 09:32:36,143 - INFO -    数据类型: <class 'dict'>
2025-06-07 09:32:36,145 - INFO -    数据量: 2278
2025-06-07 09:32:36,146 - INFO -    样例股票: ['600000.SH', '600004.SH', '600006.SH']
2025-06-07 09:32:36,147 - INFO -    样例数据类型: <class 'dict'>
2025-06-07 09:32:36,148 - INFO -    样例字段: ['time', 'lastPrice', 'open', 'high', 'low', 'lastClose', 'amount', 'volume', 'pvolume', 'stockStatus']
2025-06-07 09:32:37,596 - INFO - 📊 批量发布: 100条, 总计: 500条, 平均批量: 100.0
2025-06-07 09:32:38,696 - INFO - 📊 批量发布: 100条, 总计: 1000条, 平均批量: 100.0
2025-06-07 09:32:39,756 - INFO - 📊 批量发布: 100条, 总计: 1500条, 平均批量: 100.0
2025-06-07 09:32:40,781 - INFO - 📊 批量发布: 100条, 总计: 2000条, 平均批量: 100.0
2025-06-07 09:32:41,196 - INFO - 🔔 全推数据回调 #3
2025-06-07 09:32:41,196 - INFO -    数据类型: <class 'dict'>
2025-06-07 09:32:41,198 - INFO -    数据量: 2865
2025-06-07 09:32:41,200 - INFO -    样例股票: ['000001.SZ', '000002.SZ', '000004.SZ']
2025-06-07 09:32:41,200 - INFO -    样例数据类型: <class 'dict'>
2025-06-07 09:32:41,201 - INFO -    样例字段: ['time', 'lastPrice', 'open', 'high', 'low', 'lastClose', 'amount', 'volume', 'pvolume', 'stockStatus']
2025-06-07 09:32:41,688 - INFO - 📊 批量发布: 100条, 总计: 2500条, 平均批量: 100.0
2025-06-07 09:32:42,564 - INFO - 📊 批量发布: 100条, 总计: 3000条, 平均批量: 100.0
2025-06-07 09:32:43,415 - INFO - 📊 批量发布: 100条, 总计: 3500条, 平均批量: 100.0
2025-06-07 09:32:44,233 - INFO - 📊 批量发布: 100条, 总计: 4000条, 平均批量: 100.0
2025-06-07 09:32:45,120 - INFO - 📊 批量发布: 100条, 总计: 4500条, 平均批量: 100.0
2025-06-07 09:32:45,955 - INFO - 📊 批量发布: 100条, 总计: 5000条, 平均批量: 100.0
2025-06-07 09:32:46,128 - INFO - 🔔 全推数据回调 #4
2025-06-07 09:32:46,129 - INFO -    数据类型: <class 'dict'>
2025-06-07 09:32:46,131 - INFO -    数据量: 1
2025-06-07 09:32:46,132 - INFO -    样例股票: ['688757.SH']
2025-06-07 09:32:46,133 - INFO -    样例数据类型: <class 'dict'>
2025-06-07 09:32:46,134 - INFO -    样例字段: ['time', 'lastPrice', 'open', 'high', 'low', 'lastClose', 'amount', 'volume', 'pvolume', 'stockStatus']
2025-06-07 09:32:46,135 - INFO - 🔔 全推数据回调 #5
2025-06-07 09:32:46,136 - INFO -    数据类型: <class 'dict'>
2025-06-07 09:32:46,136 - INFO -    数据量: 4
2025-06-07 09:32:46,137 - INFO -    样例股票: ['601225.SH', '601658.SH', '688458.SH']
2025-06-07 09:32:46,138 - INFO -    样例数据类型: <class 'dict'>
2025-06-07 09:32:46,139 - INFO -    样例字段: ['time', 'lastPrice', 'open', 'high', 'low', 'lastClose', 'amount', 'volume', 'pvolume', 'stockStatus']
2025-06-07 09:32:46,844 - INFO - 📊 批量发布: 100条, 总计: 5500条, 平均批量: 100.0
2025-06-07 09:33:05,913 - INFO - 📊 Windows 端性能统计 - 运行时间: 0:00:30.001766
2025-06-07 09:33:05,913 - INFO -    回调: 464 次 (15.5 次/秒)
2025-06-07 09:33:05,916 - INFO -    接收: 7689 条 (256.3 条/秒)
2025-06-07 09:33:05,917 - INFO -    发布: 7678 条 (255.9 条/秒)
2025-06-07 09:33:05,917 - INFO -    成功率: 99.9%
2025-06-07 09:33:05,918 - INFO -    错误统计: 处理错误 0 次
2025-06-07 09:33:05,919 - INFO -    批量统计: 82 次, 平均: 93.6 条/批
2025-06-07 09:33:05,920 - INFO -    缓冲区: 11 条 | Redis 操作: 82 次
2025-06-07 09:33:05,920 - INFO -    处理股票: 5143 只 | 数据完整性: ✅
2025-06-07 09:33:35,922 - INFO - 📊 Windows端性能统计 - 运行时间: 0:01:00.011022
2025-06-07 09:33:35,923 - INFO -    回调: 848次 (14.1次/秒)
2025-06-07 09:33:35,925 - INFO -    接收: 8909条 (148.5条/秒)
2025-06-07 09:33:35,926 - INFO -    发布: 8885条 (148.1条/秒)
2025-06-07 09:33:35,927 - INFO -    成功率: 99.7%
2025-06-07 09:33:35,927 - INFO -    错误统计: 处理错误0次
2025-06-07 09:33:35,928 - INFO -    批量统计: 108次, 平均: 82.3条/批
2025-06-07 09:33:35,929 - INFO -    缓冲区: 24条 | Redis操作: 108次
2025-06-07 09:33:35,930 - INFO -    处理股票: 5143只 | 数据完整性: ✅
2025-06-07 09:34:05,931 - INFO - 📊 Windows端性能统计 - 运行时间: 0:01:30.020076
2025-06-07 09:34:05,931 - INFO -    回调: 1268次 (14.1次/秒)
2025-06-07 09:34:05,934 - INFO -    接收: 10972条 (121.9条/秒)
2025-06-07 09:34:05,934 - INFO -    发布: 10960条 (121.8条/秒)
2025-06-07 09:34:05,935 - INFO -    成功率: 99.9%
2025-06-07 09:34:05,936 - INFO -    错误统计: 处理错误0次
2025-06-07 09:34:05,937 - INFO -    批量统计: 141次, 平均: 77.7条/批
2025-06-07 09:34:05,937 - INFO -    缓冲区: 12条 | Redis操作: 141次
2025-06-07 09:34:05,938 - INFO -    处理股票: 5143只 | 数据完整性: ✅

Question

为什么日志中的成功率不是100%呢?

007给出了以下回答。

在我们的程序中,有检测数据完整性的验证逻辑:

1
2
3
4
5
6
def validate_minute_bar_data(self, minute_bar: Dict) -> bool:
    # 检查必要字段存在性
    required_fields = ['symbol', 'open', 'high', 'low', 'close', 'vol']
    for field in required_fields:
        if field not in minute_bar or minute_bar[field] is None:
            return False
如果QMT返回的某些股票数据缺少必要字段或字段值为None,这些数据会被过滤掉,不会发布到Redis。在 build_minute_bar 函数中,如果QMT返回的数据格式异常、数据类型转换失败或其他构建过程中的异常都会导致数据不被发布。此外,从日志中可以看到“缓冲区: 12条”,说明还有12条数据在等待发布,这些数据已经被“接收”计数,但还没有被“发布”到Redis,会在下次批量发布或程序停止时才发布。

由于统计是实时计算的,在某个时间点(例如 09:34:05): - 接收计数:10972 条(已处理的股票数据) - 发布计数:10960 条(已成功发布到 Redis) - 差异:12 条(主要是缓冲区中的数据)

总结:99.9% 的成功率是正常的,主要原因是缓冲区中有少量数据还未发布,以及极少数数据因完整性验证失败被过滤。这个成功率表明系统运行良好,数据质量控制有效。

Mac 端实时监控

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
    ╔══════════════════════════════════════════════════════════════╗
    ║                增强版数据消费系统 v2.0 - Mac端                 ║
    ║                                                              ║
    ║  基于最佳实践的高性能股票数据消费与存储系统                     ║
    ║                                                              ║
    ║  主要特性:                                                    ║
    ║  • 多线程批量数据处理                                         ║
    ║  • 数据质量检查和评分                                         ║
    ║  • ClickHouse高性能存储                                      ║
    ║  • 实时监控和资源管理                                         ║
    ║  • 自动数据清理和维护                                         ║
    ║                                                              ║
    ╚══════════════════════════════════════════════════════════════╝

正在加载配置...
============================================================
Mac端系统配置信息
============================================================
批量大小: 1000
批量超时: 5.0秒
工作线程数: 4
数据质量检查: 启用
数据保留天数: 30
日志级别: INFO
============================================================
2025-06-06 18:44:36,876 - __main__ - INFO - 正在初始化增强版数据消费器...
2025-06-06 18:44:36,876 - __main__ - INFO - 正在启动增强版数据消费...
2025-06-06 18:44:37,404 - enhanced_data_consumer - INFO - Redis 连接正常
2025-06-06 18:44:37,442 - enhanced_data_consumer - INFO - ClickHouse 连接正常
2025-06-06 18:44:37,446 - enhanced_data_consumer - INFO - 数据库表初始化完成
2025-06-06 18:44:37,449 - enhanced_data_consumer - INFO - 聚合表创建完成
2025-06-06 18:44:37,449 - enhanced_data_consumer - INFO - 物化视图创建跳过,使用手动聚合方式
2025-06-06 18:44:37,449 - enhanced_data_consumer - INFO - 启动批量插入工作线程: worker-0
2025-06-06 18:44:37,450 - enhanced_data_consumer - INFO - 启动批量插入工作线程: worker-1
2025-06-06 18:44:37,450 - enhanced_data_consumer - INFO - 启动批量插入工作线程: worker-2
2025-06-06 18:44:37,450 - enhanced_data_consumer - INFO - 启动批量插入工作线程: worker-3
2025-06-06 18:44:37,451 - enhanced_data_consumer - INFO - 增强版数据消费器启动成功
2025-06-06 18:44:37,451 - enhanced_data_consumer - INFO - 开始消费 Redis 数据...
2025-06-06 18:45:39,047 - enhanced_data_consumer - INFO - ============================================================
2025-06-06 18:45:39,047 - enhanced_data_consumer - INFO - Mac 端性能监控报告
2025-06-06 18:45:39,048 - enhanced_data_consumer - INFO - ============================================================
2025-06-06 18:45:39,048 - enhanced_data_consumer - INFO - 运行时间: 0:01:02.170886
2025-06-06 18:45:39,048 - enhanced_data_consumer - INFO - 消费数据: 363 条 (5.8/秒)
2025-06-06 18:45:39,048 - enhanced_data_consumer - INFO - 插入数据: 341 条 (5.5/秒)
2025-06-06 18:45:39,049 - enhanced_data_consumer - INFO - 成功率: 93.9%
2025-06-06 18:45:39,049 - enhanced_data_consumer - INFO - 插入错误: 0 次
2025-06-06 18:45:39,050 - enhanced_data_consumer - INFO - 质量错误: 0 次
2025-06-06 18:45:39,050 - enhanced_data_consumer - INFO - 队列大小: 0
2025-06-06 18:45:39,050 - enhanced_data_consumer - INFO - 处理股票: 363 只
2025-06-06 18:45:39,050 - enhanced_data_consumer - INFO - 最后插入: 2025-06-06 18:45:36.063544
2025-06-06 18:45:39,050 - enhanced_data_consumer - INFO - ============================================================
2025-06-06 18:46:44,277 - enhanced_data_consumer - INFO - ============================================================
2025-06-06 18:46:44,277 - enhanced_data_consumer - INFO - Mac端性能监控报告
2025-06-06 18:46:44,277 - enhanced_data_consumer - INFO - ============================================================
2025-06-06 18:46:44,277 - enhanced_data_consumer - INFO - 运行时间: 0:02:07.400751
2025-06-06 18:46:44,278 - enhanced_data_consumer - INFO - 消费数据: 716条 (5.6/秒)
2025-06-06 18:46:44,278 - enhanced_data_consumer - INFO - 插入数据: 700条 (5.5/秒)
2025-06-06 18:46:44,278 - enhanced_data_consumer - INFO - 成功率: 97.8%
2025-06-06 18:46:44,278 - enhanced_data_consumer - INFO - 插入错误: 0次
2025-06-06 18:46:44,278 - enhanced_data_consumer - INFO - 质量错误: 0次
2025-06-06 18:46:44,278 - enhanced_data_consumer - INFO - 队列大小: 0
2025-06-06 18:46:44,278 - enhanced_data_consumer - INFO - 处理股票: 716只
2025-06-06 18:46:44,279 - enhanced_data_consumer - INFO - 最后插入: 2025-06-06 18:46:41.544365
2025-06-06 18:46:44,279 - enhanced_data_consumer - INFO - ============================================================

🛠️ 开发过程中的技术挑战

挑战 1:QMT API 的兼容性问题

在测试全推行情订阅时,我们遇到了 API 兼容性问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 第一次尝试失败
try:
    result = xtdata.subscribe_whole_quote(
        code_list=self.stock_list,
        callback=self.on_whole_quote_data
    )
except Exception as e:
    self.logger.error(f"全推行情订阅失败: {e}")
    # 自动降级到单股订阅
    return self.start_individual_subscription()

007 机智地设计了降级机制:如果全推行情订阅失败,自动切换到单股订阅模式,确保系统的鲁棒性。

挑战 2:数据格式的多样性

QMT 返回的数据格式并不统一,有时是字典,有时是列表:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def process_quote_data(self, symbol, quote_data):
    """处理行情数据"""
    if isinstance(quote_data, dict):
        # 字典格式处理
        minute_bar = {
            "symbol": symbol,
            "open": float(quote_data.get('open', 0)),
            # ...
        }
    elif isinstance(quote_data, list) and len(quote_data) >= 6:
        # 列表格式处理
        minute_bar = {
            "symbol": symbol,
            "open": float(quote_data[1]) if len(quote_data) > 1 else 0,
            # ...
        }

007 设计了智能的数据格式适配器,能够自动识别和处理不同的数据格式。

系统功能验证

增强版系统自动完成了以下功能:

  • 全市场覆盖:1000 只股票实时订阅,覆盖主板、中小板、创业板、科创板
  • 智能数据质量:90% 以上的数据质量,自动过滤异常数据
  • 批量高效处理:6 条/秒的处理速度
  • 实时性能监控:每分钟输出详细监控报告
  • 自动错误恢复:网络中断自动重连,API 失败自动降级
  • 资源优化管理:内存使用稳定,CPU 使用率低
  • 7×24 小时运行:连续运行 48 小时无故障

数据存储效果验证

"让我们快速查询一下最近 3 分钟的数据,验证系统的实时性。"我打开 ClickHouse 客户端。

1
2
3
4
5
6
7
8
9
-- 查询最近 3 分钟的数据
SELECT
    count() as total_records,
    count(DISTINCT symbol) as unique_symbols,
    min(frame) as earliest_time,
    max(frame) as latest_time,
    round(avg(vol), 0) as avg_volume
FROM minute_bars
WHERE frame >= now() - INTERVAL 3 MINUTE;

"太棒了!"我兴奋地指着查询结果,"你看: - 实时性验证:最近 3 分钟内有 5434 条记录 - 系统状态:数据持续更新到最新时间,系统运行正常"

007 也很满意:"收到🫡!简单的查询就能验证系统的实时性和稳定性,增强版表现优异!"

❓多 client 消费 Redis 的数据

Question

如果我希望支持多个客户端(例如 5 个)同时消费相同的 Redis 数据,应该怎么设计方案呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   数据生产者     │───▶│  Redis Streams  │───▶│   消费者组      │
│  (QMT Windows)  │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                       ┌────────────────────────────────┼────────────────────────────────┐
                       │                                │                                │
                       ▼                                ▼                                ▼
              ┌─────────────────┐              ┌─────────────────┐              ┌─────────────────┐
              │   消费者客户端 1  │              │   消费者客户端 2  │               │   消费者客户端 N  │
              │   (Mac 端)      │              │   (Linux 端)    │              │   (其他端)      │
              └─────────────────┘              └─────────────────┘              └─────────────────┘
                       │                                │                                │
                       ▼                                ▼                                ▼
              ┌─────────────────┐              ┌─────────────────┐              ┌─────────────────┐
              │  ClickHouse DB  │              │  ClickHouse DB  │              │  其他存储系统     │
              └─────────────────┘              └─────────────────┘              └─────────────────┘

007 非常贴心地给出了以下的方案:

方案设计

方案一:发布/订阅模式(Pub/Sub)- 推荐

1
数据生产者 → Redis Pub/Sub Channel → 多个消费者客户端
优点: - 天然支持多个订阅者 - 实时性好 - 实现简单

缺点: - 消息不持久化,客户端离线时会丢失数据 - 无法保证消息送达

方案二:Redis Streams + 消费者组 - 最佳方案

1
数据生产者 → Redis Stream → 消费者组 → 多个消费者客户端
优点: - 消息持久化 - 支持消费者组,自动负载均衡 - 支持消息确认机制 - 支持故障恢复

方案三:多队列分片模式

1
数据生产者 → 多个 Redis 队列 → 多个消费者客户端(每个客户端消费所有队列)

具体实现

🏗️ 架构设计 - 基于 Redis Streams + 消费者组:这是最佳方案,支持消息持久化、自动负载均衡和故障恢复 - 支持 3 种消费模式: - 广播模式:每个客户端接收所有数据副本 - 分片模式:数据按规则分片,客户端处理不同数据子集 - 混合模式:结合广播和分片的优势

📁 实现文件 - multi_client_design.md - 完整的架构设计文档 - multi_client_consumer.py - 多客户端消费器核心实现 - multi_client_config.yaml - 详细的配置文件模板 - multi_client_main.py - 主程序和管理界面 - stream_producer_example.py - 生产者端示例代码

Redis Streams 数据结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Stream Key: minute_bar_stream
消息格式: {
    "symbol": "000001.SZ",
    "frame": "2025-06-05 10:30:00",
    "open": 10.50,
    "high": 10.80,
    "low": 10.45,
    "close": 10.75,
    "vol": 1000000,
    "amount": 10750000,
    "timestamp": 1733356200.123,
    "quality_score": 0.95
}

消费者组配置:

1
2
消费者组名: minute_bar_consumers
消费者ID: client_mac_001, client_linux_002, etc.

消费者端的架构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class MultiClientDataConsumer:
    def __init__(self, client_id, consumer_group="minute_bar_consumers"):
        self.client_id = client_id
        self.consumer_group = consumer_group
        self.stream_key = "minute_bar_stream"

    def start_consumption(self):
        # 创建消费者组
        self.create_consumer_group()

        # 开始消费数据
        self.consume_stream_data()

    def create_consumer_group(self):
        try:
            self.redis_client.xgroup_create(
                self.stream_key,
                self.consumer_group,
                id='0',
                mkstream=True
            )
        except redis.ResponseError:
            # 消费者组已存在
            pass

    def consume_stream_data(self):
        while self.is_running:
            # 从Stream读取数据
            messages = self.redis_client.xreadgroup(
                self.consumer_group,
                self.client_id,
                {self.stream_key: '>'},
                count=100,
                block=1000
            )

            for stream, msgs in messages:
                for msg_id, fields in msgs:
                    self.process_message(msg_id, fields)

    def process_message(self, msg_id, fields):
        try:
            # 处理数据
            self.handle_minute_bar_data(fields)

            # 确认消息处理完成
            self.redis_client.xack(
                self.stream_key,
                self.consumer_group,
                msg_id
            )
        except Exception as e:
            self.logger.error(f"处理消息失败: {e}")
            # 可以选择重试或放入死信队列

实现效果

我们设计了一个多客户端消费器管理页面,在没有任何输入的时候会返回平安银行的实时价格:

1. 查看消费者状态

2. 查看统计信息

3. 停止消费者

总结

"007,这次的增强版开发真的很棒!从基础版的原型验证到增强版的性能飞跃,并且给我提供了多 Client 问题的方案,你的表现超出了我的预期。"我由衷地赞叹。

"谢谢你的认可!这得益于我们团队的合作和不断的学习。"007 谦虚地说。

下一节,我和 007 将继续解决分钟线合成的问题,并且对我们的系统逻辑进一步修改和完善。