21天驯化AI打工仔 - 我如何获取量化数据
IDEA:本人和本人的 AI黑奴 的相互协作,能不能在短短 21 天内开发出一套量化交易系统?
这么有意思的挑战,不如就从今天开始吧!“数据是一切开始的基础”,我打算先安排 AI黑奴 从数据获取开始做起。(感觉叫 AI黑奴 不太好听,那就给它取个名字叫:007号打码机,希望007号“牛码”可以“码力全开”)
好!下面我们正式准备开发工作!
Redis 启动!
我希望开发一个量化交易框架,并且使用 Redis 构建消息队列,从 Tushare 获取 OHLC 数据和复权因子。
首先,考虑代码的结构。需要分为两个主要部分:数据获取生产者和消费者处理数据。生产者从 Tushare 获取数据,然后将数据放入 Redis 队列;消费者从队列中取出数据进行处理。需要处理 Tushare 的 API 调用,包括获取 OHLC 和复权因子,这里可能需要使用 tushare 的 pro_api,所以需要用户提供 token。
接下来,Redis 部分,使用 redis-py 库,生产者使用 lpush 将数据推入队列,消费者使用 brpop 阻塞式获取数据。需要注意异常处理,比如 Tushare 的 API 调用可能失败,Redis 连接问题等,所以需要添加 try-except 块。
然后,考虑代码的独立性和可运行性。需要导入必要的库,如 redis、tushare、json 等。生产者函数中,获取数据后转换为 JSON 格式再推入队列。消费者函数中,取出数据后解析并打印,这里可以扩展为实际的处理逻辑,比如存储到数据库或进一步计算。
还要注意配置部分,将 Redis 的连接参数和 Tushare 的 token 作为配置项,方便用户修改。代码中使用注释说明需要用户替换的部分,比如 tushare_token 和 redis 的 host、port 等。
最后,确保 doubaocanvas 的格式正确,identifier 使用 “quant-data-redis-queue”,type 为 general_code,title 描述清楚。两个代码部分分别作为生产者和消费者的脚本,path 分别为 producer.py 和 consumer.py,description 准确。检查是否有未定义的变量,确保代码完整,可以独立运行,用户只需替换配置即可使用。
我的007表示这很简单,瞬间根据我的要求,给了两份代码文件:data_producer.py
和 data_consumer.py
。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 | """data_consumer.py"""
import redis
import json
from typing import Dict
# 配置参数(需与生产者保持一致)
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
def process_data(data_package: Dict):
"""数据处理逻辑(可扩展为实际业务处理)"""
print(f"接收到数据时间: {data_package['timestamp']}")
print(f"处理股票代码: {data_package['ts_code']}")
print(f"OHLC数据条目数: {len(data_package['ohlc_data'])}")
print(f"复权因子条目数: {len(data_package['adj_factor'])}")
# 此处可添加数据存储/计算/分发等实际业务逻辑
def consume_data():
"""数据消费主函数(阻塞式获取)"""
print("启动数据消费者,等待队列数据...")
while True:
try:
# 阻塞式获取队列数据(超时时间30秒)
_, json_data = redis_client.brpop(REDIS_QUEUE_NAME, timeout=30)
data_package = json.loads(json_data)
process_data(data_package)
except Exception as e:
print(f"数据处理异常: {str(e)}")
continue
if __name__ == "__main__":
consume_data()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 | """data_producer.py"""
import redis
import tushare as ts
import json
from datetime import datetime
# 配置参数
TUSHARE_TOKEN = "87a5a442931d15e749b4453b4f802c3b87794c3a5968b7b40a153edd" # 替换为你的Tushare token
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"
# 初始化连接
pro = ts.pro_api(TUSHARE_TOKEN)
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
def fetch_ohlc_data(ts_code: str, start_date: str, end_date: str) -> list:
"""获取OHLC数据"""
try:
df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
return df[['ts_code', 'trade_date', 'open', 'high', 'low', 'close', 'vol']].to_dict('records')
except Exception as e:
print(f"OHLC数据获取失败: {str(e)}")
return []
def fetch_adj_factor(ts_code: str, start_date: str, end_date: str) -> list:
"""获取复权因子"""
try:
df = pro.adj_factor(ts_code=ts_code, start_date=start_date, end_date=end_date)
return df[['ts_code', 'trade_date', 'adj_factor']].to_dict('records')
except Exception as e:
print(f"复权因子获取失败: {str(e)}")
return []
def produce_data(ts_code_list: list, date_range: tuple):
"""数据生产主函数"""
start_date, end_date = date_range
for ts_code in ts_code_list:
# 封装数据结构
data_package = {
"timestamp": datetime.now().isoformat(),
"ts_code": ts_code,
"ohlc_data": fetch_ohlc_data(ts_code, start_date, end_date),
"adj_factor": fetch_adj_factor(ts_code, start_date, end_date)
}
# 推入Redis队列
redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
print(f"已推送数据: {ts_code} - {start_date}至{end_date}")
if __name__ == "__main__":
# 示例参数
STOCK_CODES = ["000001.SZ", "600519.SH"] # 沪深300成分股示例
DATE_RANGE = ("20230101", "20231231") # 年度数据范围
produce_data(STOCK_CODES, DATE_RANGE)
|
运行了两个文件没有反应,原来是这个电脑还没有下载并启动 Redis 。然后,我快速手动下载并启动,并夸赞了 007 贴心给我写了一段检查 Redis 是否正常启动的代码。

此时,运行 007 的“贴心代码”,就显示成功连接到 Redis:

为了直观地查看和管理 Redis 中的数据,我安装了 RedisInsight
Tushare 获取数据
完成了 Redis 的连接,直接来运行 007 提供的 data_producer.py
,成功在 RedisInsight 中看到了数据:

再运行 data_consumer.py
,成功从消息队列中取出数据。007 还为我讲解了一下 producer 和 consumer 之间的关系,今天又是依赖 AI 打工仔的一天呢~
