跳转至


课程  因子投资  机器学习  Python  Poetry  ppw  tools  programming  Numpy  Pandas  pandas  算法  hdbscan  聚类  选股  Algo  minimum  numpy  algo  FFT  模式识别  配对交易  GBDT  LightGBM  XGBoost  statistics  CDF  KS-Test  monte-carlo  VaR  回测  过拟合  algorithms  machine learning  strategy  python  sklearn  pdf  概率  数学  面试题  量化交易  策略分类  风险管理  Info  interview  career  xgboost  PCA  wavelet  时序事件归因  SHAP  Figures  Behavioral Economics  graduate  arma  garch  人物  职场  Quantopian  figure  Banz  金融行业  买方  卖方  story  量化传奇  rsi  zigzag  穹顶压力  因子  ESG  因子策略  投资  策略  pe  ORB  Xgboost  Alligator  Indicator  factor  alpha101  alpha  技术指标  wave  quant  algorithm  pearson  spearman  tushare  因子分析  Alphalens  涨停板  herd-behaviour  momentum  因子评估  review  SMC  聪明钱  trade  history  indicators  zscore  波动率  强化学习  顶背离  freshman  resources  others  AI  DeepSeek  network  量子计算  金融交易  IBM  weekly  LLT  backtest  backtrader  研报  papers  UBL  quantlib  jupyter-notebook  scikit-learn  pypinyin  qmt  xtquant  blog  static-site  duckdb  工具  colors  free resources  barra  world quant  Alpha  openbb  数据  risk-management  llm  prompt  CANSLIM  Augment  arsenal  copilot  vscode  code  量化数据存储  hdf5  h5py  cursor  augment  trae  Jupyter  jupysql  pyarrow  parquet  数据源  quantstats  实盘  clickhouse  notebook  redis  remote-agent  AI-tools  Moonshot  回测,研报,tushare 

tools »

涨时重势,跌时重质,Moonshot首测股息率因子给出结论


Abstract

  1. 在 tushare 中如何获取股息率?如何利用其分页机制,加快取数据的速度?
  2. 如何实现按股息率筛选?特别介绍 pandas的transform 与 apply方法比较
  3. 股息率数据的 alpha

这是复现基本面月度调仓策略的第三篇。在第一篇里,我们介绍了月度调仓的核心思想。在第二篇里,我们介绍了研报要求的数据清单,并以 tushare 为例,介绍了如何获取日线行情数据,并且实现了数据增量更新的一个高性能、但又极简的框架。

现在,我们就进入到第二阶段,逐步增加因子,并进行回测。

我们首先要添加的是股息率,并且根据股息率来实现股票池的筛选。

获取股息率

在 tushare 中,我们有两种方案可以获取股息率。其一是通过 daily_basic 接口。其二是先通过 dividend 接口获取每股分红,再除以每股股价,即可得到股息率。

在这里,我们只演示第一种方法。但在后面实现按两年连续分红条件筛选公司时,我们会演示如何使用 dividend 接口。

daily_basic 接口可用来获取以日期为索引的一些常用数据,比如当日收盘价、换手率、市盈率、市值等大约15列数据。它的签名如下:

1
2
3
4
def daily_basic(
    ts_code: str, trade_date: str, start_date: str, end_date: str
) -> pd.DataFrame:
    pass

其中 ts_code 与 trade_date 必选其一。与其它多数 tushare 函数一样,它有返回记录限制,目前是6000条。这样在一次存取中,可以取某支股票25年左右的数据,或者所有股票一天的数据。

Attention

一次可存取记录条数限制可能取决于你的账号。这里6000条是积分5000以上账号的限制。

下面的代码演示了如何获取股息率及 PE 等数据:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# example-1
def fetch_dv_ttm(start: datetime, end: datetime) -> pd.DataFrame:
    pro = pro_api()
    cols = "ts_code,trade_date,dv_ttm,total_mv,turnover_rate,pe_ttm"
    dfs = []
    for dt in pd.bdate_range(start, end):
        dtstr = dt.strftime("%Y%m%d")
        df = pro.daily_basic(trade_date=dtstr, fields=cols)
        dfs.append(df)

    return pd.concat(dfs)


df = fetch_dv_ttm(datetime.date(2019, 10, 8), datetime.date(2019, 10, 12))
df

大约每0.5秒能获取一天的数据。这样获取一年的数据,大约需要2分钟。

根据股息率筛选

现在,我们就实现按股息率筛选出每日前500名个股,然后用 moonshot 回测下,看看这样构建的股票池本身是否有价值。

1
2
3
4
from helper import (ParquetUnifiedStorage, dividend_yield_screen, fetch_bars,
                    fetch_dv_ttm)
import tushare as ts
from moonshot import Moonshot
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from moonshot import Moonshot

def dividend_yield_screen(data: pd.DataFrame, n: int = 500)->pd.Series:
    """股息率筛选方法

    对每个月的股息率进行排名,选择前n名股票,标记为1,
    与现有flag进行逻辑与运算

    Args:
        n: 每月选择的股票数量,默认500
    """
    logger.info("开始进行股息率筛选...")

    if 'dv_ttm' not in data.columns:
        raise ValueError("数据中不存在 dv_ttm 列,无法应用筛选器")

    def rank_top_n(group):
        # 计算每个股票在当月的排名(降序,股息率高的排名靠前)
        ranks = group.rank(method='first', ascending=False)

        return (ranks <= n).astype(int)

    # 按date分组,对 dividend_rate_ttm 进行排名筛选
    dividend_flags = data.groupby(level='month')['dv_ttm'].transform(rank_top_n)

    logger.info(f"已筛选出前{n}名股息率股")
    return dividend_flags

这个筛选方法是 pandas 中常用的 groupby/apply 套路。类似的方法有 apply, transform ,agg和map等。它们的主要不同,在于对输入输出的类型不同。

map 只能接收 Series 对象作为输入,按单个元素进行转换映射,输出与输入长度一致;transform, agg, apply在输入上,既可以是 Series,也可以是 DataFrame; 但agg 会导致输出数据维度缩减;transform 则保持不变(一对一映射变换);而 apply 则较为灵活,输出形状较复杂。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
start = datetime.date(2018, 1, 1)
end = datetime.date(2023, 12, 31)

store_path = data_home / "rw/bars.parquet"
bars_store = ParquetUnifiedStorage(store_path=store_path, fetch_data_func=fetch_bars)

barss = bars_store.load_data(start, end)
ms = Moonshot(barss)

store_path = data_home / "rw/dv_ttm.parquet"
dv_store = ParquetUnifiedStorage(store_path=store_path, fetch_data_func=fetch_dv_ttm)

dv_ttm = dv_store.load_data(start, end)

ms.append_factor(dv_ttm, "dv_ttm", resample_method="last")

output = get_jupyter_root_dir() / "reports/moonshot_v3.html"
# 筛选!回测!报告
(
    ms.screen(dividend_yield_screen, data=ms.data, n=500)
    .calculate_returns()
    .report(output=output)
)

Moonshot 的代码很简单,但也很强大:要按股息率,在每月结尾时进行股票池筛选,筛选器函数的核心部分仅令4行代码即可完成。这得益于我们梳理出来的清晰的数据结构。

最终要应用这个筛选器也很简单。我们首先获取日线行情数据,初始化一个 Moonshot 对象,然后再获得同样频率下的 dv_ttm(即股息率)数据,通过 append_factor 方法将股息率数据添加到 Moonshot 中: 在这里,我们让 moonshot 框架自动完成了月线重采样以及数据的对齐操作。

最后,让 screen 方法开始工作,计算收益,绘制策略评估指标。由于 moonshot 在设计上使用了了链式调用,所以,这里的工作任务是一气呵成。

最后,策略报告显示,在2018年到2023年间,股息率筛选本身就具有一定的 alpha:

累积收益对照

从图中可以看出,在2018年、2022和2023年,下跌之中,股息率较高的个股更加抗跌;而在19年到2021年间,股市处于上涨之中,股息率高的个股,上涨就不如其它个股。

Info

研究平台用户请双击 /reports/moonshot_v3.html 来查看更详细的报告。此目录和文件可以 jupyter lab 的侧边栏中找到。

涨时重势,跌时重质。这句股谚在这张图中得到充分体现。

为什么在股市上升期,股息率较高的个股涨势不如其它个股?因为在这些个股中,存在相当比例的价值投资者,他们时刻会警惕价格有没有过度偏价值,从而股息率成为价格的锚定工具;而垃圾股的炒作全凭想像和故事,反倒不会受到任何锚定物的牵绊

同样地,在股市下降期,股息率较高的个股不容易下跌:因为一旦价格向下过份偏离,价值投资者就会入场。

但长期来看,股息率较高的个股,累积收益会更高,存在显著的 alpha和夏普;而它们的波动更小,投资体验更好。时间的玫瑰,更值得拥有。

ParquetUnifiedStorage

ParquetUnifiedStorage是一个简单的本地存储方案。我们在上一篇文章中介绍过,在本节中我们进一步进行了拓展,使得它可以支持多种数据存取,并且能自动更新。

它的用法是,在定义 store 时,就传入一个回调函数,用来在本地缓存没有数据时,能够自动从数据源中获取数据。如此一来,调用者就只需要使用 store.load_data(start, end)就能自动获取在[start, end]区间的数据,同时又充分利用了缓存。这会是我们在进行小型研究,没有技术团队支持时的好帮手。

题外话: Tushare 中的分页读取

在 Tushare 中,数据查询一般会有6000条记录限制。但是,有一些查询允许指定 start_date 和 end_date 参数,这种情况下,返回数据集的大小并不确定,一旦时间跨度较长,数据集大小就会超过这个限制。

这种情况下,根据官方文档,我们可以通过修改查询条件,以减少查询返回数据集的大小,从而确保返回的数据集是完整的。比如,要获取 A 股所有个股一年的日线数据,我们可以按证券列表遍历,或者按日期遍历。这样每一次返回的结果集都是完整的;但是,这样一来,就不可避免地带来性能损失。比如,前者涉及到5000次左右的网络请求,后者涉及到250次左右的网络请求;按每次能返回的最大数据集行数算,实际上只需要225次左右的网络请求。所以,理论上还有至少10%的优化空间。

Tip

A 股是在最近几年才扩容到今天的5413家的。在2018年之前,上市股总数大约在1800家之间。所以,当我们遍历到那一年之前时,每次请求就只利用了不到1/3的返回容量,性能浪费就更大了。

分页查询的参数在正式文档中没有给出。但我们可以通过数据工具来查看某个 API 是否有支持分页.

在下面的截图中,左图显示了获取日线行情的 API daily的文档。在这里,我们发现它并不支持分页查询。但是,如果我们转到数据工具页面,就可以看到该 API 是支持分页查询的。

从图中可以看出,它用来实现分页的两个参数是 offset 和 limit。但是,这里还有一个隐藏的问题,就是 offset 本身也有最大限制,比如10万。这就给算法带来了额外的复杂度:我们必须考虑这样的情况,在一次[start, end]的查询中,理论上结果记录数应该是15万条,但实际上只能返回10万条,因此我们必须重新查询,但这10万条不知道会在哪一天截断,所以我们还要知道两件事:

  1. tushare 查询返回的时间顺序
  2. 截断日期怎么确定

下面的代码仅对此 API 有效。当我们运用到其它数据时,要考虑 tushare 返回结果的时间顺序,这会影响下一次查询区间的确定。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# example-2
def _fetch_dv_ttm(start: datetime.date, end: datetime.date):
    """递归获取完整的daily_basic数据,处理offset限制问题"""
    dfs = []
    pro = pro_api()
    cols = "ts_code,trade_date,dv_ttm,total_mv,turnover_rate,pe_ttm"

    page_size = 6_000
    offset_limit = 100_000

    current_start = start
    current_end = end

    def fetch_batch(batch_start: datetime.date, batch_end: datetime.date):
        batch_dfs = []
        last_trade_date = None

        for i in range(0, int(offset_limit / page_size)):
            offset = i * page_size
            df = pro.daily_basic(
                start_date=batch_start.strftime("%Y%m%d"),
                end_date=batch_end.strftime("%Y%m%d"),
                fields=cols,
                offset=offset,
                pagesize=page_size,
            )

            if len(df) == 0:
                break

            batch_dfs.append(df)
            last_trade_date = df.iloc[-1]["trade_date"]  # 最后一条记录的日期

            # 如果返回的数据少于page_size,说明已经获取完毕
            if len(df) < page_size:
                return batch_dfs, None

        # 如果达到了offset_limit,返回最后获取到的交易日期
        return batch_dfs, last_trade_date

    # 主循环:处理可能需要多次调用的情况
    while current_start <= current_end:
        batch_dfs, last_date = fetch_batch(current_start, current_end)
        print(f"获取数据: {current_start} ~ {current_end},最后数据日: {last_date}")
        dfs.extend(batch_dfs)

        if last_date is None:
            # 数据获取完毕
            break

        # 将last_date转换为datetime.date格式
        last_date_obj = datetime.datetime.strptime(last_date, "%Y%m%d").date()

        # 确保new_end不小于start
        if last_date_obj < start:
            break

        current_end = last_date_obj

    if dfs:
        result_df = pd.concat(dfs, ignore_index=True)
        # 去重,因为可能有重复的日期数据
        result_df = result_df.drop_duplicates(subset=["ts_code", "trade_date"])
        # 按交易日期排序
        result_df = result_df.sort_values(["trade_date", "ts_code"])
        return result_df
    else:
        return pd.DataFrame()

在同样的起止区间(2019年10月8日到2019年12月31日)里,示例1需要45.5秒;示例2需要24秒左右。如果我们存取的时间区间更早一点,那么这个加速比还会更大一点,因为早期上市公司的数量更少。

不过,尽管如此,我们还是要谨慎对待这些参数的使用。至少需要准备好回归测试,以防在 tushare 修改接口时,能第一时间发现变化。