跳转至

21天驯化AI打工仔 - 我如何获取量化数据

IDEA:本人和本人的 AI黑奴 的相互协作,能不能在短短 21 天内开发出一套量化交易系统?

这么有意思的挑战,不如就从今天开始吧!“数据是一切开始的基础”,我打算先安排 AI黑奴 从数据获取开始做起。(感觉叫 AI黑奴 不太好听,那就给它取个名字叫:007号打码机,希望007号“牛码”可以“码力全开”)

好!下面我们正式准备开发工作!


Redis 启动!

我希望开发一个量化交易框架,并且使用 Redis 构建消息队列,从 Tushare 获取 OHLC 数据和复权因子。

首先,考虑代码的结构。需要分为两个主要部分:数据获取生产者和消费者处理数据。生产者从 Tushare 获取数据,然后将数据放入 Redis 队列;消费者从队列中取出数据进行处理。需要处理 Tushare 的 API 调用,包括获取 OHLC 和复权因子,这里可能需要使用 tushare 的 pro_api,所以需要用户提供 token。

接下来,Redis 部分,使用 redis-py 库,生产者使用 lpush 将数据推入队列,消费者使用 brpop 阻塞式获取数据。需要注意异常处理,比如 Tushare 的 API 调用可能失败,Redis 连接问题等,所以需要添加 try-except 块。

然后,考虑代码的独立性和可运行性。需要导入必要的库,如 redis、tushare、json 等。生产者函数中,获取数据后转换为 JSON 格式再推入队列。消费者函数中,取出数据后解析并打印,这里可以扩展为实际的处理逻辑,比如存储到数据库或进一步计算。

还要注意配置部分,将 Redis 的连接参数和 Tushare 的 token 作为配置项,方便用户修改。代码中使用注释说明需要用户替换的部分,比如 tushare_token 和 redis 的 host、port 等。

最后,确保 doubaocanvas 的格式正确,identifier 使用 “quant-data-redis-queue”,type 为 general_code,title 描述清楚。两个代码部分分别作为生产者和消费者的脚本,path 分别为 producer.py 和 consumer.py,description 准确。检查是否有未定义的变量,确保代码完整,可以独立运行,用户只需替换配置即可使用。

我的007表示这很简单,瞬间根据我的要求,给了两份代码文件:data_producer.pydata_consumer.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
"""data_consumer.py"""
import redis
import json
from typing import Dict

# 配置参数(需与生产者保持一致)
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"

redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

def process_data(data_package: Dict):
    """数据处理逻辑(可扩展为实际业务处理)"""
    print(f"接收到数据时间: {data_package['timestamp']}")
    print(f"处理股票代码: {data_package['ts_code']}")
    print(f"OHLC数据条目数: {len(data_package['ohlc_data'])}")
    print(f"复权因子条目数: {len(data_package['adj_factor'])}")
    # 此处可添加数据存储/计算/分发等实际业务逻辑

def consume_data():
    """数据消费主函数(阻塞式获取)"""
    print("启动数据消费者,等待队列数据...")
    while True:
        try:
            # 阻塞式获取队列数据(超时时间30秒)
            _, json_data = redis_client.brpop(REDIS_QUEUE_NAME, timeout=30)
            data_package = json.loads(json_data)
            process_data(data_package)

        except Exception as e:
            print(f"数据处理异常: {str(e)}")
            continue

if __name__ == "__main__":
    consume_data()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""data_producer.py"""
import redis
import tushare as ts
import json
from datetime import datetime

# 配置参数
TUSHARE_TOKEN = "87a5a442931d15e749b4453b4f802c3b87794c3a5968b7b40a153edd"  # 替换为你的Tushare token
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"

# 初始化连接
pro = ts.pro_api(TUSHARE_TOKEN)
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

def fetch_ohlc_data(ts_code: str, start_date: str, end_date: str) -> list:
    """获取OHLC数据"""
    try:
        df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
        return df[['ts_code', 'trade_date', 'open', 'high', 'low', 'close', 'vol']].to_dict('records')
    except Exception as e:
        print(f"OHLC数据获取失败: {str(e)}")
        return []

def fetch_adj_factor(ts_code: str, start_date: str, end_date: str) -> list:
    """获取复权因子"""
    try:
        df = pro.adj_factor(ts_code=ts_code, start_date=start_date, end_date=end_date)
        return df[['ts_code', 'trade_date', 'adj_factor']].to_dict('records')
    except Exception as e:
        print(f"复权因子获取失败: {str(e)}")
        return []

def produce_data(ts_code_list: list, date_range: tuple):
    """数据生产主函数"""
    start_date, end_date = date_range
    for ts_code in ts_code_list:
        # 封装数据结构
        data_package = {
            "timestamp": datetime.now().isoformat(),
            "ts_code": ts_code,
            "ohlc_data": fetch_ohlc_data(ts_code, start_date, end_date),
            "adj_factor": fetch_adj_factor(ts_code, start_date, end_date)
        }

        # 推入Redis队列
        redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
        print(f"已推送数据: {ts_code} - {start_date}{end_date}")

if __name__ == "__main__":
    # 示例参数
    STOCK_CODES = ["000001.SZ", "600519.SH"]  # 沪深300成分股示例
    DATE_RANGE = ("20230101", "20231231")   # 年度数据范围

    produce_data(STOCK_CODES, DATE_RANGE)

运行了两个文件没有反应,原来是这个电脑还没有下载并启动 Redis 。然后,我快速手动下载并启动,并夸赞了 007 贴心给我写了一段检查 Redis 是否正常启动的代码。

此时,运行 007 的“贴心代码”,就显示成功连接到 Redis:

为了直观地查看和管理 Redis 中的数据,我安装了 RedisInsight

Tushare 获取数据

完成了 Redis 的连接,直接来运行 007 提供的 data_producer.py,成功在 RedisInsight 中看到了数据:

再运行 data_consumer.py,成功从消息队列中取出数据。007 还为我讲解了一下 producer 和 consumer 之间的关系,今天又是依赖 AI 打工仔的一天呢~