跳转至

21天驯化AI打工仔 - 数据库的优化

五一小长假之前,我在搭档 007 的帮助下已经成功实现了从 Tushare 获取 OHLC 数据,并通过 Redis 消息队列将数据存储到 ClickHouse 数据库。为了进一步完善量化交易系统的数据支持,今天我们将聚焦于数据库的优化设计,主要涉及获取日线复权因子、获取分钟线数据以及计算分钟线数据(例如 15 分钟)三个方面。五一结束之后,我们计划通过这些优化,能够为量化交易策略提供更丰富、更精细的数据,从而提升系统的性能和决策能力。


1. 获取日线的复权因子

我:007!帮我在 data_producer.pydata_consumer.py 中添加获取日线因子的代码。

007:🫡收到请求,正在处理...

为了在 data_consumer.pydata_producer.py 中补充因子的数据,我们需要对原代码进行一些修改。主要思路是在生产者代码中获取因子数据并封装到数据结构中,然后在消费者代码中处理这些因子数据。因为后续我们要收集分钟级的数据,所以我新建了一张表,命名为 daily_data,用于存储日线因子数据。

然后,我需要针对这个数据表,对 data_producer.pydata_consumer.py 进行修改。

007 回答我的完整代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""data_producer.py"""
import redis
import tushare as ts
import json
from datetime import datetime

# Tushare和Redis配置
TUSHARE_TOKEN = "YOUR TOKEN"
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"

# 初始化连接
pro = ts.pro_api(TUSHARE_TOKEN)
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

def fetch_ohlc_daily_data(ts_code: str, start_date: str, end_date: str) -> list:
    try:
        df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
        return df[['ts_code', 'trade_date', 'open', 'high', 'low', 'close', 'vol']].to_dict('records')
    except Exception as e:
        print(f"OHLC数据获取失败: {str(e)}")
        return []

def fetch_adj_factor(ts_code: str, start_date: str, end_date: str) -> list:
    try:
        df = pro.adj_factor(ts_code=ts_code, start_date=start_date, end_date=end_date)
        return df[['ts_code', 'trade_date', 'adj_factor']].to_dict('records')
    except Exception as e:
        print(f"复权因子获取失败: {str(e)}")
        return []
def produce_data(ts_code_list: list, date_range: tuple):
    start_date, end_date = date_range
    for ts_code in ts_code_list:
        data_package = {
            "timestamp": datetime.now().isoformat(),
            "ts_code": ts_code,
            "ohlc_data": fetch_ohlc_daily_data(ts_code, start_date, end_date),
            "adj_factor": fetch_adj_factor(ts_code, start_date, end_date)
        }
        redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
        print(f"已推送数据: {ts_code} - {start_date}{end_date}")

if __name__ == "__main__":
    STOCK_CODES = ["000001.SZ", "600519.SH"]
    DATE_RANGE = ("20230101", "20231231")
    produce_data(STOCK_CODES, DATE_RANGE)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""data_consumer.py"""
import redis
import json
from clickhouse_driver import Client
from datetime import datetime

# 配置参数
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"

CLICKHOUSE_HOST = "localhost"
CLICKHOUSE_PORT = 9000
CLICKHOUSE_DB = "default"

# 初始化 Redis 和 ClickHouse 客户端
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
clickhouse_client = Client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT, database=CLICKHOUSE_DB)

def insert_to_clickhouse(data):
    """将数据插入到 ClickHouse"""
    query = """
    INSERT INTO daily_data (ts_code, trade_date, open, high, low, close, vol, adj_factor)
    VALUES
    """
    values = []
    for record in data["ohlc_data"]:
        adj_factor_record = next((adj for adj in data["adj_factor"] if adj["trade_date"] == record["trade_date"]), None)
        adj_factor = adj_factor_record["adj_factor"] if adj_factor_record else None
        values.append((
            record["ts_code"],
            datetime.strptime(record["trade_date"], "%Y%m%d").date(),
            record["open"],
            record["high"],
            record["low"],
            record["close"],
            record["vol"],
            adj_factor
        ))
    clickhouse_client.execute(query, values)

def consume_data():
    """数据消费主函数"""
    print("启动数据消费者,等待队列数据...")
    while True:
        try:
            # 阻塞式获取队列数据
            result = redis_client.brpop(REDIS_QUEUE_NAME, timeout=1)
            if result is None:
                # 如果没有获取到数据,说明队列为空,退出循环
                print("Redis 队列为空,停止消费数据。")
                break
            _, json_data = result
            data_package = json.loads(json_data)
            insert_to_clickhouse(data_package)
            print(f"成功插入数据: {len(data_package['ohlc_data'])} 条")
        except Exception as e:
            print(f"数据处理异常: {str(e)}")
            continue

if __name__ == "__main__":
    consume_data()

2. 获取分钟线数据

同上述步骤,我需要获取分钟线数据,并添加到数据表中。

007 给我提供了以下建议: 1. 首先创建分钟线数据表; 2. 修改生产者代码,添加分钟线数据获取功能; 3. 创建对应的消费者代码。

2.1. 创建分钟线数据表

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE TABLE IF NOT EXISTS minute_data (
    ts_code String,
    trade_time DateTime,
    open Float32,
    high Float32,
    low Float32,
    close Float32,
    vol Float32,
    amount Float32
) ENGINE = MergeTree()
ORDER BY (ts_code, trade_time);

2.2. 修改生产者代码,添加分钟线数据获取功能

007 还是采用 tushare 来获取分钟线数据,并添加到数据表中。但是,这里获取分钟线数据我并不打算用 tushare 作为数据源,而是打算采用 qmt 提供的 API 接口来获取分钟级的数据。

007 添加了一个新的生产者代码文件 minute_producer.py,在 data_producer.py 的基础上主要修改内容: - 替换了 Tushare 为 QMT 的数据接口 - 添加了数据下载功能 - 使用 QMT 的 get_local_data 获取分钟线数据 - 增加了交易日期的处理

但是我在此处遇到了一个问题,qmt 目前只支持 windows 系统,而我的系统是 macos ,所以无法使用 qmt。

007 提供了一个解决方案,使用 redis 作为中间件,把 windows 的数据传递给 macos 的程序,并最终存储在 clickhouse 中。

依据 007 的好建议,我完成了以下代码。

2.2.3. windows 的数据生产者代码

007 原先的代码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
import redis
import json
from datetime import datetime
from xtquant.xtdata import (
    init,
    download_history_data,
    get_local_data,
    get_trading_dates,
    close
)

# Redis配置 - 使用Mac的IP地址
REDIS_HOST = "替换为Mac的IP地址"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "qmt_minute_queue"
REDIS_PASSWORD = None  # 如果有密码,请设置

def setup_redis_client():
    """初始化Redis客户端"""
    return redis.StrictRedis(
        host=REDIS_HOST, 
        port=REDIS_PORT, 
        password=REDIS_PASSWORD,
        decode_responses=True
    )

def fetch_minute_data(stock_code, date_str):
    """获取指定日期的分钟线数据"""
    try:
        # 获取分钟线数据
        df = get_local_data(stock_code, 'min1', date_str, date_str)
        if df is None or len(df) == 0:
            return []

        # 转换数据格式
        records = []
        for time, row in df.iterrows():
            records.append({
                "ts_code": stock_code,
                "trade_time": time.strftime("%Y-%m-%d %H:%M:%S"),
                "open": float(row['open']),
                "high": float(row['high']),
                "low": float(row['low']),
                "close": float(row['close']),
                "vol": float(row['volume']),
                "amount": float(row['amount'])
            })
        return records
    except Exception as e:
        print(f"获取分钟线数据失败 {stock_code} {date_str}: {str(e)}")
        return []

def main():
    """主函数"""
    # 初始化QMT接口
    init()

    # 初始化Redis客户端
    redis_client = setup_redis_client()

    try:
        # 配置参数
        stock_list = ["000001.SZ", "600519.SH"]
        start_date = "20230101"
        end_date = "20230131"

        # 下载历史数据
        print(f"开始下载历史数据: {start_date}{end_date}")
        download_history_data(stock_list, 'min1', start_date, end_date)
        print("历史数据下载完成")

        # 获取交易日列表
        trading_dates = get_trading_dates(start_date, end_date)

        # 按日期和股票代码获取分钟线数据并发送到Redis
        for trade_date in trading_dates:
            date_str = trade_date.strftime("%Y%m%d")
            print(f"处理日期: {date_str}")

            for stock_code in stock_list:
                minute_data = fetch_minute_data(stock_code, date_str)

                if minute_data:
                    # 封装数据
                    data_package = {
                        "timestamp": datetime.now().isoformat(),
                        "ts_code": stock_code,
                        "trade_date": date_str,
                        "minute_data": minute_data
                    }

                    # 发送到Redis
                    redis_client.lpush(REDIS_QUEUE_NAME, json.dump(data_package))
                    print(f"已推送分钟线数据: {stock_code} - {date_str} ({len(minute_data)}条)")

    except Exception as e:
        print(f"程序执行异常: {str(e)}")

    finally:
        # 关闭QMT接口
        close()
        print("程序执行完毕")

if __name__ == "__main__":
    main()

但是这个代码是不可以运行的,因为 qmt 库的版本可能发生了改变,有一些模块是被移除或修改的。而且我们想从 Windows 机器连接到 Mac 上运行的 Redis 服务器。这涉及到网络连接、防火墙设置、Redis 配置以及可能的权限问题。

针对 Redis 的有关问题,我打算继续用 007 的“贴心代码”进行 Redis 连接的测试。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# Redis配置 - 使用Mac的IP地址
REDIS_HOST = "替换为Mac的IP地址"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "qmt_minute_queue"
REDIS_PASSWORD = None  # 如果有密码,请设置


# 测试Redis连接
import redis
import time

try:
    # 创建Redis客户端
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        socket_timeout=5,
        decode_responses=True
    )

    # 测试连接 - PING命令
    response = redis_client.ping()
    print(f"Redis连接测试 (PING): {'成功' if response else '失败'}")

    # 测试基本操作 - 写入和读取
    test_key = "test_connection_key"
    test_value = f"test_value_{time.time()}"

    # 写入测试
    redis_client.set(test_key, test_value)
    print(f"Redis写入测试: 成功写入键 '{test_key}'")

    # 读取测试
    read_value = redis_client.get(test_key)
    print(f"Redis读取测试: {'成功' if read_value == test_value else '失败'}")
    print(f"写入值: {test_value}")
    print(f"读取值: {read_value}")

    # 测试队列操作
    redis_client.lpush(REDIS_QUEUE_NAME, "测试消息")
    queue_length = redis_client.llen(REDIS_QUEUE_NAME)
    print(f"Redis队列测试: 成功写入队列 '{REDIS_QUEUE_NAME}',当前队列长度: {queue_length}")

    # 清理测试数据
    redis_client.delete(test_key)

    print("Redis连接和基本操作测试完成,连接正常")

except redis.exceptions.ConnectionError as e:
    print(f"Redis连接错误: {str(e)}")
    print("请检查以下问题:")
    print("1. Redis服务器是否在运行")
    print("2. IP地址是否正确")
    print("3. 端口是否正确")
    print("4. 防火墙是否允许连接")
    print("5. Redis是否配置为允许远程连接")

except Exception as e:
    print(f"Redis测试过程中出现其他错误: {str(e)}")

有了测试代码,我们还需要考虑 Redis 的默认配置。默认情况下,Redis 只允许本地连接,绑定到 127.0.0.1。因此,要允许远程连接,必须修改 Redis 的配置文件,将绑定地址改为 0.0.0.0 或者 Mac 的局域网 IP 地址。这一步可能需要用户编辑 Redis 的配置文件,比如 redis.conf,找到 bind 参数并修改。

  1. 修改 Redis 配置文件 默认情况下,Redis 仅监听本地 IP(127.0.0.1),需调整为允许远程连接:

    • 打开配置文件
      1
      sudo nano /usr/local/etc/redis.conf
      
    • 修改以下参数:
    • 绑定 IP:将 bind 127.0.0.1 改为 bind 0.0.0.0(允许所有 IP 访问)或替换为 Mac 的局域网 IP(如 192.168.1.100)。

    • 关闭保护模式:将 protected-mode yes 改为 protected-mode no

    • 设置密码(可选但推荐):取消注释 requirepass 并设置密码:
      1
      requirepass your_password
      
    • 保存并退出:按 Ctrl+O 保存,Ctrl+X 退出。
  2. 重启 Redis 服务(二选一即可)

    1
    2
    brew services restart redis  # 适用于Homebrew安装
    redis-server /usr/local/etc/redis.conf  # 手动重启
    

  3. 开放 Mac 防火墙端口

    • 图形化操作:
    • 进入 系统偏好设置 → 安全性与隐私 → 防火墙。
    • 点击锁图标解锁,选择 防火墙选项。
    • 点击 + 添加 redis-server 到允许列表。
    • 命令行操作(需管理员权限):
      1
      2
      sudo /usr/libexec/ApplicationFirewall/socketfilterfw --add /usr/local/bin/redis-server
      sudo /usr/libexec/ApplicationFirewall/socketfilterfw --unblockapp /usr/local/bin/redis-server
      

完成上述步骤之后,我们对 Windows 端进行配置(下载 Redis 客户端) 1. 安装 Redis 客户端 - 下载 Windows 版 Redis:从 Redis 官网 下载稳定版,解压到任意目录(如 C:\redis)。 - 添加到系统路径:将 C:\redis\bin 加入环境变量 PATH,以便在命令行直接使用 redis-cli。

  1. 连接 Redis 服务器
    • 命令格式:
      1
      redis-cli -h <Mac的局域网IP> -p 6379 -a <密码>
      
    • 示例(假设 Mac IP 为 192.168.1.100,密码为 your_redis_password):
      1
      redis-cli -h 192.168.1.100 -p 6379 -a your_redis_password
      
    • 验证连接:
      1
      2
      192.168.1.100:6379> PING
      PONG  # 连接成功
      

上面的步骤依旧不能确保 Redis 服务器在 Windows 上可以接受来自 Mac 的连接。(这里我们先留一个坑如果朋友们有想法也可以在评论区留下你们的解决方案)

2.2.4. macos 的数据消费者代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
 import redis
 import json
 from clickhouse_driver import Client
 from datetime import datetime
 import time

 # Redis配置
 REDIS_HOST = "localhost"  # 本地Redis或Windows的IP
 REDIS_PORT = 6379
 REDIS_QUEUE_NAME = "qmt_minute_queue"
 REDIS_PASSWORD = None  # 如果有密码,请设置

 # ClickHouse配置
 CLICKHOUSE_HOST = "localhost"
 CLICKHOUSE_PORT = 9000
 CLICKHOUSE_DB = "default"
 CLICKHOUSE_USER = "default"
 CLICKHOUSE_PASSWORD = ""

 def setup_redis_client():
     """初始化Redis客户端"""
     return redis.StrictRedis(
         host=REDIS_HOST, 
         port=REDIS_PORT, 
         password=REDIS_PASSWORD,
         decode_responses=True
     )

 def setup_clickhouse_client():
     """初始化ClickHouse客户端"""
     return Client(
         host=CLICKHOUSE_HOST,
         port=CLICKHOUSE_PORT,
         database=CLICKHOUSE_DB,
         user=CLICKHOUSE_USER,
         password=CLICKHOUSE_PASSWORD
     )

 def insert_to_clickhouse(client, data):
     """将分钟线数据插入到ClickHouse"""
     if not data["minute_data"]:
         print("没有数据需要插入")
         return 0

     query = """
     INSERT INTO minute_data 
     (ts_code, trade_time, open, high, low, close, vol, amount)
     VALUES
     """

     values = []
     for record in data["minute_data"]:
         values.append((
             record["ts_code"],
             datetime.strptime(record["trade_time"], "%Y-%m-%d %H:%M:%S"),
             record["open"],
             record["high"],
             record["low"],
             record["close"],
             record["vol"],
             record["amount"]
         ))

     if values:
         client.execute(query, values)
         return len(values)
     return 0

 def main():
     """主函数"""
     # 初始化客户端
     redis_client = setup_redis_client()
     clickhouse_client = setup_clickhouse_client()

     print("启动分钟线数据消费者,等待队列数据...")

     try:
         while True:
             # 尝试从Redis获取数据
             result = redis_client.brpop(REDIS_QUEUE_NAME, timeout=1)

             if result is None:
                 print("Redis队列为空,等待新数据...")
                 time.sleep(5)  # 等待5秒再次尝试
                 continue

             # 解析数据
             _, json_data = result
             data_package = json.loads(json_data)

             # 插入ClickHouse
             inserted_count = insert_to_clickhouse(clickhouse_client, data_package)
             print(f"成功插入分钟线数据: {data_package['ts_code']} - {data_package['trade_date']} ({inserted_count}条)")

     except KeyboardInterrupt:
         print("程序被手动中断")

     except Exception as e:
         print(f"程序执行异常: {str(e)}")

     finally:
         print("程序执行完毕")

 if __name__ == "__main__":
     main()

由于windows和mac之间的redis连接问题还尚未解决,我打算先把从 qmt 去到的分钟线数据存入 000001.SH_data.csv300750.SZ_data.csv,将其中的数据存入Clickhouse数据库中,并使用Python进行数据处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from xtquant import xtdata
import os
import pandas as pd

code_list = ['000001.SH', '300750.SZ']
period = '1h'
start_time = '20250101093000'
end_time = '20250201093000'

def on_data(datas):
    if datas:
        print(datas)
    else:
        print("数据下载失败或为空")

xtdata.download_history_data2(code_list, period, start_time, end_time, on_data)

# 创建目录(如果不存在)
save_dir = 'C:\\wbq'
os.makedirs(save_dir, exist_ok=True)

for code in code_list:
    data = xtdata.get_market_data_ex([], [code], period, start_time, end_time)
    if code in data and not data[code].empty:
        # 为每个股票创建单独的文件
        file_path = os.path.join(save_dir, f'{code}_data.csv')
        # 确保数据是DataFrame格式
        df = data[code]
        # 保存数据,添加错误处理
        try:
            df.to_csv(file_path)
            print(f'{code}数据保存到本地: {file_path}')
        except Exception as e:
            print(f'保存{code}数据时出错: {str(e)}')
    else:
        print(f'{code}没有获取到数据')

print("运行结束")