跳转至



21 天驯化 AI 打工仔:系统逻辑优化与分钟线数据合成


当分笔数据如潮水般涌来,如何让系统智能地将它们合成为有价值的分钟线数据?本文带你深入量化交易系统的核心——数据合成与系统架构优化的世界!

"007,我们的实时分笔数据订阅系统已经基本完成,但现在我遇到了一个新的挑战。"我一边查看着 Redis 中堆积如山的分笔数据,一边对我的 AI 助手说道。

"什么挑战?"007 立刻回应道。

"我们现在有了海量的分笔数据,但量化策略需要的是分钟线数据。而且,我希望系统能够智能地处理当日数据和历史数据,让多个客户端能够无缝查询。"我指着屏幕上密密麻麻的数据说道。

这是我们量化交易系统开发的第 9 天。前面几天,我们已经成功搭建了从 Tushare 获取数据的基础架构,也实现了 QMT 实时分笔数据的订阅。但在实际使用中,我发现了一个关键问题:分笔数据虽然精确,但对于大多数量化策略来说,分钟线数据才是真正需要的

更重要的是,我需要一个智能的系统架构,能够: - 实时将分笔数据合成为多周期分钟线数据 - 智能区分当日数据和历史数据的存储与查询 - 支持多个客户端同时查询,而不影响系统性能 - 确保数据的完整性和一致性

🎯 需求分析:构建智能数据合成系统

"在开始编码之前,我们需要明确系统的核心需求。"我对 007 说道。

经过深入思考,我梳理出了以下关键系统架构:

注意: - 所有的分钟线数据(不论是当日合成的数据还是订阅的历史分钟线数据)都必须是交易时间内的数据。不然没有意义。 - 当日合成的分钟线数据是保存在Redis中的,不要存入Clickhouse。 - 只有从QMT订阅的历史的分钟线数据是通过Redis保存在Clickhouse中的,请注意与当日合成的分钟线数据进行区分。

参考内容:Clickhouse 的 Redis 插件Clickhouse 的物化视图

"这个架构看起来很复杂,特别是数据合成的部分。"我有些担心地说道。

"没关系!我们可以分步实现。先搭建基础架构,然后逐步优化数据合成算法。"007 信心满满地回答。

🏗️ 系统架构设计:三端分离的智能架构

"我们需要设计一个真正智能的三端分离架构。"007 开始了架构设计。

整体架构图

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Windows端     │    │    远程Redis     │    │    Mac端        │
│   数据生产者     │───▶│   消息队列+缓存   │───▶│   数据消费者     │
│                 │    │                 │    │                 │
│ • QMT分笔订阅    │    │ • 分笔数据队列   │    │ • 历史数据存储   │
│ • 分钟线合成     │    │ • 当日分钟线缓存 │    │ • ClickHouse管理 │
│ • 交易时间验证   │    │ • 数据路由       │    │ • 数据清理       │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                ▼
                       ┌─────────────────┐
                       │   多Client端    │
                       │   数据查询者     │
                       │                 │
                       │ • Web查询界面   │
                       │ • 智能数据路由   │
                       │ • 24小时制时间   │
                       └─────────────────┘

数据流设计

"数据流的设计是整个系统的核心。"我对 007 强调道。

数据流向: 1. 分笔数据流:QMT → Windows端 → Redis队列 2. 当日分钟线流:Windows端合成 → Redis缓存 → Client端查询 3. 历史分钟线流:Mac端处理 → ClickHouse存储 → Client端查询 4. 混合查询流:Client端 → Redis+ClickHouse → 数据合并 → 返回结果

🔧 Windows 端的实现

"Windows 端是整个系统的心脏,负责数据的实时合成。"007 开始了 Windows 端的设计。

分笔数据如何合成分钟线数据?

007 设计了一个精巧的数据合成引擎:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BarDataSynthesizer:
    """分钟线数据合成器"""

    def __init__(self):
        # 存储各个股票的分笔数据缓存
        self.tick_cache: Dict[str, List[TickData]] = defaultdict(list)
        # 存储各个周期的分钟线缓存
        self.bar_cache: Dict[int, Dict[str, List[BarData]]] = {
            1: defaultdict(list),
            5: defaultdict(list),
            15: defaultdict(list),
            30: defaultdict(list)
        }
        # 交易时间验证器
        self.trading_validator = TradingTimeValidator()

    def add_tick_data(self, tick_data: TickData):
        """添加分笔数据并触发合成"""
        # 验证交易时间
        if not self.trading_validator.validate_tick_data(tick_dict):
            return

        # 缓存分笔数据
        self.tick_cache[tick_data.symbol].append(tick_data)

        # 合成1分钟线
        bar_1min = self._synthesize_1min_bar(tick_data.symbol)
        if bar_1min:
            self.bar_cache[1][tick_data.symbol].append(bar_1min)

            # 基于1分钟线合成其他周期
            for period in [5, 15, 30]:
                bar = self._synthesize_multi_min_bar(tick_data.symbol, period)
                if bar:
                    self.bar_cache[period][tick_data.symbol].append(bar)

1分钟线合成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def _synthesize_1min_bar(self, symbol: str) -> BarData:
    """合成1分钟线"""
    ticks = self.tick_cache[symbol]
    if not ticks:
        return None

    # 获取当前分钟的开始时间
    current_time = ticks[-1].time
    minute_start = current_time.replace(second=0, microsecond=0)
    minute_end = minute_start + timedelta(minutes=1)

    # 筛选当前分钟的分笔数据
    minute_ticks = [
        tick for tick in ticks
        if minute_start <= tick.time < minute_end
    ]

    if not minute_ticks:
        return None

    # 计算OHLCV
    prices = [tick.price for tick in minute_ticks]
    volumes = [tick.volume for tick in minute_ticks]
    amounts = [tick.amount for tick in minute_ticks]

    return BarData(
        symbol=symbol,
        frame=minute_start,
        open=prices[0],
        high=max(prices),
        low=min(prices),
        close=prices[-1],
        vol=sum(volumes),
        amount=sum(amounts)
    )

多周期合成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def _synthesize_multi_min_bar(self, symbol: str, period: int) -> BarData:
    """合成多分钟线(5分钟、15分钟、30分钟)"""
    bars_1min = self.bar_cache[1][symbol]
    if not bars_1min:
        return None

    # 获取当前周期的开始时间
    current_time = bars_1min[-1].frame
    period_start = self._get_period_start(current_time, period)
    period_end = period_start + timedelta(minutes=period)

    # 筛选当前周期的1分钟线数据
    period_bars = [
        bar for bar in bars_1min
        if period_start <= bar.frame < period_end
    ]

    if len(period_bars) == 0:
        return None

    # 合成多分钟线
    return BarData(
        symbol=symbol,
        frame=period_start,
        open=period_bars[0].open,
        high=max(bar.high for bar in period_bars),
        low=min(bar.low for bar in period_bars),
        close=period_bars[-1].close,
        vol=sum(bar.vol for bar in period_bars),
        amount=sum(bar.amount for bar in period_bars)
    )

我们的合成方法是:先合成1分钟线,再基于1分钟线合成其他周期,确保了数据的一致性。

Windows 端监控页面

但是我觉得这个监控页面的排版布局太简陋了,于是,我便让 007 帮我设计一个新的前端页面以更好地呈现监控情况。

🍎 Mac 端的实现

数据消费与存储

Mac 端的核心职责是将 Redis 中的历史数据转移到 ClickHouse:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MacDataService:
    """Mac端数据服务"""

    def __init__(self):
        self.redis_manager = RedisManager()
        self.clickhouse_manager = ClickHouseManager()

    def consume_historical_data(self):
        """消费历史数据"""
        while self.is_running:
            try:
                # 从Redis获取历史分钟线数据
                for period in [1, 5, 15, 30]:
                    queue_name = f"historical_bar_data_{period}min"
                    data = self.redis_manager.client.brpop(queue_name, timeout=1)

                    if data:
                        bar_data = BarData(**json.loads(data[1]))

                        # 检查是否已存在
                        if not self.clickhouse_manager.data_exists(bar_data, period):
                            # 插入ClickHouse
                            self.clickhouse_manager.insert_bar_data(bar_data, period)
                        else:
                            # 数据已存在,直接删除Redis中的数据
                            self.logger.info(f"数据已存在,跳过: {bar_data.symbol} {bar_data.frame}")

            except Exception as e:
                self.logger.error(f"数据消费错误: {e}")
                time.sleep(5)

凌晨2点的特殊处理

"系统需要在凌晨2点进行特殊的数据清理。"我对 007 说明了需求。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def handle_cleanup_time(self):
    """处理凌晨2点的数据清理"""
    try:
        # 1. 处理前一天的历史数据
        self.process_previous_day_data()

        # 2. 清理Redis的订阅消息队列
        self.cleanup_redis_queues()

        # 3. 数据完整性检查
        self.verify_data_integrity()

        self.logger.info("凌晨2点数据清理完成")

    except Exception as e:
        self.logger.error(f"数据清理错误: {e}")

Mac 端的监控页面

为了检查Redis和Clickhouse之间的数据传送是正常的,并且能实时监控数据传输的速度和进度,我要求 007 设计一个 Mac 端端监控页面:

💻 Client 端的实现

"Client 端是用户直接接触的界面,必须做到完美。"我对 007 强调道。

但是,在 Client 端的开发过程中,我们遇到了一系列挑战...

第一次尝试:复杂的调试系统

最初,007 为 Client 端设计了一个功能丰富的调试系统:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 复杂的调试逻辑
def query_bar_data(self, symbol: str, start_time: datetime, end_time: datetime, period: int):
    debug_info = []
    debug_info.append(f"查询参数: {symbol}, {period}分钟, {start_time}{end_time}")
    debug_info.append(f"今天: {today}, 查询日期范围: {start_date}{end_date}")

    # 大量的调试信息...
    if len(all_redis_data) > 0 and len(redis_data) == 0:
        debug_info.append(f"⚠️ 时间过滤导致数据为空")
        debug_info.append(f"数据样本时间: {sample_bar.frame}")
        # 更多调试信息...

"这个调试系统太复杂了!"我看着满屏的调试代码,有些头疼。

第二次尝试:时间格式的噩梦

接着,我们遇到了时间格式的问题。用户抱怨 Web 界面显示的是 AM/PM 格式:

"我要24小时时间制查询,怎么前端还有AM和PM,我不要AM和PM,气死了!"

007 立刻进行了修复,将 datetime-local 输入框替换为分离的时间输入:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
<!-- 24小时制时间输入 -->
<div class="col-md-3">
    <label for="start_time" class="form-label">开始时间 (24小时制)</label>
    <div class="row g-1">
        <div class="col-6">
            <input type="date" class="form-control" id="start_date" required>
        </div>
        <div class="col-3">
            <input type="number" class="form-control" id="start_hour" min="0" max="23" placeholder="时" required>
        </div>
        <div class="col-3">
            <input type="number" class="form-control" id="start_minute" min="0" max="59" placeholder="分" required>
        </div>
    </div>
</div>

第三次尝试:JSON序列化的陷阱

然后,我们遇到了 JSON 序列化错误:

1
查询失败: 请求处理错误: Object of type datetime is not JSON serializable

"这个错误很常见,datetime 对象无法直接序列化。"007 解释道。

我们尝试了多种解决方案,最终采用了手动序列化:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 手动序列化,确保datetime正确转换
data_list = []
for bar in result.data:
    data_list.append({
        "symbol": bar.symbol,
        "frame": bar.frame.isoformat(),  # 手动转换datetime为字符串
        "open": float(bar.open),
        "high": float(bar.high),
        "low": float(bar.low),
        "close": float(bar.close),
        "vol": float(bar.vol),
        "amount": float(bar.amount)
    })

最终重构:简洁而强大

"007!我觉得你把Client的查询代码改的乱七八糟的,你应该全部推翻,按照Windows和Mac的逻辑重新写查询。"我终于忍不住了。

007 立刻进行了彻底重构,按照系统架构的原始设计重新编写:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def query_bar_data(self, symbol: str, start_time: datetime, end_time: datetime, period: int) -> QueryResponse:
    """
    查询分钟线数据

    按照系统架构:
    1. 如果查询的分钟线数据是当日的,则直接从Redis中读取合成的分钟线数据
    2. 如果查询的分钟线数据是历史的,则直接从ClickHouse中读取
    3. 如果查询的分钟线数据是既有当日的又有历史的,则合并数据返回给Client
    """
    try:
        today = date.today()
        start_date = start_time.date()
        end_date = end_time.date()

        redis_data = []
        clickhouse_data = []

        # 1. 查询当日数据(从Redis读取)
        if end_date >= today:
            redis_data = self.redis_manager.get_current_bar_data(period, symbol)
            # 过滤时间范围
            redis_data = [bar for bar in redis_data if start_time <= bar.frame <= end_time]

        # 2. 查询历史数据(从ClickHouse读取)
        if start_date < today:
            # 避免与当日数据重复,历史数据查询到今天之前
            hist_end_time = min(end_time, datetime.combine(today, datetime.min.time()))
            if start_time < hist_end_time:
                clickhouse_data = self.clickhouse_manager.query_bar_data(
                    symbol, start_time, hist_end_time, period
                )

        # 3. 合并数据
        merged_data = self.data_merger.merge_bar_data(redis_data, clickhouse_data)

        return QueryResponse(
            success=True,
            message=f"当日数据: {len(redis_data)} 条,历史数据: {len(clickhouse_data)} 条,合并后: {len(merged_data)} 条",
            data=merged_data,
            total_count=len(merged_data)
        )

    except Exception as e:
        return QueryResponse(
            success=False,
            message=f"查询失败: {str(e)}",
            data=[],
            total_count=0
        )

Client 端的查询页面

为了易于查询,且可视化查询结果,我让 007 给 Client 端设计了一个查询页面:

⚠️ 请注意:数据合并和交易时间验证

数据合并:确保查询结果的准确性

如果用户需要查询的日期既有历史分钟线数据又有当日分钟线数据,那么我们要对 Redis 和 Clickhouse 的数据进行合并。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class DataMerger:
    """数据合并器 - 合并Redis当日数据和ClickHouse历史数据"""

    @staticmethod
    def merge_bar_data(redis_data: List[BarData], clickhouse_data: List[BarData]) -> List[BarData]:
        """合并分钟线数据"""
        # 合并数据并按时间排序
        all_data = redis_data + clickhouse_data

        # 去重(以frame和symbol为键)
        unique_data = {}
        for bar in all_data:
            key = (bar.symbol, bar.frame)
            unique_data[key] = bar

        # 按时间排序
        merged_data = list(unique_data.values())
        merged_data.sort(key=lambda x: x.frame)

        return merged_data

交易时间验证:确保数据质量

"所有的分钟线数据都必须是交易时间内的数据,不然没有意义。"我对 007 强调了数据质量的重要性。

007 设计了一个专门的交易时间验证器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class TradingTimeValidator:
    """交易时间验证器"""

    def __init__(self):
        self.trading_hours = {
            'morning_start': '09:30:00',
            'morning_end': '11:30:00',
            'afternoon_start': '13:00:00',
            'afternoon_end': '15:00:00'
        }

    def is_trading_time(self, dt: datetime) -> bool:
        """检查是否为交易时间"""
        time_obj = dt.time()

        morning_start = dt_time.fromisoformat(self.trading_hours['morning_start'])
        morning_end = dt_time.fromisoformat(self.trading_hours['morning_end'])
        afternoon_start = dt_time.fromisoformat(self.trading_hours['afternoon_start'])
        afternoon_end = dt_time.fromisoformat(self.trading_hours['afternoon_end'])

        return (
            (morning_start <= time_obj <= morning_end) or
            (afternoon_start <= time_obj <= afternoon_end)
        )

    def validate_tick_data(self, tick_dict: dict) -> bool:
        """验证分笔数据"""
        return self.is_trading_time(tick_dict['time'])

    def validate_bar_data(self, bar_dict: dict) -> bool:
        """验证分钟线数据"""
        return self.is_trading_time(bar_dict['frame'])

🎯 系统测试

"系统搭建完成,我们来测试一下效果。"我满怀期待地说道。

历史分钟线数据的获取

Windows 端运行效果:

Mac 端运行效果:

可以注意到 Mac 端的 Redis 队列中的元素正在减少,ClickhouseHouse 存储的数据正在增加。

我们再来看一下Clickhouse:

可以发现历史数据已经存入Clickhouse中(而且都是符合在交易时间内这一要求)。

当日分钟线数据的获取

在交易时间启动 Windows 和 Mac 端,可以看到 Redis 中保存着分笔数据和合成的当日分钟线数据。(这些数据到凌晨2点会统一清除)

Redis:

我们可以通过 Client 去查询当日的分钟线数据(以 15min 为例):

📝 总结

通过这次系统逻辑优化与分钟线数据合成的开发,我和 007 成功构建了一个智能、高效、稳定的量化交易数据处理系统。从最初的复杂设计到最终的简洁实现,我们不仅解决了技术难题,更重要的是建立了一套可扩展、可维护的系统架构。

21 天的挑战已经过去了 9 天,我们的量化交易系统正在变得越来越强大。从最初的简单数据获取,到现在的智能数据合成,每一步都充满了挑战和收获。

007 的表现再次让我刮目相看,从架构设计到代码实现,从问题诊断到系统优化,它都展现出了专业的技术能力。更重要的是,它能够在我的指导下快速调整方向,最终实现了一个简洁而强大的系统。

特别是在 Client 端的重构过程中,007 展现了很强的学习能力。当我指出代码过于复杂时,它能够立刻理解问题所在,并按照系统架构的原始设计进行彻底重构。这种快速响应和自我纠正的能力,正是一个优秀的 AI 助手应该具备的品质。

但是,我们的分钟合成真的合理吗?我们在下一章节,将采用 Tushare 和 Akshare 对我们合成的当日实时分钟线的准确性进行验证。对于多Client问题,我们也将采用多个Client机器对系统进行测试。敬请期待!