跳转至


课程  因子投资  机器学习  Python  Poetry  ppw  tools  programming  Numpy  Pandas  pandas  算法  hdbscan  聚类  选股  Algo  minimum  numpy  algo  FFT  模式识别  配对交易  GBDT  LightGBM  XGBoost  statistics  CDF  KS-Test  monte-carlo  VaR  回测  过拟合  algorithms  machine learning  strategy  python  sklearn  pdf  概率  数学  面试题  量化交易  策略分类  风险管理  Info  interview  career  xgboost  PCA  wavelet  时序事件归因  SHAP  Figures  Behavioral Economics  graduate  arma  garch  人物  职场  Quantopian  figure  Banz  金融行业  买方  卖方  story  量化传奇  rsi  zigzag  穹顶压力  因子  ESG  因子策略  投资  策略  pe  ORB  Xgboost  Alligator  Indicator  factor  alpha101  alpha  技术指标  wave  quant  algorithm  pearson  spearman  tushare  因子分析  Alphalens  涨停板  herd-behaviour  momentum  因子评估  review  SMC  聪明钱  trade  history  indicators  zscore  波动率  强化学习  顶背离  freshman  resources  others  AI  DeepSeek  network  量子计算  金融交易  IBM  weekly  LLT  backtest  backtrader  研报  papers  UBL  quantlib  jupyter-notebook  scikit-learn  pypinyin  qmt  xtquant  blog  static-site  duckdb  工具  colors  free resources  barra  world quant  Alpha  openbb  数据  risk-management  llm  prompt  CANSLIM  Augment  arsenal  copilot  vscode  code  量化数据存储  hdf5  h5py  cursor  augment  trae  Jupyter  jupysql  pyarrow  parquet  数据源  quantstats  实盘  clickhouse  notebook  redis  remote-agent  AI-tools  Moonshot  回测,研报,tushare 

tools »

21天驯化AI打工仔 - 我如何获取量化数据


IDEA:本人和本人的 AI黑奴 的相互协作,能不能在短短 21 天内开发出一套量化交易系统?

这么有意思的挑战,不如就从今天开始吧!“数据是一切开始的基础”,我打算先安排 AI黑奴 从数据获取开始做起。(感觉叫 AI黑奴 不太好听,那就给它取个名字叫:007号打码机,希望007号“牛码”可以“码力全开”)

好!下面我们正式准备开发工作!


Redis 启动!

我希望开发一个量化交易框架,并且使用 Redis 构建消息队列,从 Tushare 获取 OHLC 数据和复权因子。

首先,考虑代码的结构。需要分为两个主要部分:数据获取生产者和消费者处理数据。生产者从 Tushare 获取数据,然后将数据放入 Redis 队列;消费者从队列中取出数据进行处理。需要处理 Tushare 的 API 调用,包括获取 OHLC 和复权因子,这里可能需要使用 tushare 的 pro_api,所以需要用户提供 token。

接下来,Redis 部分,使用 redis-py 库,生产者使用 lpush 将数据推入队列,消费者使用 brpop 阻塞式获取数据。需要注意异常处理,比如 Tushare 的 API 调用可能失败,Redis 连接问题等,所以需要添加 try-except 块。

然后,考虑代码的独立性和可运行性。需要导入必要的库,如 redis、tushare、json 等。生产者函数中,获取数据后转换为 JSON 格式再推入队列。消费者函数中,取出数据后解析并打印,这里可以扩展为实际的处理逻辑,比如存储到数据库或进一步计算。

还要注意配置部分,将 Redis 的连接参数和 Tushare 的 token 作为配置项,方便用户修改。代码中使用注释说明需要用户替换的部分,比如 tushare_token 和 redis 的 host、port 等。

最后,确保 doubaocanvas 的格式正确,identifier 使用 “quant-data-redis-queue”,type 为 general_code,title 描述清楚。两个代码部分分别作为生产者和消费者的脚本,path 分别为 producer.py 和 consumer.py,description 准确。检查是否有未定义的变量,确保代码完整,可以独立运行,用户只需替换配置即可使用。

我的007表示这很简单,瞬间根据我的要求,给了两份代码文件:data_producer.pydata_consumer.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
"""data_consumer.py"""
import redis
import json
from typing import Dict

# 配置参数(需与生产者保持一致)
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"

redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

def process_data(data_package: Dict):
    """数据处理逻辑(可扩展为实际业务处理)"""
    print(f"接收到数据时间: {data_package['timestamp']}")
    print(f"处理股票代码: {data_package['ts_code']}")
    print(f"OHLC数据条目数: {len(data_package['ohlc_data'])}")
    print(f"复权因子条目数: {len(data_package['adj_factor'])}")
    # 此处可添加数据存储/计算/分发等实际业务逻辑

def consume_data():
    """数据消费主函数(阻塞式获取)"""
    print("启动数据消费者,等待队列数据...")
    while True:
        try:
            # 阻塞式获取队列数据(超时时间30秒)
            _, json_data = redis_client.brpop(REDIS_QUEUE_NAME, timeout=30)
            data_package = json.loads(json_data)
            process_data(data_package)

        except Exception as e:
            print(f"数据处理异常: {str(e)}")
            continue

if __name__ == "__main__":
    consume_data()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""data_producer.py"""
import redis
import tushare as ts
import json
from datetime import datetime

# 配置参数
TUSHARE_TOKEN = "87a5a442931d15e749b4453b4f802c3b87794c3a5968b7b40a153edd"  # 替换为你的Tushare token
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_QUEUE_NAME = "tushare_data_queue"

# 初始化连接
pro = ts.pro_api(TUSHARE_TOKEN)
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

def fetch_ohlc_data(ts_code: str, start_date: str, end_date: str) -> list:
    """获取OHLC数据"""
    try:
        df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
        return df[['ts_code', 'trade_date', 'open', 'high', 'low', 'close', 'vol']].to_dict('records')
    except Exception as e:
        print(f"OHLC数据获取失败: {str(e)}")
        return []

def fetch_adj_factor(ts_code: str, start_date: str, end_date: str) -> list:
    """获取复权因子"""
    try:
        df = pro.adj_factor(ts_code=ts_code, start_date=start_date, end_date=end_date)
        return df[['ts_code', 'trade_date', 'adj_factor']].to_dict('records')
    except Exception as e:
        print(f"复权因子获取失败: {str(e)}")
        return []

def produce_data(ts_code_list: list, date_range: tuple):
    """数据生产主函数"""
    start_date, end_date = date_range
    for ts_code in ts_code_list:
        # 封装数据结构
        data_package = {
            "timestamp": datetime.now().isoformat(),
            "ts_code": ts_code,
            "ohlc_data": fetch_ohlc_data(ts_code, start_date, end_date),
            "adj_factor": fetch_adj_factor(ts_code, start_date, end_date)
        }

        # 推入Redis队列
        redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
        print(f"已推送数据: {ts_code} - {start_date}{end_date}")

if __name__ == "__main__":
    # 示例参数
    STOCK_CODES = ["000001.SZ", "600519.SH"]  # 沪深300成分股示例
    DATE_RANGE = ("20230101", "20231231")   # 年度数据范围

    produce_data(STOCK_CODES, DATE_RANGE)

运行了两个文件没有反应,原来是这个电脑还没有下载并启动 Redis 。然后,我快速手动下载并启动,并夸赞了 007 贴心给我写了一段检查 Redis 是否正常启动的代码。

此时,运行 007 的“贴心代码”,就显示成功连接到 Redis:

为了直观地查看和管理 Redis 中的数据,我安装了 RedisInsight

Tushare 获取数据

完成了 Redis 的连接,直接来运行 007 提供的 data_producer.py,成功在 RedisInsight 中看到了数据:

再运行 data_consumer.py,成功从消息队列中取出数据。007 还为我讲解了一下 producer 和 consumer 之间的关系,今天又是依赖 AI 打工仔的一天呢~