WorldQuant? Word Count!
如果你去商场逛,你会发现,销量最好的店和最好的商品总是占据人气中心。对股票来说也是一样,被新闻和社交媒体频频提起的个股,往往更容易获得更大的成交量。
如果一支个股获得了人气,那它的成交量一定很大,两者之间有强相关关系。但是,成交量落后于人气指标。当一支个股成交量开始放量时,有可能已经追不上了(涨停)。如果我们能提前发现人气指标,就有可能获得提前介入的时机。
具体怎么操作(建模)呢?
我们首先讲解一点信息检索的知识,然后介绍如何运用统计学和信息检索的知识,来把上述问题模型化。
TF-IDF
TF是Term Frequency的意思。它是对一篇文档中,某个词语共出现了多少次的一个统计。IDF则是Inverse Document Frequency的意思,大致来说,如果一个词在越多的文档中出现,那么,它携带的信息量就越少。
很显然,我们几乎每句话都会用到『的、地、得』,这样的词由于太常常出现,就不携带信息量。新闻业常讲的一句话,狗咬人不是新闻,人咬狗才是新闻,本质上也是前者太常出现,所以就不携带信息量了。
最早发明TF-IDF的人应该是康奈尔大学的杰拉德·索尔顿(康奈尔大学的计算机其实很强)和英国的计算机科学家卡伦·琼斯。到今天,美国计算机协会(ACM)还会每三年颁发一次杰拉德·索尔顿奖,以表彰信息检索领域的突出贡献者。
根据TF-IDF的思想,这里提出一个word-count因子。它的构建方法是,通过tushare获取每天的新闻信息,用jieba进行分词,统计每天上市公司名称出现的次数。这是TF部分。
在IDF构建部分,我们做法与经典方法不一样,但更简单、更适合量化场景。这个方法就是,我们取TF的移动平均做为IDF。
最后,我们把当天某个词的出现频率除以它的移动平均的读数作为因子(使用排序归一化)。显然,这个数值越大,它携带的信息量也越大。
获取新闻文本数据
我们可以通过tushare的news接口来获取新闻。
这个方法是:
| news = pro.news(src='sina',
date=start,
end_date=end,
)
|
我们把获取的新闻数据先保存到本地,以免后面还可能进行其它挖掘:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 | def fetch_news(start, end):
# tushare对新闻接口调用次数及单次返回的新闻条数都有限制
# 我们姑且设置为每30天做为一批次调用
# 如果是production code,需要仔细调试这个限制,以免遗漏新闻
date_range = pd.date_range(start=start, end=end)
dates = pd.DataFrame([], index = date_range)
freq = '30D'
grouped = dates.groupby(pd.Grouper(freq=freq))
groups = [group for _, group in grouped][::-1]
for group in groups:
period_start, period_end = group.index[0], group.index[-1]
start = period_start.strftime('%Y%m%d')
end = period_end.strftime('%Y%m%d')
news = pro.news(src='sina',
date=start,
end_date=end,
)
csv_file = os.path.join(data_home, f"{start}-{end}.news.csv")
news.to_csv(csv_file)
# 每小时能访问20次
time.sleep(181)
|
在统计新闻中上市公司出现的词频时,我们需要先给jieba增加自定义词典,以免出现分词错误。比如,如果不添加关键词『万科A』,那么它一定会被jieba分解为万科和A两个词。
增加自定义词典的代码如下:
| def init():
# get_stock_list 是自定义的函数,用于获取股票列表。在quantide research环境可用
stocks = get_stock_list(datetime.date(2024,11,1), code_only=False)
stocks = set(stocks.name)
for name in stocks:
jieba.add_word(name)
return stocks
|
这里得到的证券列表,后面还要使用,所以作为函数返回值。
接下来,就是统计词频了:
1
2
3
4
5
6
7
8
9
10
11
12
13 | def count_words(news, stocks)->pd.DataFrame:
data = []
for dt, content, _ in news.to_records(index=False):
words = jieba.cut(content)
word_counts = Counter(words)
for word, count in word_counts.items():
if word in stocks:
data.append((dt, word, count))
df = pd.DataFrame(data, columns=['date', 'word', 'count'])
df["date"] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
return df
|
tushare返回的数据共有三列,其中date, content是我们关心的字段。公司名词频就从content中提取。
然后我们对所有已下载的新闻进行分析,统计每日词频和移动均值:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 | def count_words_in_files(stocks, ma_groups=None):
ma_groups = ma_groups or [30, 60, 250]
# 获取指定日期范围内的数据
results = []
files = glob.glob(os.path.join(data_home, "*.news.csv"))
for file in files:
news = pd.read_csv(file, index_col=0)
df = count_words(news, stocks)
results.append(df)
df = pd.concat(results)
df = df.sort_index()
df = df.groupby("word").resample('D').sum()
df.drop("word", axis=1, inplace=True)
df = df.swaplevel()
unstacked = df.unstack(level="word").fillna(0)
for win in ma_groups:
df[f"ma_{win}"] = unstacked.rolling(window=win).mean().stack()
return df
count_words_in_files(stocks)
|
最后,完整的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 | import os
import glob
import jieba
from collections import Counter
import time
data_home = "/data/news"
def init():
stocks = get_stock_list(datetime.date(2024,11,1), code_only=False)
stocks = set(stocks.name)
for name in stocks:
jieba.add_word(name)
return stocks
def count_words(news, stocks)->pd.DataFrame:
data = []
for dt, content, _ in news.to_records(index=False):
words = jieba.cut(content)
word_counts = Counter(words)
for word, count in word_counts.items():
if word in stocks:
data.append((dt, word, count))
df = pd.DataFrame(data, columns=['date', 'word', 'count'])
df["date"] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
return df
def count_words_in_files(stocks, ma_groups=None):
ma_groups = ma_groups or [30, 60, 250]
# 获取指定日期范围内的数据
results = []
files = glob.glob(os.path.join(data_home, "*.news.csv"))
for file in files:
news = pd.read_csv(file, index_col=0)
df = count_words(news, stocks)
results.append(df)
df = pd.concat(results)
df = df.sort_index()
df = df.groupby("word").resample('D').sum()
df.drop("word", axis=1, inplace=True)
df = df.swaplevel()
unstacked = df.unstack(level="word").fillna(0)
for win in ma_groups:
df[f"ma_{win}"] = unstacked.rolling(window=win).mean().stack()
return df.sort_index(), unstacked.sort_index()
def retry_fetch(start, end, offset):
i = 1
while True:
try:
df =pro.news(**{
"start_date": start,
"end_date": end,
"src": "sina",
"limit": 1000,
"offset": offset
}, fields=[
"datetime",
"content",
"title",
"channels",
"score"])
return df
except Exception as e:
print(f"fetch_new failed, retry after {i} hours")
time.sleep(i * 3600)
i = min(i*2, 10)
def fetch_news(start, end):
for i in range(1000):
offset = i * 1000
df = retry_fetch(start, end, offset)
df_start = arrow.get(df.iloc[0]["datetime"]).format("YYYYMMDD_HHmmss")
df_end = arrow.get(df.iloc[-1]["datetime"]).format("YYYYMMDD_HHmmss")
df.to_csv(os.path.join(data_home, f"{df_start}_{df_end}.news.csv"))
if len(df) == 0:
break
# tushare对新闻接口调用次数及单次返回的新闻条数都有限制
time.sleep(3.5 * 60)
stocks = init()
start = datetime.date(2023, 1, 4)
end = datetime.date(2024, 11, 20)
fetch_news(start, end)
# factor, raw = count_words_in_files(stocks)
|
最终因子化要通过factor["count"]/factor["ma_30"]来计算并执行rank,这里的ma_30可以替换为ma_60, ma_250等。
通过tushare似乎只能取到最近一年的数据,并且有可能每天的数据并没有取完整,所以,因子检测就不做了。感兴趣的同学可以在完善fetch_news的基础上(可以换别的数据源),自行验证。
在测试中,我数据检索到11月13日。