跳转至


课程  因子投资  机器学习  Python  Poetry  ppw  tools  programming  Numpy  Pandas  pandas  算法  hdbscan  聚类  选股  Algo  minimum  numpy  algo  FFT  模式识别  配对交易  GBDT  LightGBM  XGBoost  statistics  CDF  KS-Test  monte-carlo  VaR  回测  过拟合  algorithms  machine learning  strategy  python  sklearn  pdf  概率  数学  面试题  量化交易  策略分类  风险管理  Info  interview  career  xgboost  PCA  wavelet  时序事件归因  SHAP  Figures  Behavioral Economics  graduate  arma  garch  人物  职场  Quantopian  figure  Banz  金融行业  买方  卖方  story  量化传奇  rsi  zigzag  穹顶压力  因子  ESG  因子策略  投资  策略  pe  ORB  Xgboost  Alligator  Indicator  factor  alpha101  alpha  技术指标  wave  quant  algorithm  pearson  spearman  tushare  因子分析  Alphalens  涨停板  herd-behaviour  momentum  因子评估  review  SMC  聪明钱  trade  history  indicators  zscore  波动率  强化学习  顶背离  freshman  resources  others  AI  DeepSeek  network  量子计算  金融交易  IBM  weekly  LLT  backtest  backtrader  研报  papers  UBL  quantlib  jupyter-notebook  scikit-learn  pypinyin  qmt  xtquant  blog  static-site  duckdb  工具  colors  free resources  barra  world quant  Alpha  openbb  数据  risk-management  llm  prompt  CANSLIM  Augment  arsenal  copilot  vscode  code  量化数据存储  hdf5  h5py  cursor  augment  trae  Jupyter  jupysql  pyarrow  parquet  数据源  quantstats  实盘  clickhouse  notebook  redis  remote-agent  AI-tools  Moonshot  回测,研报,tushare 

21 天驯化 AI 打工仔:系统逻辑优化与分钟线数据合成


当分笔数据如潮水般涌来,如何让系统智能地将它们合成为有价值的分钟线数据?本文带你深入量化交易系统的核心——数据合成与系统架构优化的世界!

"007,我们的实时分笔数据订阅系统已经基本完成,但现在我遇到了一个新的挑战。"我一边查看着 Redis 中堆积如山的分笔数据,一边对我的 AI 助手说道。

"什么挑战?"007 立刻回应道。

"我们现在有了海量的分笔数据,但量化策略需要的是分钟线数据。而且,我希望系统能够智能地处理当日数据和历史数据,让多个客户端能够无缝查询。"我指着屏幕上密密麻麻的数据说道。

这是我们量化交易系统开发的第 9 天。前面几天,我们已经成功搭建了从 Tushare 获取数据的基础架构,也实现了 QMT 实时分笔数据的订阅。但在实际使用中,我发现了一个关键问题:分笔数据虽然精确,但对于大多数量化策略来说,分钟线数据才是真正需要的

更重要的是,我需要一个智能的系统架构,能够: - 实时将分笔数据合成为多周期分钟线数据 - 智能区分当日数据和历史数据的存储与查询 - 支持多个客户端同时查询,而不影响系统性能 - 确保数据的完整性和一致性

🎯 需求分析:构建智能数据合成系统

"在开始编码之前,我们需要明确系统的核心需求。"我对 007 说道。

经过深入思考,我梳理出了以下关键系统架构:

注意: - 所有的分钟线数据(不论是当日合成的数据还是订阅的历史分钟线数据)都必须是交易时间内的数据。不然没有意义。 - 当日合成的分钟线数据是保存在Redis中的,不要存入Clickhouse。 - 只有从QMT订阅的历史的分钟线数据是通过Redis保存在Clickhouse中的,请注意与当日合成的分钟线数据进行区分。

参考内容:Clickhouse 的 Redis 插件Clickhouse 的物化视图

"这个架构看起来很复杂,特别是数据合成的部分。"我有些担心地说道。

"没关系!我们可以分步实现。先搭建基础架构,然后逐步优化数据合成算法。"007 信心满满地回答。

🏗️ 系统架构设计:三端分离的智能架构

"我们需要设计一个真正智能的三端分离架构。"007 开始了架构设计。

整体架构图

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Windows端     │    │    远程Redis     │    │    Mac端        │
│   数据生产者     │───▶│   消息队列+缓存   │───▶│   数据消费者     │
│                 │    │                 │    │                 │
│ • QMT分笔订阅    │    │ • 分笔数据队列   │    │ • 历史数据存储   │
│ • 分钟线合成     │    │ • 当日分钟线缓存 │    │ • ClickHouse管理 │
│ • 交易时间验证   │    │ • 数据路由       │    │ • 数据清理       │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                ▼
                       ┌─────────────────┐
                       │   多Client端    │
                       │   数据查询者     │
                       │                 │
                       │ • Web查询界面   │
                       │ • 智能数据路由   │
                       │ • 24小时制时间   │
                       └─────────────────┘

数据流设计

"数据流的设计是整个系统的核心。"我对 007 强调道。

数据流向: 1. 分笔数据流:QMT → Windows端 → Redis队列 2. 当日分钟线流:Windows端合成 → Redis缓存 → Client端查询 3. 历史分钟线流:Mac端处理 → ClickHouse存储 → Client端查询 4. 混合查询流:Client端 → Redis+ClickHouse → 数据合并 → 返回结果

🔧 Windows 端的实现

"Windows 端是整个系统的心脏,负责数据的实时合成。"007 开始了 Windows 端的设计。

分笔数据如何合成分钟线数据?

007 设计了一个精巧的数据合成引擎:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BarDataSynthesizer:
    """分钟线数据合成器"""

    def __init__(self):
        # 存储各个股票的分笔数据缓存
        self.tick_cache: Dict[str, List[TickData]] = defaultdict(list)
        # 存储各个周期的分钟线缓存
        self.bar_cache: Dict[int, Dict[str, List[BarData]]] = {
            1: defaultdict(list),
            5: defaultdict(list),
            15: defaultdict(list),
            30: defaultdict(list)
        }
        # 交易时间验证器
        self.trading_validator = TradingTimeValidator()

    def add_tick_data(self, tick_data: TickData):
        """添加分笔数据并触发合成"""
        # 验证交易时间
        if not self.trading_validator.validate_tick_data(tick_dict):
            return

        # 缓存分笔数据
        self.tick_cache[tick_data.symbol].append(tick_data)

        # 合成1分钟线
        bar_1min = self._synthesize_1min_bar(tick_data.symbol)
        if bar_1min:
            self.bar_cache[1][tick_data.symbol].append(bar_1min)

            # 基于1分钟线合成其他周期
            for period in [5, 15, 30]:
                bar = self._synthesize_multi_min_bar(tick_data.symbol, period)
                if bar:
                    self.bar_cache[period][tick_data.symbol].append(bar)

1分钟线合成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def _synthesize_1min_bar(self, symbol: str) -> BarData:
    """合成1分钟线"""
    ticks = self.tick_cache[symbol]
    if not ticks:
        return None

    # 获取当前分钟的开始时间
    current_time = ticks[-1].time
    minute_start = current_time.replace(second=0, microsecond=0)
    minute_end = minute_start + timedelta(minutes=1)

    # 筛选当前分钟的分笔数据
    minute_ticks = [
        tick for tick in ticks
        if minute_start <= tick.time < minute_end
    ]

    if not minute_ticks:
        return None

    # 计算OHLCV
    prices = [tick.price for tick in minute_ticks]
    volumes = [tick.volume for tick in minute_ticks]
    amounts = [tick.amount for tick in minute_ticks]

    return BarData(
        symbol=symbol,
        frame=minute_start,
        open=prices[0],
        high=max(prices),
        low=min(prices),
        close=prices[-1],
        vol=sum(volumes),
        amount=sum(amounts)
    )

多周期合成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def _synthesize_multi_min_bar(self, symbol: str, period: int) -> BarData:
    """合成多分钟线(5分钟、15分钟、30分钟)"""
    bars_1min = self.bar_cache[1][symbol]
    if not bars_1min:
        return None

    # 获取当前周期的开始时间
    current_time = bars_1min[-1].frame
    period_start = self._get_period_start(current_time, period)
    period_end = period_start + timedelta(minutes=period)

    # 筛选当前周期的1分钟线数据
    period_bars = [
        bar for bar in bars_1min
        if period_start <= bar.frame < period_end
    ]

    if len(period_bars) == 0:
        return None

    # 合成多分钟线
    return BarData(
        symbol=symbol,
        frame=period_start,
        open=period_bars[0].open,
        high=max(bar.high for bar in period_bars),
        low=min(bar.low for bar in period_bars),
        close=period_bars[-1].close,
        vol=sum(bar.vol for bar in period_bars),
        amount=sum(bar.amount for bar in period_bars)
    )

我们的合成方法是:先合成1分钟线,再基于1分钟线合成其他周期,确保了数据的一致性。

Windows 端监控页面

但是我觉得这个监控页面的排版布局太简陋了,于是,我便让 007 帮我设计一个新的前端页面以更好地呈现监控情况。

🍎 Mac 端的实现

数据消费与存储

Mac 端的核心职责是将 Redis 中的历史数据转移到 ClickHouse:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MacDataService:
    """Mac端数据服务"""

    def __init__(self):
        self.redis_manager = RedisManager()
        self.clickhouse_manager = ClickHouseManager()

    def consume_historical_data(self):
        """消费历史数据"""
        while self.is_running:
            try:
                # 从Redis获取历史分钟线数据
                for period in [1, 5, 15, 30]:
                    queue_name = f"historical_bar_data_{period}min"
                    data = self.redis_manager.client.brpop(queue_name, timeout=1)

                    if data:
                        bar_data = BarData(**json.loads(data[1]))

                        # 检查是否已存在
                        if not self.clickhouse_manager.data_exists(bar_data, period):
                            # 插入ClickHouse
                            self.clickhouse_manager.insert_bar_data(bar_data, period)
                        else:
                            # 数据已存在,直接删除Redis中的数据
                            self.logger.info(f"数据已存在,跳过: {bar_data.symbol} {bar_data.frame}")

            except Exception as e:
                self.logger.error(f"数据消费错误: {e}")
                time.sleep(5)

凌晨2点的特殊处理

"系统需要在凌晨2点进行特殊的数据清理。"我对 007 说明了需求。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def handle_cleanup_time(self):
    """处理凌晨2点的数据清理"""
    try:
        # 1. 处理前一天的历史数据
        self.process_previous_day_data()

        # 2. 清理Redis的订阅消息队列
        self.cleanup_redis_queues()

        # 3. 数据完整性检查
        self.verify_data_integrity()

        self.logger.info("凌晨2点数据清理完成")

    except Exception as e:
        self.logger.error(f"数据清理错误: {e}")

Mac 端的监控页面

为了检查Redis和Clickhouse之间的数据传送是正常的,并且能实时监控数据传输的速度和进度,我要求 007 设计一个 Mac 端端监控页面:

💻 Client 端的实现

"Client 端是用户直接接触的界面,必须做到完美。"我对 007 强调道。

但是,在 Client 端的开发过程中,我们遇到了一系列挑战...

第一次尝试:复杂的调试系统

最初,007 为 Client 端设计了一个功能丰富的调试系统:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 复杂的调试逻辑
def query_bar_data(self, symbol: str, start_time: datetime, end_time: datetime, period: int):
    debug_info = []
    debug_info.append(f"查询参数: {symbol}, {period}分钟, {start_time}{end_time}")
    debug_info.append(f"今天: {today}, 查询日期范围: {start_date}{end_date}")

    # 大量的调试信息...
    if len(all_redis_data) > 0 and len(redis_data) == 0:
        debug_info.append(f"⚠️ 时间过滤导致数据为空")
        debug_info.append(f"数据样本时间: {sample_bar.frame}")
        # 更多调试信息...

"这个调试系统太复杂了!"我看着满屏的调试代码,有些头疼。

第二次尝试:时间格式的噩梦

接着,我们遇到了时间格式的问题。用户抱怨 Web 界面显示的是 AM/PM 格式:

"我要24小时时间制查询,怎么前端还有AM和PM,我不要AM和PM,气死了!"

007 立刻进行了修复,将 datetime-local 输入框替换为分离的时间输入:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
<!-- 24小时制时间输入 -->
<div class="col-md-3">
    <label for="start_time" class="form-label">开始时间 (24小时制)</label>
    <div class="row g-1">
        <div class="col-6">
            <input type="date" class="form-control" id="start_date" required>
        </div>
        <div class="col-3">
            <input type="number" class="form-control" id="start_hour" min="0" max="23" placeholder="时" required>
        </div>
        <div class="col-3">
            <input type="number" class="form-control" id="start_minute" min="0" max="59" placeholder="分" required>
        </div>
    </div>
</div>

第三次尝试:JSON序列化的陷阱

然后,我们遇到了 JSON 序列化错误:

1
查询失败: 请求处理错误: Object of type datetime is not JSON serializable

"这个错误很常见,datetime 对象无法直接序列化。"007 解释道。

我们尝试了多种解决方案,最终采用了手动序列化:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 手动序列化,确保datetime正确转换
data_list = []
for bar in result.data:
    data_list.append({
        "symbol": bar.symbol,
        "frame": bar.frame.isoformat(),  # 手动转换datetime为字符串
        "open": float(bar.open),
        "high": float(bar.high),
        "low": float(bar.low),
        "close": float(bar.close),
        "vol": float(bar.vol),
        "amount": float(bar.amount)
    })

最终重构:简洁而强大

"007!我觉得你把Client的查询代码改的乱七八糟的,你应该全部推翻,按照Windows和Mac的逻辑重新写查询。"我终于忍不住了。

007 立刻进行了彻底重构,按照系统架构的原始设计重新编写:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def query_bar_data(self, symbol: str, start_time: datetime, end_time: datetime, period: int) -> QueryResponse:
    """
    查询分钟线数据

    按照系统架构:
    1. 如果查询的分钟线数据是当日的,则直接从Redis中读取合成的分钟线数据
    2. 如果查询的分钟线数据是历史的,则直接从ClickHouse中读取
    3. 如果查询的分钟线数据是既有当日的又有历史的,则合并数据返回给Client
    """
    try:
        today = date.today()
        start_date = start_time.date()
        end_date = end_time.date()

        redis_data = []
        clickhouse_data = []

        # 1. 查询当日数据(从Redis读取)
        if end_date >= today:
            redis_data = self.redis_manager.get_current_bar_data(period, symbol)
            # 过滤时间范围
            redis_data = [bar for bar in redis_data if start_time <= bar.frame <= end_time]

        # 2. 查询历史数据(从ClickHouse读取)
        if start_date < today:
            # 避免与当日数据重复,历史数据查询到今天之前
            hist_end_time = min(end_time, datetime.combine(today, datetime.min.time()))
            if start_time < hist_end_time:
                clickhouse_data = self.clickhouse_manager.query_bar_data(
                    symbol, start_time, hist_end_time, period
                )

        # 3. 合并数据
        merged_data = self.data_merger.merge_bar_data(redis_data, clickhouse_data)

        return QueryResponse(
            success=True,
            message=f"当日数据: {len(redis_data)} 条,历史数据: {len(clickhouse_data)} 条,合并后: {len(merged_data)} 条",
            data=merged_data,
            total_count=len(merged_data)
        )

    except Exception as e:
        return QueryResponse(
            success=False,
            message=f"查询失败: {str(e)}",
            data=[],
            total_count=0
        )

Client 端的查询页面

为了易于查询,且可视化查询结果,我让 007 给 Client 端设计了一个查询页面:

⚠️ 请注意:数据合并和交易时间验证

数据合并:确保查询结果的准确性

如果用户需要查询的日期既有历史分钟线数据又有当日分钟线数据,那么我们要对 Redis 和 Clickhouse 的数据进行合并。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class DataMerger:
    """数据合并器 - 合并Redis当日数据和ClickHouse历史数据"""

    @staticmethod
    def merge_bar_data(redis_data: List[BarData], clickhouse_data: List[BarData]) -> List[BarData]:
        """合并分钟线数据"""
        # 合并数据并按时间排序
        all_data = redis_data + clickhouse_data

        # 去重(以frame和symbol为键)
        unique_data = {}
        for bar in all_data:
            key = (bar.symbol, bar.frame)
            unique_data[key] = bar

        # 按时间排序
        merged_data = list(unique_data.values())
        merged_data.sort(key=lambda x: x.frame)

        return merged_data

交易时间验证:确保数据质量

"所有的分钟线数据都必须是交易时间内的数据,不然没有意义。"我对 007 强调了数据质量的重要性。

007 设计了一个专门的交易时间验证器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class TradingTimeValidator:
    """交易时间验证器"""

    def __init__(self):
        self.trading_hours = {
            'morning_start': '09:30:00',
            'morning_end': '11:30:00',
            'afternoon_start': '13:00:00',
            'afternoon_end': '15:00:00'
        }

    def is_trading_time(self, dt: datetime) -> bool:
        """检查是否为交易时间"""
        time_obj = dt.time()

        morning_start = dt_time.fromisoformat(self.trading_hours['morning_start'])
        morning_end = dt_time.fromisoformat(self.trading_hours['morning_end'])
        afternoon_start = dt_time.fromisoformat(self.trading_hours['afternoon_start'])
        afternoon_end = dt_time.fromisoformat(self.trading_hours['afternoon_end'])

        return (
            (morning_start <= time_obj <= morning_end) or
            (afternoon_start <= time_obj <= afternoon_end)
        )

    def validate_tick_data(self, tick_dict: dict) -> bool:
        """验证分笔数据"""
        return self.is_trading_time(tick_dict['time'])

    def validate_bar_data(self, bar_dict: dict) -> bool:
        """验证分钟线数据"""
        return self.is_trading_time(bar_dict['frame'])

🎯 系统测试

"系统搭建完成,我们来测试一下效果。"我满怀期待地说道。

历史分钟线数据的获取

Windows 端运行效果:

Mac 端运行效果:

可以注意到 Mac 端的 Redis 队列中的元素正在减少,ClickhouseHouse 存储的数据正在增加。

我们再来看一下Clickhouse:

可以发现历史数据已经存入Clickhouse中(而且都是符合在交易时间内这一要求)。

当日分钟线数据的获取

在交易时间启动 Windows 和 Mac 端,可以看到 Redis 中保存着分笔数据和合成的当日分钟线数据。(这些数据到凌晨2点会统一清除)

Redis:

我们可以通过 Client 去查询当日的分钟线数据(以 15min 为例):

📝 总结

通过这次系统逻辑优化与分钟线数据合成的开发,我和 007 成功构建了一个智能、高效、稳定的量化交易数据处理系统。从最初的复杂设计到最终的简洁实现,我们不仅解决了技术难题,更重要的是建立了一套可扩展、可维护的系统架构。

21 天的挑战已经过去了 9 天,我们的量化交易系统正在变得越来越强大。从最初的简单数据获取,到现在的智能数据合成,每一步都充满了挑战和收获。

007 的表现再次让我刮目相看,从架构设计到代码实现,从问题诊断到系统优化,它都展现出了专业的技术能力。更重要的是,它能够在我的指导下快速调整方向,最终实现了一个简洁而强大的系统。

特别是在 Client 端的重构过程中,007 展现了很强的学习能力。当我指出代码过于复杂时,它能够立刻理解问题所在,并按照系统架构的原始设计进行彻底重构。这种快速响应和自我纠正的能力,正是一个优秀的 AI 助手应该具备的品质。

但是,我们的分钟合成真的合理吗?我们在下一章节,将采用 Tushare 和 Akshare 对我们合成的当日实时分钟线的准确性进行验证。对于多Client问题,我们也将采用多个Client机器对系统进行测试。敬请期待!