跳转至




2025

Numpy应用案例[1]

“在很多量化场景下,我们都需要统计某个事件连续发生的次数,比如连续涨停、N 连阳等。通过 Numpy 的向量化操作,我们可以快速实现这些需求,既高效又简洁。”


1. 连续值统计

在很多量化场景下,我们都需要统计某个事件连续发生了多少次,比如,连续涨跌停、N连阳、计算Connor's RSI中的streaks等等。比如,要判断下列收盘价中,最大的连续涨停次数是多少?最长的N连阳数是多少?

1
2
a = [15.28, 16.81, 18.49, 20.34, 21.2, 20.5, 22.37, 24.61, 27.07, 29.78, 
    32.76, 36.04]

假设我们以10%的涨幅为限,则可以将上述数组转换为:

1
2
pct = np.diff(a) / a[:-1]
pct > 0.1

我们将得到以下数组:

1
2
flags = [True, False,  True, False, False, False,  True, False,  True,
        True,  True]

这仍然不能计算出最大连续涨停次数,但它是很多此类问题的一个基本数据结构,我们将原始的数据按条件转换成类似的数组之后,就可以使用下面的神器了:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from numpy.typing import ArrayLike
from typing import Tuple
import numpy as np

def find_runs(x: ArrayLike) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Find runs of consecutive items in an array.

    Args:
        x: the sequence to find runs in

    Returns:
        A tuple of unique values, start indices, and length of runs
    """

    # ensure array
    x = np.asanyarray(x)
    if x.ndim != 1:
        raise ValueError("only 1D array supported")
    n = x.shape[0]

    # handle empty array
    if n == 0:
        return np.array([]), np.array([]), np.array([])

    else:
        # find run starts
        loc_run_start = np.empty(n, dtype=bool)
        loc_run_start[0] = True
        np.not_equal(x[:-1], x[1:], out=loc_run_start[1:])
        run_starts = np.nonzero(loc_run_start)[0]

        run_values = x[loc_run_start]  # find run values
        run_lengths = np.diff(np.append(run_starts, n))  # find run lengths

        return run_values, run_starts, run_lengths

1
2
3
pct = np.diff(a) / a[:-1]
v,s,l = find_runs(pct > 0.099)
(v, s, l)

输出结果为:

(array([ True, False, True]), array([0, 3, 6]), array([3, 3, 5]))

输出结果是一个由三个数组组成的元组,分别表示:

  • value: unique values
  • start: start indices
  • length: length of runs

在上面的输出中,v[0]为True,表示这是一系列涨停的开始,s[0]则是对应的起始位置,此时索引为0; l[0]则表示该连续的涨停次数为3次。同样,我们可以知道,原始数组中,最长连续涨停(v[2])次数为5(l[2]),从索引6(s[2])开始起。

所以,要找出原始序列中的最大连续涨停次数,只需要找到l中的最大值即可。但要解决这个问题依然有一点技巧,我们需要使用第4章中介绍的 mask array。

1
2
3
4
v_ma = np.ma.array(v, mask = ~v)
pos = np.argmax(v_ma * l)

print(f"最大连续涨停次数{l[pos]},从索引{s[pos]}:{a[s[pos]]}开始。")

在这里,mask array的作用是,既不让 v == False 的数据参与计算


(后面的 v_ma * l),又保留这些元素的次序(索引)不变,以便后面我们调用 argmax 函数时,找到的索引跟v, s, l中的对应位置是一致的。

我们创建的v_ma是一个mask array,它的值为:

1
2
3
masked_array(data=[True, --, True],
             mask=[False,  True, False],
       fill_value=True)

当它与另一个整数数组相乘时,True就转化为数字1,这样相乘的结果也仍然是一个mask array:

1
2
3
masked_array(data=[3, --, 5],
             mask=[False,  True, False],
       fill_value=True)

当arg_max作用在mask array时,它会忽略掉mask为True的元素,但保留它们的位置,因此,最终pos的结果为2,对应的 v,s,l中的元素值分别为: True, 6, 5。

如果要统计最长N连涨呢?这是一个比寻找涨停更容易的任务。不过,这一次,我们将不使用mask array来实现:

1
2
3
4
v,s,l = find_runs(np.diff(a) > 0)
pos = np.argmax(v * l)

print(f"最长N连涨次数{l[pos]},从索引{s[pos]}:{a[s[pos]]}开始。")

输出结果是: 最长N连涨次数6,从索引5:20.5开始。


这里的关键是,当Numpy执行乘法时,True会被当成数字1,而False会被当成数字0,于是,乘法结果自然消除了没有连续上涨的部分,从而不干扰argmax的计算。

当然,使用mask array可能在语义上更清楚一些,尽管mask array的速度会慢一点,但正确和易懂常常更重要。

2. 计算 Connor's RSI中的streaks

Connor's RSI(Connor's Relative Strength Index)是一种技术分析指标,它是由Nirvana Systems开发的一种改进版的相对强弱指数(RSI)。Connor's RSI与传统RSI的主要区别在于它考虑了价格连续上涨或下跌的天数,也就是所谓的“连胜”(winning streaks)和“连败”(losing streaks)。这种考虑使得Connor's RSI能够更好地反映市场趋势的强度。

在前面介绍了find_runs函数之后,计算streaks就变得非常简单了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def streaks(close):
    result = []
    conds = [close[1:]>close[:-1], close[1:]<close[:-1]]
    flags = np.select(conds, [1, -1], 0)
    v, _, l = find_runs(flags)
    for i in range(len(v)):
        if v[i] == 0:
            result.extend([0] * l[i])
        else:
            result.extend([v[i] * x for x in range(1, (l[i] + 1))])
    return np.insert(result, 0, 0)

这段代码首先将股价序列划分为上涨、下跌和平盘三个子系列,然后对每个子系列计算连续上涨或下跌的天数,并将结果合并成一个新的数组。在streaks中,连续上涨天数要用正数表示,连续下跌天数用负数表示,所以在第5行中,通过np.select将条件数组转换为[1, 0, -1]的序列,后面使用乘法就能得到正确的连续上涨(下跌)天数了。


Numpy核心语法[6]

“Masked Array 是 Numpy 中的重要概念,能帮助我们在保持数据完整性的同时,屏蔽无效值进行运算。而 ufunc 则通过底层 C 实现的向量化操作,让复杂计算变得高效且简洁。”


1. Masked Array

你可能常常在一些接近底层的库中,看到 Numpy masked array 的用法。Masked Array 是 Numpy 中很重要的概念。考虑这样的情景,你有一个数据集,其中包含了一些缺失的数据或者无效值。这些”不合格“的数据,可能以 np.nan,np.inf, None 或者其它仅仅是语法上有效的值来表示(比如,在 COVID-19 数据集中,病例数出现负数)的。如何在保持数据集的完整性不变的前提下,仍然能对数据进行运算呢?

Note

这里有一个真实的例子。你可以在 Kaggle 上找到一个 COVID-19 的数据集,这个数据集中,就包含了累积病例数为负数的情况。该数据集由 Johns Hoopkins University 收集并提供。

很显然,我们无法直接对这些数据进行运算。请看下面的例子:

1
2
3
x = np.array([1, 2, 3, np.inf, np.nan, None])
np.mean(x)
np.nanmean(x)

只要数据中包含 np.nan, np.inf 或者 None,numpy 的函数就无法处理它们。即使数据在语法上合法,但在语义上无效,Numpy 强行进行计算,结果也是错误的。


这里有一个量化中可能遇到的真实场景,某公司有一年的年利润为零,这样使得它的 YoY 利润增长在次年变得无法计算。如果我们需要利用 YoY 数据进一步进行运算,我们就需要屏蔽这一年的无效值。否则,我们会连 YoY 利润的均值都无法计算出来。

这里有一个补救的方法,就是将原数据拷贝一份,并且将无效值替换为 np.nan。此后多数运算,都可以用np.nan*来计算。这个方法我们已经介绍过了。但是,如果你是原始数据的收集者,显然你应该照原样发布数据,任何修改都是不合适的;如果你是数据的应用者,当然应该对数据进行预处理后,才开始运算。但是,你又很可能缺少了对数据进行预处理所必须的信息 -- 你怎么能想到像-1, 0 这样看起来人畜无害的小可爱们,竟然是隐藏着的错误呢?

为了解决这个问题,Numpy 就提供了 Masked Array。但是我们不打算在 Masked Array 上过多着墨。关于 Masked Array,我们可以借用这样一句话,很多人不需要知道 Masked Array,知道 Masked Array 的人则已经精通它了。

有一点需要注意的是,仅在需要时,使用 Masked Array。因为可能与您相像的相反,Masked Array 不会提高性能,反而,它大大降低了性能:

1
2
3
4
5
6
7
8
import numpy as np

# NUMPY VERSION 1.24.4
g = np.random.random((5000,5000))
indx = np.random.randint(0,4999,(500,2))
g_nan = g.copy()
g_nan[indx] = np.nan
mask =  np.full((5000,5000),False,dtype=bool)

1
2
3
4
5
6
7
mask[indx] = True
g_mask = np.ma.array(g,mask=mask)

%timeit (g_mask + g_mask)**2
# 901 MS ± 52.3 MS PER LOOP ...
%timeit (g_nan + g_nan)**2
# 109 MS ± 72.2 ΜS PER LOOP ...

可以看出,Masked Array 的性能慢了接近 9 倍。

Tip

如果你不得不对含有 np.nan 的数组进行运算,那么可以尝试使用 bottleneck 库中的 nan *函数。由于并不存在 nansquare 函数,但是考虑到求方差的运算中必然包含乘方运算,因此我们可以考虑通过 nanvar 函数来评测 numpy 与 bottleneck 的性能差异。

1
2
3
4
5
6
    %timeit np.var(g_mask)
    # 587 MS ± 37.9 MS PER LOOP ...
    %timeit np.nanvar(g_nan)
    # 281 MS ± 1.46 MS PER ...
    %timeit nanvar(g_nan)
    # 61 MS ± 362 ΜS PER LOOP ...

bottleneck 要比 numpy 快接近 5 倍。如果你使用的 numpy 版本较旧,那么 bottleneck 还会快得更多。


2. ufunc

ufunc 是 Numpy 中的重要概念,它对两个输入数组同时进行逐元素的操作(比如,相加,比较大小等)。在 Numpy 中大约定义了 61 个左右的 ufunc。这些操作都是由底层的 C 语言实现的,并且支持向量化,因此,它们往往具有更快的速度。

比如,在 numpy 中,求数组中的最大值,有两个相似的函数, np.maxnp.maximum可以达成这一目标。后者是 ufunc,前者不是,两者除了用法上有所区别之外,后者的速度也要快一些。

1
2
3
4
5
6
arr = np.random.normal(size=(1_000_000,))

%timeit np.max(arr)
# 801 MS ± 54.7 MS PER LOOP ...
%timeit np.maximum.reduce(arr)
# 775 MS ± 12.1 MS PER LOOP ...

np.maximum作为 ufunc,它本来是要接收两个参数的,并不能用来求一维数组的最大值。这种情况下,我们要使用reduce操作才能得到想要的结果。

这里np.maximum是一个 ufunc,则reduce是 unfunc 对象(在 Python 中,一切都是对象,包括函数)的属性之一。ufunc的其它属性还有accumulateouterreduceat等。

accumulate是 ufunc 中的另一个常用属性,可能你之前已经有所接触。比如,在求最大回撤时,我们就会用到它:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 模拟一个股价序列
n = 1000
xs = np.random.randn(n).cumsum()

# 最大回撤结束期
i = np.argmax(np.maximum.accumulate(xs) - xs) 

# 最大回撤开始期
j = np.argmax(xs[:i]) 

# 最大回撤
mdd = (xs[j] - xs[i])/xs[j]

plt.plot(xs)
plt.plot([i, j], [xs[i], xs[j]], 'o', color='Red', markersize=10)

50%

简洁就是美。在使用了accumulate之后,我们发现,计算最大回撤竟简单到只有两三行代码即可实现。


ufunc 如此好用,你可能要问,为何我用到的却不多?实际上,你很可能每天都在使用ufunc。许多二元数学操作,它们都是对 ufunc 的封装。比如,当我们调用A + B时,实际上是调用了np.add(A, B)这个 ufunc。二者在功能和性能上都是等价的。其它的 ufunc 还有逻辑运算、比较运算等。只要某种运算接受两个数组作为参数,那么,很可能 Numpy 就已经实现了相应的 ufunc 操作。此外,一些三角函数,尽管只接受一个数组参数,但它们也是 ufunc。

因此,我们需要特别关注和学习的 ufunc 函数,可能主要就是maximumminimum等。这里再举一个在量化场景下,使用maximum的常用例子 -- 求上影线长度。

Tip

长上影线是资产向上攻击失败后留下的痕迹。它对股价后来的走势分析有一定帮助。首先,资金在这个点位发起过攻击,暴露了资金的意图。其次,攻击失败,接下来往往会有洗盘(或者溃败)。股价底部的长上影线,也被有经验的股民称为仙人指路。后面出现拉升的概率较大。上影线出现在高位时,则很可能是见顶信号。此时在较低级别的 k 线上,很可能已经出现均线拐头等比较明显的见顶信号。

现在,我们就来实现长上影线的检测。上影线的定义是:

upper_shadow=highmax(open,close) upper\_shadow = high - max(open, close)

下图也显示了上影线:


如果 upper_shadow > threshold,则可认为出现了长上影线(当然,需要对 upper_shadow 进行归一化)。检测单日的上影线很简单,我们下面的代码将演示如何向量化地求解:

《因子投资与机器学习策略》喊你上课啦!

面向策略研究员的专业课程,涵盖因子挖掘因子检验和基于机器学习的策略开发三大模块,构建你的个人竞争优势!

    课程助教: 宽粉
  1. 全网独家精讲 Alphalens 分析报告,助你精通因子检验和调优。
  2. 超 400 个独立因子,分类精讲底层逻辑,学完带走 350+ 因子实现。
  3. 课程核心价值观:Learning without thought is labor lost. Know-How & Know-Why.
  4. 三大实用模型,奠定未来研究框架1:聚类算法寻找配对交易标的(中性策略核心)、基于 XGBoost 的资产定价、趋势交易模型。
  5. 领先的教学手段:SBP(Slidev Based Presentation)、INI(In-place Notebook Interaction)和基于 Nbgrader(UCBerkley 使用中)的作业系统。

1. 示例模型思路新颖。未来一段时间,你都可以围绕这些模型增加因子、优化参数,构建出领先的量化策略系统。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=78)
matrix = rng.uniform(0.98, 1.02, (4, 30)).cumprod(axis=1)
opn = matrix[0]
close = matrix[-1]
high = np.max(matrix, axis=0)

upper_shadow = (high - np.maximum(opn, close))/close
np.round(upper_shadow, 2)

第 10 行的代码完全由 ufunc 组成。这里我们使用了 np.sub(减法), np.maximum, np.divide(除法)。maximum 从两个等长的数组 opn 和 close 中,逐元素比较并取出最大的那一个,组成一个新的数组,该数组也与 opn, close 等长。

如果要求下影线长度,则可以使用 minimum。


版权声明 本课程全部文字、图片、代码、习题等所有材料,除声明引用外,均由作者本人开发。所有草稿版本均通过第三方 git 服务进行管理,作为拥有版权的证明。未经书面作者授权,请勿引用。
本文写作时,少量代码及文本参考了通义灵码生成内容。


  1. https://zh.wikipedia.org/wiki/%E9%97%B0%E7%A7%92 

Numpy核心语法[5]

“日期和时间的处理从来都不简单。时区、夏令时、闰秒等问题让时间计算变得复杂。Numpy 提供了高效的日期时间处理工具,帮助我们轻松应对这些挑战。”


1. 日期和时间

一些第三方数据源传递给我们的行情数据,常常会用字符串形式,或者整数(从 unix epoch time 起)格式来表示行情的时间。比如,akshare 和 tushare 许多接口给出的行情数据就是字符串格式;而 QMT 很多时候,会将行情时间用整数表示。掌握这些格式与 Numpy 的日期时间格式转换、以及 Numpy 到 Python 对象的时间日期转换是非常有必要的。

但是在任何编程语言中,日期和时间的处理从来都不简单。

Info

很少有程序员/研究员了解这一点:日期和时间并不是一个数学上或者物理上的一个客观概念。时区的划分、夏令时本身就是一个政治和法律上的概念;一些地方曾经使用过夏令时,后来又取消了这种做法。其次,关于闰秒 [^闰秒] 的决定,也并不是有章可循的,它是由一个委员会开会来临时决定的。这种决定每年做一次。所有这些决定了我们无法通过一个简单的数学公式来计算时间及其变化,特别是在时区之间的转换。

关于时间,首先我们要了解有所谓的 timezone aware 时间和 timezone naive 时间。当我们说到晚上 8 时开会时,这个时间实际上默认地包含了时区的概念。如果这是一个跨国会议,但你在通知时不告诉与会方时区,就会导致其它人无法准时出席 -- 他们将会在自己时区的晚上 8 时上线。

如果一个时间对象不包含时区,它就是 timezone naive 的;否则,它是 timezone aware 的。但这只是针对时间对象(比如,Python 中的 datetime.datetime)才有的意义;日期对象(比如,Python 中的 datetime.date)是没有时区的。


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import pytz
import datetime

# 通过 DATETIME.NOW() 获得的时间没有时区信息
# 返回的是标准时间,即 UTC 时间,等同于调用 UTCNOW()
now = datetime.datetime.now()
print(f"now() without param: {now}, 时区信息{now.tzinfo}")

now = datetime.datetime.utcnow()
print(f"utcnow: {now}, 时区信息{now.tzinfo}")

# 构造 TIMEZONE 对象
cn_tz = pytz.timezone('Asia/Shanghai')
now = datetime.datetime.now(cn_tz)
print(f"现在时间{now}, 时区信息{now.tzinfo}")
print("现在日期:", now.date())

try:
    print(now.date().tzinfo)
except AttributeError:
    print("日期对象没有时区信息")

上述代码将依次输出:

1
2
3
4
5
now() 不带参数2024-05-19 11:03:41.550328, 时区信息 None
utcnow: 2024-05-19 11:03:41.550595, 时区信息 None
现在时间 2024-05-19 19:03:41.550865+08:00, 时区信息 Asia/Shanghai
现在日期2024-05-19
日期对象没有时区信息

不过,限于篇幅,我们对时间问题的介绍只能浅尝辄止。在这里,我们主要关注在 Numpy 中,日期/时间如何表示,它们彼此之间如何比较、转换,以及如何与 Python 对象进行比较和转换。

在 Numpy 中,日期/时间总是用一个 64 位整数(np.datetime64)来表示,此外,还关联了一个表示其单位(比如,纳秒、秒等)的元数据结构。np.datetime64是没有时区概念的。

1
2
3
tm = np.datetime64('1970-01-01T00:00:00')
print(tm)
print(tm.dtype)

这将显示为:

1
2
1970-01-01T00:00:00
datetime64[s]

这里的[s]就是我们前面所说的时间单位。其它常见单位还有[ms][us][ns]等等。

除了从字符串解释之外,我们还可以直接将 Python 对象转换成np.datetime64,反之亦然:

1
2
3
4
5
tm = np.datetimet64(datetime.datetime.now())
print(tm)

print(tm.item())
print(tm.astype(datetime.datetime))

下面我们来看看如何实现不同格式之间的批量转换。这在处理 akshare, tushare 或者 QMT 等第三方数据源提供的行情数据时,非常常见。

首先我们构造一个时间数组。顺便提一句,这里我们将使用np.timedelta64这个时间差分类型:

1
2
3
now = np.datetime64(datetime.datetime.now())
arr = np.array([now + np.timedelta64(i, 'm') for i in range(3)])
arr

输出结果如下:

1
2
3
4
array(['2024-05-19T12:57:47.349178', 
       '2024-05-19T12:58:47.349178',
       '2024-05-19T12:59:47.349178'], 
     dtype='datetime64[us]')

我们可以通过np.datetime64.astype()方法将时间数组转换为 Python 的时间对象:

1
2
3
4
time_arr = arr.astype(datetime.datetime)

# 转换后的数组,每个元素都是 TIMEZONE NAIVE 的 DATETIME 对象
print(type(time_arr[0]))

1
2
3
4
5
6
# !!! 技巧
# 如何把 NP.DATETIME64 数组转换为 PYTHON DATETIME.DATE 数组?
date_arr = arr.astype('datetime64[D]').astype(datetime.date)
# 或者 -- 两者的容器不一样
date_arr = arr.astype('datetime64[D]').tolist()
print(type(date_arr[0]))

这里的关键是,我们之前生成的arr数组,其元素类型为np.datetime64[us]。它到 Python datetime.date的转换将损失精度,所以 Numpy 要求我们显式地指定转换类型。

如何将以字符串表示的时间数组转换为 Numpy datetime64 对象数组呢?答案仍然是 astype() 方法。

1
2
3
4
5
6
# 将时间数组转换为字符串数组
str_arr_time = arr_time.astype(str)
print(str_arr_time)

# 再将字符串数组转换为 DATETIME64 数组,精度指定为 D
str_arr_time.astype('datetime64[D]')

显示结果为:

1
2
3
4
array(['2024-05-19T12:57:47.349178', 
       '2024-05-19T12:58:47.349178',
       '2024-05-19T12:59:47.349178'], 
       dtype='datetime64[us]')

1
2
3
4
array([
    '2024-05-19', 
    '2024-05-19'],               
    dtype='datetime64[D]')

最后,我们给一个 QMT 获取交易日历后的格式转换示例。在 QMT 中,我们通过get_trading_dates来获取交易日历,该函数返回的是一个整数数组,每个元素的数值,是从 unix epoch 以来的毫秒数。

我们可以通过以下方法对其进行转换:

1
2
3
4
import numpy as np

days = get_trading_dates('SH', start_time='', end_time='', count=10)
np.array(days, dtype='datetime64[ms]').astype(datetime.date)

QMT 官方没有直接给出交易日历转换方案,但给出åå了如何将 unix epoch 时间戳转换为 Python 时间对象(但仍以字符串表示):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import time

def conv_time(ct):
    # conv_time(1476374400000) --> '20161014000000.000'
    local_time = time.localtime(ct / 1000)
    data_head = time.strftime('%Y%m%d%H%M%S', local_time)
    data_secs = (ct - int(ct)) * 1000
    time_stamp = '%s.%03d' % (data_head, data_secs)
    return time_stamp

conv_time(1693152000000)

我们需要对每一个数组元素使用上述解析方法。官方方案的优点是不依赖任何三方库。不过,没有量化程序能离开 Numpy 库,所以,我们的方案并未增加第三方库的依赖。


2. 字符串操作

你的数据源、或者本地存储方案很可能使用 Numpy Structured Array 或者 Rec Array 返回证券列表。很显然,证券列表中一定会包括字符串,因为它一定会存在证券代码列和证券名称列。有一些还会返回证券的地域属性和其它属性,这也往往是字符串。

对证券列表,我们常常有以下查询操作:

  1. 获取在某个板块上市的股票列表,比如,北交所、科创板和创业板与主板的个股交易规则上有一些不同,因此,我们的策略很可能需要单独为这些板块构建。这就有了按板块过滤证券列表的需要。也可能我们要排除 ST,刚上市新股。这些都可以通过字符串操作来实现。
  2. 市场上有时候会出现魔幻的名字炒作。比如龙年炒龙字头(或者含龙的个股)、炒作“东方”、炒作“中”字头。作为量化人,参与这样的炒作固然不可取,但我们要拥有分析市场、看懂市场的能力。

Numpy 中的大多数字符串操作都封装在 numpy.char 这个包下面。它主要提供了一些用于格式化的操作(比如左右填充对齐、大小写转换等)、查找和替换操作。

下面的代码展示了如何从证券列表中过滤创业板:

1
2
import numpy as np
import numpy.char as nc

1
2
3
4
5
6
7
8
9
# 生成 STRUCTURED ARRAY, 字段有 SYMBOL, NAME, IPO DATE
arr = np.array([('600000.SH', '中国平安', '1997-08-19'),
                ('000001.SZ', '平安银行', '1997-08-19'),
                ('301301.SZ', '川宁生物', '2012-01-01')
                ], dtype=[('symbol', 'S10'), ('name', 'S10'), ('ipo_date', 'datetime64[D]')])

def get_cyb(arr):
    mask = np.char.startswith(arr["symbol"], b"30")
    return arr[mask]

Question

我们在查找创业板股票时,使用的是 b"30"来进行匹配。为何要用 b"30"而不是"30"?

注意第 11 行,我们要通过np.char.startswith()来使用startswith函数。任何一个 numpy array 对象都没有这个方法。

".SZ"是我们的数据源给股票编制的交易所代码。不同的数据源,可能使用不同的交易所代码。比如,聚宽数据源会使用.XSHG 表示上交所,.XSHE 表示深交所。现在,如果我们要将上述代码转换为聚宽的格式,应该如何操作?

1
2
3
4
5
# 生成 STRUCTURED ARRAY, 字段有 SYMBOL, NAME, IPO DATE
arr = np.array([('600000.SH', '中国平安', '1997-08-19'),
                ('000001.SZ', '平安银行', '1997-08-19'),
                ('301301.SZ', '川宁生物', '2012-01-01')
                ], dtype=[('symbol', 'U10'), ('name', 'U10'), ('ipo_date', 'datetime64[D]')])

1
2
3
4
5
6
7
8
9
def translate_exchange_code(arr):
    symbols = np.char.replace(arr["symbol"], ".SH", ".XSHG")
    print(symbols)
    symbols = np.char.replace(symbols, ".SZ", ".XSHE")

    arr["symbol"] = symbols
    return arr

translate_exchange_code(arr)

这一次,我们把 symbol 和 name 的定义改为 Unicode 型,以避免我们查找时,要输入像 b"30"这样的字面量。

但输出的结果可能让人意外,因为我们将得到这样的输出:

1
2
3
4
array([('600000.XSH', '中国平安', '1997-08-19'),
       ('000001.XSH', '平安银行', '1997-08-19'),
       ('301301.XSH', '川宁生物', '2012-01-01')],
      dtype=[('symbol', '<U10'), ('name', '<U10'), ('ipo_date', '<M8[D]')])

Question

发生了什么?我们得到了一堆以".XSH"结尾的 symbol,它们本应该是"600000.XSHG"这样的字符串。错在哪里,又该如何修改?

在上面的示例中,如果我们把替换字符串改为空字符串,就实现了删除操作。这里就不演示了。

char 模块还提供了字符串相等比较函数equal:


1
2
3
4
arr = array([('301301.SZ', '川宁生物', '2012-01-01')],
      dtype=[('symbol', '<U10'), ('name', '<U10'), ('ipo_date', '<M8[D]')])

arr[np.char.equal(arr["symbol"], "301301.SZ")]

在这个特殊的场景下,我们也可以直接使用以下语法:

1
arr[arr["symbol"] == "301301.SZ"]

Tip

np.char 下的函数很多,如何记忆?实际上,这些函数多数是 Python 中 str 的方法。如果你熟悉 Pandas,就会发现 Pandas 中也有同样的用法。因此,像upper, lower, strip这样的str函数,你可以直接拿过来用。

Numpy 中的字符串函数另一个比较常用的场景,就是执行格式化。你可以通过ljust, 'center', rjust在显示一个数组前,将它们的各列数据进行左右空格填充,这样,输出时就可以比较整齐。

Question

2024 年 5 月 10 日起,南京化纤走出 7 连板行情,短短 7 日,股价翻倍。市场上还有哪些名字中包含化纤的个股?它们的涨跌是否存在相关性或者跨周期相关性?


Numpy核心语法[4]

“随机数和采样是量化中的高频操作。通过 Numpy 的 random 模块,我们可以轻松生成符合正态分布的收益率数组,并利用 np.cumprod() 计算价格走势,快速模拟资产的夏普率与价格关系。”


1. 随机数和采样

随机数和采样是量化中的高频使用的操作。在造数据方面非常好用。我们在前面的示例中,已经使用过了 normal() 函数,它是来自 numpy.random 模块下的一个重要函数。借由这个函数,我们就能生成随机波动、但总体上来看又是上涨、下跌或者震荡的价格序列。

Tip

我们会在何时需要造价格序列?除了前面讲过的例子外,这里再举一例:我们想知道夏普为SS的资产,它的价格走势是怎么样的?价格走势与夏普的关系如何?要回答这个问题,我们只能使用“蒙”特卡洛方法,造出若干模拟数据,然后计算其夏普并绘图。此时我们一般选造一个符合正态分布的收益率数组,然后对它进行加权(此时即可算出夏普),最后通过 np.cumprod() 函数计算出价格走势,进行绘图。

我们通过一个例子来说明夏普与股价走势之间的关系:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import numpy as np
from empyrical import sharpe_ratio
import matplotlib.pyplot as plt

returns_ = np.random.normal(0, 0.02, size=100)
legend = []

for alpha in (-0.01, 0, 0.01):
    returns = returns_ + alpha
    prices = np.cumprod(returns + 1)
    sharpe = sharpe_ratio(returns)
    _ = plt.plot(prices)
    legend.append(f"{sharpe:.1f}")

1
2
lines = plt.gca().lines
plt.legend(lines, legend)

从绘制的图形可以看出,当 alpha 为 1%时,夏普率可达 8.2。国内优秀的基金经理可以在一年内,做到 2~3 左右的夏普率。大家可以调整 alpha 这个参数,看看 alpha 与夏普率的关系。

50%

1.1. The legacy: np.random module

迄今为止,我们在网上看到的多数关于 numpy random 的教程,都是使用的 np.random module 下面的函数。除了 normal 方法之外,random 包中还有以下函数:

函数 说明
randint(a,b,shape) 生成在区间 (a,b) 之间,形状为 shape 的随机整数数组。
rand(shape) 生成 shape 形状的随机数组,使用 [0,1) 区间的均匀分布来填充。
random(shape) 生成 shape 形状的随机数组,使用均匀分布填充
randn(d1, d2, ...) 生成 shape 形状的随机数组,使用正态分布来填充。
standard_normal(shape) 生成 shape 形状的随机数组,使用标准正态分布来填充。

函数 说明
normal(loc,scale,shape) 生成 shape 形状的随机数组,使用正态分布来填充,loc 是均值,scale 是标准差。
choice(a,size,replace,p) 从 a 中随机抽取 size 个元素,如果 replace=True, 则允许重复抽取,否则不允许重复抽取。p 表示概率,如果 p=None, 则表示每个元素等概率抽取。
shuffle(a) 将 a 中的元素随机打乱。
seed(seed) 设置随机数种子,如果 seed=None, 则表示使用系统时间作为随机数种子。

可以看出,numpy 为使用同一个功能,往往提供了多个方法。我们记忆这些方法,首先是看生成的随机数分布。最朴素的分布往往有最朴素的名字,比如,rand, randint 和 random 都用来生成均匀分布,而 normal, standard_normal 和 randn 用来生成正态分布。

除了均匀分布之外,Numpy 还提供了许多著名的分布的生成函数,比如 f 分布、gama 分布、hypergeometric(超几何分布),beta, weibull 等等。

在同一类别中,numpy 为什么还要提供多个函数呢?有一些是为了方便那些曾经使用其它知名库(比如 matlab) 的人而提供的。

《因子投资与机器学习策略》喊你上课啦!

面向策略研究员的专业课程,涵盖因子挖掘因子检验和基于机器学习的策略开发三大模块,构建你的个人竞争优势!

    课程助教: 宽粉
  1. 全网独家精讲 Alphalens 分析报告,助你精通因子检验和调优。
  2. 超 400 个独立因子,分类精讲底层逻辑,学完带走 350+ 因子实现。
  3. 课程核心价值观:Learning without thought is labor lost. Know-How & Know-Why.
  4. 三大实用模型,奠定未来研究框架1:聚类算法寻找配对交易标的(中性策略核心)、基于 XGBoost 的资产定价、趋势交易模型。
  5. 领先的教学手段:SBP(Slidev Based Presentation)、INI(In-place Notebook Interaction)和基于 Nbgrader(UCBerkley 使用中)的作业系统。

1. 示例模型思路新颖。未来一段时间,你都可以围绕这些模型增加因子、优化参数,构建出领先的量化策略系统。

randn 就是这样的例子,它是 matlab 中一个生成正态随机分布的函数,现在被 numpy 移植过来了。我们这里看到的另一个函数,rand 也是这样。而对应的 random,则是 Numpy 按自己的 API 风格定义的函数。

choice 方法在量化中有比较具体的应用。比如,我们可能想要从一个大的股票池中,随机抽取 10 只股票先进行一个小的试验,然后根据结果,再考虑抽取更多的股票。

seed 函数用来设置随机数生成器的种子。在进行单元测试,或者进行演示时(这两种情况下,我们都需要始终生成相同的随机数序列)非常有用。


1.2. New Style: default_rng

我们在上一节介绍了一些随机数生成函数,但没有介绍它的原理。Numpy 生成的随机数是伪随机数,它们是使用一个随机数生成器(RNG)来生成的。RNG 的输出是随机的,但是相同的输入总是会生成相同的输出。我们调用的每一个方法,实际上是在这个序列上的一个抽取动作(根据输入的 size/shape)。

在 numpy.random 模块中,存在一个全局的 RNG。在我们调用具体的随机函数时,实际上是通过这个全局的 RNG 来产生随机数的。而这个全局的 RNG,总会有人在它之上调用 seed 方法来初始化。这会产生一些问题,因为你不清楚何时、在何地、以哪个参数被人重置了 seed。

由于这个原因,现在已经不推荐直接使用 numpy.random 模块中的这些方法了。更好的方法是,为每一个具体地应用创建一个独立的 RNG,然后在这个对象上,调用相应的方法:

1
2
rng = np.random.default_rng(seed=123)
rng.random(size=10)

rng 是一个 Random Generator 对象,在初始化时,我们需要给它传入一个种子。如果省略,那么 Numpy 会使用系统时间作为种子。

rng 拥有大多数前一节中提到的方法,比如 normal, f, gamma 等;但从 matlab 中移植过来的方法不再出现在这个对象上。另外,randint 被 rng.integers 替代。


除此之外,default_rng 产生的随机数生成器对象,在算法上采用了 PCG64 算法,与之前版本采用的算法相比,它不仅能返回统计上更好的随机数,而且速度上也会快 4 倍。

Warning

在 numpy 中还存在一个 RandomState 类。它使用了较慢的梅森扭曲器生成伪随机数。现在,这个类已经过时,不再推荐使用。

1.3. 数据集平衡示例

我们已经介绍了 choice 的功能,现在我们来举一个例子,如何使用 choice 来平衡数据集。

在监督学习中,我们常常遇到数据不平衡的问题,比如,我们希望训练一个分类器,但是训练集的类别分布不均衡。我们可以通过 choice 方法对数据集进行 under sampling 或者 over sampling 来解决这个问题。

为了便于理解,我们先生成一个不平衡的训练数据集。这个数据集共有 3 列,其中前两列是特征(你可以想像成因子特征),第三列则是标签。

1
2
3
4
5
6
import pandas as pd
import numpy as np

rng = np.random.default_rng(seed=42)
x = rng.random((10,3))
x[:,-1] = rng.choice([0,1], len(x), p=[0.2, 0.8])

我们通过下面的方法对这个数据集进行可视化,以验证它确实是一个不平衡的数据集。

1
2
df = pd.DataFrame(x, columns=['factor1', 'factor2', 'label'])
df.label.value_counts().plot(kind='bar')

运行结果为:

50%

要在此基础上,得到一个新的平衡数据集,我们有两种思路,一种是 under sampling,即从多数类的数据中抽取部分数据,使得它与最小分类的数目相等;另一种是 over sampling,即从少数类的数据中复制部分数据,使得它与最大的类的数目相等。

下面的例子演示了如何进行 under sampling:

1
labels, counts = np.unique(x[:,-1], return_counts=True)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 最小分类的标签
min_label = labels[np.argmin(counts)]

# 最小分类样本的数量,作为 UNDER SAMPLING 的样本数量
min_label_count = np.min(counts)

# 最小分类无须抽取,全部提取
results = [
    x[x[:,-1] == min_label]
]

# 对其它分类标签进行遍历,需要先剔除最小分类
for label in np.delete(labels, np.argmin(counts)):
    sampled = rng.choice(x[x[:,-1]== label], min_label_count)
    results.append(sampled)

np.concatenate(results)

这段代码先是找到最小分类及它的数量,然后遍历每个标签,再通过 rng.choice 对其它分类随机抽取最小分类的数量,最后把所有的子集拼接起来。

这段示例代码可用以多个标签的情况。如果要进行 over sampling,只要把其中的 min 换成 max 就可以了。

2. IO 操作

我们直接使用 Numpy 读写文件的场合并不多。提高 IO 读写性能一直都不是 Numpy 的重点,我们也只需要稍加了解即可。


2.1. 读写 CSV 文件

Numpy 可以从 CSV 格式的文本文件中读取数据,主要有以下方法:

api 描述
loadtxt 解析文本格式的表格数据
savetxt 将数据保存为文本文件
genfromtxt 同上,但允许数据中有缺失值,提供了更高级的用法
recfromtxt 是 genfromtxt 的快捷方式,自动推断为 record array
recfromcsv 同上,如果分隔符为逗号,无须额外指定

我们通过下面的示例简单演示一下各自的用法:

1
2
3
4
5
6
7
import io
import numpy 

buffer = io.StringIO("""1,2""")

# 默认情况下,LOADTXT 只能读取浮点数
numpy.loadtxt(buffer, delimiter=",")

这会输出数组array([1., 2.])

1
2
3
4
buffer = io.StringIO("""1,2,hello""")

# 通过指定 DTYPE 参数,可以读取其它类型
numpy.loadtxt(buffer, delimiter=",", dtype=[("age", "i4"), ("score", "f4"), ("name", "U8")])

这样我们将得到一个 Structured Array,其中第三列为字符串类型。如果我们不指定 dtype 参数,那么 loadtxt 将会解析失败。

1
2
3
4
buffer = io.StringIO("""
1,2,hello
""")
numpy.genfromtxt(buffer, delimiter=",")

这一次我们使用了 genfromtxt 来加载数据,但没有指定 dtype 参数,genfromtxt 会将非数字列解析为 nan。因此,这段代码将输出:`array([1., 2., nan])

现在,我们也给 genfromtxt 加上 dtype 参数:

1
2
3
buffer = io.StringIO("""1,2,hello""")

numpy.genfromtxt(buffer, delimiter=",", dtype=[("age", "i4"), ("score", "f4"), ("name", "U8")])

此时我们得到的结果是:array((1, 2., 'hello'), dtype=[('age', '<i4'), ('score', '<f4'), ('name', '<U8')])。注意它是 Structured Array。

recfromtxt 则不需要 dtype, 会自动推断数据类型。

1
2
3
buffer = io.StringIO("""1,2,hello""")

numpy.recfromtxt(buffer,delimiter=",")

这段代码输出为rec.array((1, 2, b'hello'), dtype=[('f0', '<i8'), ('f1', '<i8'), ('f2', 'S5')])。如果推断不准确,我们也可以自己加上 dtype 参数。

如果我们使用 recfromcsv,则连 delimiter 参数都可以省掉。

1
2
buffer = io.StringIO("""age,score,name1,2,hello""")
numpy.recfromcsv(buffer)

输出跟上一例的结果一样。

出于速度考虑,我们还可以使用其它库来解析 CSV 文件,再转换成为 numpy 数组。比如:

1
2
3
4
5
# 利用 CSV.READER() 来解析,比 NUMPY 快 8 倍
np.asarray(list(csv.reader()))

# 利用 PANDAS 来解析,比 NUMPY 快 22 倍
pd.read_csv(buffer).to_records()

2.2. 读写二进制文件

如果我们不需要与外界交换数据,数据都是自产自销型的,也可以使用二进制文件来保存数据。

使用 numpy.save 函数来将单个数组保存数据为二进制文件,使用 numpy.load 函数来读取 numpy.save 保存的数据。这样保存的文件,文件扩展名为.npy。


如果要保存多个数组,则可以使用 savez 命令。这样保存的文件,文件扩展名为.npz。如果有更复杂的需求,可以使用 Hdf5,pyarrow 等库来进行保存数据。

Numpy处理表格数据

一开始,Numpy 的数组只能存放同质的元素,即元素必须有相同的数据类型。但对表格类数据而言,它们往往是由一条条记录组成的,而这些记录,又是由不同数据类型的数据组成的。

如何做到在Numpy中也能处理大规模的表格数据呢?


1. Structured Array

为了满足这种需求,Numpy 扩展出一种名为 Structured Array 的数据格式。它是一种 一维数组,每一个元素都是一个命名元组。

我们可以这样声明一个 Structured Array:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import numpy as np
import datetime
dtypes = [
        ("frame", "O"),
        ("code", "O"),
        ("open", "f4"),
        ("high", "f4"),
        ("low", "f4"),
        ("close", "f4")
    ]
secs = np.array (
    [
        (datetime.date (2024, 3, 18), "600000", 8.9, 9.1, 8.8, 9),
        (datetime.date (2024, 3, 19), "600000", 8.9, 9.1, 8.8, 9),
    ], dtype = dtypes
)

在这个数据结构中,共有 6 个字段,它们的名字和类型通过 dtype 来定义。这是一个 List [Tuple] 类型。在初始化数据部分,它也是一个 List [Tuple]。


Warning

初学者很容易犯的一个错误,就是使用 List [List] 来初始化 Numpy Structured Array,而不是 List [Tuple] 类型。这会导致 Numpy 在构造数组时,对应不到正确的数据类型,报出一些很奇怪的错误。
比如,下面的初始化是错误的:

1
2
3
4
secs = np.array ([
    [datetime.date (2024, 3, 18), "600000", 8.9, 9.1, 8.8, 9],
    [datetime.date (2024, 3, 19), "600000", 8.9, 9.1, 8.8, 9]
], dtype=dtypes)
这段代码会报告一个难懂的 "Type Error: float () argument must be a string or ..."

我们使用上一节学过的 inspecting 方法来查看 secs 数组的一些特性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
print (f"secs 的维度是 {secs.ndim}")
print (f"secs 的 shape 是 {secs.shape}")
print (f"secs 的 size 是 {secs.size}")
print (f"secs 的 length 是 {len (secs)}")

print (f"secs [0] 的类型是 {type (secs [0])}")
print (f"secs [0] 的维度是 {secs [0].ndim}")
print (f"secs [0] 的 shape 是 {secs [0].shape}")
print (f"secs [0] 的 size 是 {secs [0].size}")
print (f"secs [0] 的 length 是 {len (secs [0])}")

可以看出,secs 数组是 一维数组,它的 shape (2,) 也正是一维数组的 shape 的表示法。前一节还介绍过这几个属性的关系,大家可以自行验证下是否仍然得到满足。


Tip

这里 size 仍然等于 shape 各元素的取值之积。注意对 secs 而言,它的 size 与 length 是相等的,但对 secs [0] 而言,它的 size 和 length 是不相等的。我们在开发 Zillionare 量化框架时时,遇到过由此产生的一个 bug。

但 secs 的元素类型则是 numpy.void,它在本质上是一个 named tuple,所以,我们可以这样访问其中的任一字段:

1
2
3
4
print (secs [0]["frame"])

# 不使用列名(字段名),使用其序号也是可以的
print (secs [0][0])

我们还可以以列优先的顺序来访问其中的一个 “单元格”:

1
print (secs ["frame"][0])

对表格数据,遍历是很常见的操作,我们可以这样遍历:

1
2
for (frame, code, opn, high, low, close) in secs:
    print (frame, code, opn, high, low, close)

Numpy structured array 在这部分的语法要比 Pandas 的 DataFrame 易用许多。我们在后面介绍 Pandas 时,还会提及这一点。


Warning

修改 cell 值时,索引的先后不能互换:

1
2
3
4
5
6
7
8
    data = np.array ([("aaron", "label")], dtype=[("name", "O"), ("label", "O")])
    filter = data ["name"] == "aaron"

    new_label = "blogger"
    data ["label"][filter] = new_label

    # this won't change
    data [filter]["label"] = new_label
这里的最后一行,并不会生效。

2. 运算类

2.1. 比较和逻辑运算

我们在上一节介绍定位、查找时,已经接触到了数据比较,比如:arr > 1。它的结果将数组中的每一个元素都与 1 进行比较,并且返回一个布尔型的数组。

现在,我们要扩充比较的指令:

函数 描述
all 如果数组中的元素全为真,返回 True。可用以判断一组条件是否同时成立。
any 如果数组中至少有一个元素为真,则返回 True。用以判断一组条件是否至少有一个成立
isclose 判断两个数组中的元素是否一一近似相等,返回所有的比较结果
allclose 判断两个数组中的元素是否全部近似相等

函数 描述
equal 判断两个数组中的元素是否一一相等,返回所有的比较结果。
not_equal 一一判断两个数组中的元素是否不相等,返回所有的比较结果
isfinite 是否为数字且不为无限大
isnan 测试是否为非数字
isnat 测试对象是否不为时间类型
isneginf 测试对象是否为负无限大
isposinf 测试对象是否为正无限大

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# 开启多行输出模式
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

np.random.seed (78)
returns = np.random.normal (0, 0.03, size=4)
returns
# 判断是否全下跌
np.all (returns <= 0)
np.any (returns <= 0)

# 模拟一个起始价格为 8 元的价格序列
prices = np.cumprod (1+returns) * 8

# 对应的涨停价如下
buy_limit_prices = [8.03, 8.1, 8.1, 8.3]

# 判断是否涨停
np.isclose (prices, buy_limit_prices, atol=1e-2)

Tip

为什么需要存在判断近似相等的函数?这是因为,数字分为整型和浮点型。凡是带小数点的数字,都可以看成浮点型。许多浮点数不能精确表达,所以它们是不会相等的,只能比较两个浮点数的差值,如果差值的绝对值小于某个可以接受的小数,才能认为这两个数近似相等。

因此,如果我们拿到了个股的收盘价和涨停价,要判断此时个股有没有涨停,就只能用 isclose 来进行比较,而不能使用 equal。

参数 atol 表示绝对误差,表示两个浮点数之间的差值小于这个值,就可以认为这两个数近似相等。

除了判断一个数组中的元素要么都为 True,要么至少一个为 True 之外,有时候我们还希望进行模糊一点的判断,比如,如果过去 20 天中,超过 60% 的是收阳线,此时我们可以用 np.count_nonzero,或者 np.sum 来统计数组中为真的情况:

1
2
np.count_nonzero (returns > 0)
np.sum (returns > 0)

在上一节进行比较的示例中,我们都只使用了单个条件。如果我们要按多个条件的组合进行查找,就需要依靠逻辑运算来实现。

在 Numpy 中,逻辑运算既可以通过函数、也可以通过运算符来完成:


函数 运算符 描述 python 等价物
logical_and & 执行逻辑与操作 and
logical_or | 执行逻辑或操作 or
logical_not ~ 执行逻辑或操作 not
logical_xor '^' 执行逻辑异或操作 xor

Tip

如果你对编程语言不是特别熟悉,就会难以理解这里的布尔运算,但它们在量化中运用非常广泛,并且在后面讲 pandas 时,我们还会遇到

逻辑与 a&b 的含义是, 只有当条件 a 与 b 都为真时,表达式才成立 逻辑或 a|b 的含义是,a 与 b 之中,任何一个为真即成立 逻辑非~b 的含义是,如果 b 为真,则表达式不成立,反之则成立 逻辑异或 a ^ b 的含义是,只有两个不同时才为真。

逻辑运算有什么用呢?比如我们在选股时,有以下表格数据:

股票 pe mom
AAPL 30.5 0.1
GOOG 32.3 0.3
TSLA 900.1 0.5
MSFT 35.6 0.05

上述表格可以用 Numpy 的 Structured Array 来表示为:

1
2
3
4
5
6
tickers = np.array ([
    ("APPL", 30.5, 0.1),
    ("GOOG", 32.3, 0.3),
    ("TSLA", 900.1, 0.5),
    ("MSFT", 35.6, 0.05)
], dtype=[("ticker", "O"), ("pe", "f4"), ("mom", "f4")])

现在,我们要找出求 PE < 35, 动量 (mom) > 0.2 的记录,那么我们可以这样构建条件表达式:

1
(tickers ["pe"] < 35) & (tickers ["mom"] > 0.2)

Numpy 会把 pe 这一列的所有值跟 35 进行比较,然后再与 mom 与 0.2 比较的结果进行逻辑与运算,这相当于:

1
np.array ((1,1,0,0)) & np.array ((0, 1, 1, 0))

在 Numpy 中,True 与 1 的值在做逻辑运算时是相等的;0 与 False 也是。

如果不借助于 Numpy 的逻辑操作,我们就要用 Python 的逻辑操作。很不幸,这必须使用循环。如果计算量大,这将会比较耗时间。

Tip

这里解释下异或操作。它比较拧巴。如果两个操作数取值一样,那么结果为 False;否则为 True,非常不团结。

在量化中使用异或操作的例子仍然最可能来自于选股。比如,如果我们要求两个选股条件,只能有一个成立时,才买入;否则不买入,就可以使用异或运算。


Tip

在多个条件中,投资者为什么会想要只有一个条件成立?这可能是因为他们认为这两个条件可能互相冲突,或者他们想要在两种投资策略之间进行平衡。

2.2. 集合运算

在交易中,我们常常要执行调仓操作。做法一般是,选确定新的投资组合,然后与当前的投资组合进行比较,找出需要卖出的股票,以及需要买入的股票。这个操作,就是集合运算。在 Python 中,我们一般是通过 set 语法来实现。

在 Numpy 中,我们可以使用通过以下方法来实现集合运算:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import numpy as np

# 创建两个一维数组
x = np.array ([1, 2, 3, 4, 5])
y = np.array ([4, 5, 6, 7, 8])

# 计算交集
intersection = np.intersect1d (x, y)
print ("Intersection (交集):", intersection)

# 计算并集
union = np.union1d (x, y)
print ("Union (并集):", union)

diff = np.setdiff1d (x, y)
print ("x - y:", diff)


此外,我们还可能使用 in1d (a1, a2) 方法来判断 a1 中的元素是否都在 a2 中存在。比如,在调仓换股中,如果当前持仓都在买入计划中,则不需要执行调仓。

2.3. 数学运算和统计

Numpy 中数学相关的运算有线性代数运算(当然还有基本代数运算)、统计运算、金融指标运算等等。

2.3.1. 线性代数

线性代数在量化中有重要用途。比如,在现代资产组合理论(MPT)中,我们要计算资产组合收益率及协方差,都要使用矩阵乘法。大家可以参考 投资组合理论与实战 系列文章,下面是其中的一段代码:

1
2
3
...
cov = np.cov (port_returns.T)
port_vol = np.sqrt (np.dot (np.dot (weights, cov), weights.T))

矩阵乘法是线性代数中的一个核心概念,它涉及到两个矩阵的特定元素按照规则相乘并求和,以生成一个新的矩阵。具体来说,如果有一个矩阵 A 为 m×nm \times n 维,另一个矩阵 B 为 n×pn \times p 维,那么它们的乘积 C=ABC = AB 将会是一个 m×pm \times p 维的矩阵。乘法的规则是 A 的每一行与 B 的每一列对应元素相乘后求和。

下面通过一个具体的例子来说明矩阵乘法的过程:

假设我们有两个矩阵 A 和 B:


A=[23 14] A = \begin {bmatrix} 2 & 3 \ 1 & 4 \end {bmatrix} B=[12 31] B = \begin {bmatrix} 1 & 2 \ 3 & 1 \end {bmatrix} 要计算 AB,我们遵循以下步骤:

取 A 的第一行 (2,3)(2, 3) 与的第一列 (1,3)(1,3) 相乘并求和得到 C11=[2×1+3×3=11]C_{11} = [2\times1 + 3\times3 = 11]

同理,取 A 的第一行与 B 的第二列 (2,1)(2, 1) 相乘并求和得到 C12=[2×2+3×1=7]C_{12} = [2\times2 + 3\times1 = 7]

取 A 的第二行 (1,4)(1, 4) 与 B 的第一列相乘并求和得到 C21=[1×1+4×3=13]C_{21} = [1\times1 + 4\times3 = 13]

取 A 的第二行与 B 的第二列相乘并求和得到 C22=[1×2+4×1=5]C_{22} = [1\times2 + 4\times1 = 5]

因此,矩阵 C = AB 为:

C=[117136 ] C = \begin {bmatrix} 11 & 7 \\ 13 & 6 \ \end {bmatrix}

与代数运算不同,矩阵乘法不满足交换律,即一般情况下 ABBAAB \neq BA

在 Numpy 中,我们可以使用 np.dot () 函数来计算矩阵乘法。


上述示例使用 numpy 来表示,即为:

1
2
3
4
A = np.array ([[2,3],[1,4]])
B = np.array ([[1,2],[3,1]])

np.dot (A, B)

最终我们将得到与矩阵 C 相同的结果。

除此之外,矩阵逆运算 (np.linalg.inv) 在计算最优投资组合权重时,用于求解方程组,特征值和特征向量 (np.linalg.eig, np.linalg.svd) 在分析资产回报率的主成分,进行风险分解时使用。

2.3.2. 统计运算

常用的统计运算包括:

函数 描述
np.mean 计算平均值
np.median 计算中位数
np.std 计算标准差
np.var 计算方差
np.min 计算最小值
np.max 计算最大值
np.percentile 用于计算历史数据的分位点
np.quantile 用于计算历史数据的分位数,此函数与 percentile 功能相同
np.corr 用于计算两个变量之间的相关性

np.percentile 与 np.quantile 功能相同,都是用于计算分位数。


两者在参数上略有区别。当我们对同一数组,给 quantile 传入分位点 0.25 时,如果给 percentile 传入分位点 25 时,两者的结果将完全一样。也就是后者要乘以 100。在量化交易中,quantile 用得可能会多一些。

Tip

在 pandas 中存在 quantile 函数,但没有 percentile 函数。

np.percentile(或者 np.quantile)的常见应用是计算 25%, 50% 和 75% 的分位数。用来绘制箱线图(Boxplot)。

此外,我们也常用它来选择自适应参数。比如,在 RSI 的应用中,一般推荐是低于 20(或者 30)作为超卖,此时建议买入;推荐是高于 80(或者 70)作为超买,此时建议卖出。但稍微进行一些统计分析,你就会发现这些阈值并不是固定的。如果我们以过去一段时间的 RSI 作为统计,找出它的 95% 分位作为卖点,15% 作为买点,往往能得到更好的结果。

2.3.3. 量化指标的计算

有一些常用的量化指标的计算,也可以使用 Numpy 来完成,比如,计算移动平均线,就可以使用 Numpy 提供的 convolve 函数。

1
2
3
4
5
import numpy as np
def moving_average (data, window_size):
    return np.convolve(data,
                       np.ones(window_size)/window_size, 
                       'valid')

当然,很多人习惯使用 talib,或者 pandas 的 rolling 函数来进行计算。convolve(卷积)是神经网络 CNN 的核心,正是这个原因,我们这里提一下。

np.convolve 的第二个参数,就是卷积核。这里我们是实现的是简单移动平均,所以,卷积核就是一个由相同的数值组成的数组,它们的长度就是窗口大小,它们的和为 1。

如果我们把卷积核替换成其它值,还可以实现 WMA 等指标。从信号处理的角度看,移动平均是信号平滑的一种,使用不同的卷积核,就可以实现不同的平滑效果。

在量化中,还有一类计算,这里也提一下,就是多项式回归。比如,某两支股票近期都呈上升趋势,我们想知道哪一支涨得更好?这时候我们就可以进行多项式回归,将其拟合成一条直线,再比较它们的斜率。

下面的代码演示了如何使用 Numpy 进行多项式回归。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import numpy as np
import matplotlib.pyplot as plt

returns = np.random.normal (0, 0.02, size=100)
alpha = 0.01
close = np.cumprod (1 + returns + alpha)

a, b = np.polyfit (np.arange (100), close, deg=1)

# 使用 a, b 构建回归线的 y 值
regression_line = a * np.arange (100) + b

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# 绘制原始的 close 曲线
plt.figure (figsize=(10, 6))
plt.plot (close, label='Close Price', color='blue')

# 绘制回归线
plt.plot (regression_line, label='Regression Line', color='red', linestyle='--')

# 添加图例、标题和坐标轴标签
plt.title ('Stock Close Price vs Regression Line')
plt.xlabel ('Time Period')
plt.ylabel ('Price')
plt.legend ()

# 显示图表
plt.grid (True)
plt.show ()

这将生成下图:


题图: Photo by Steve Harvey on Unsplash

Numpy核心语法[1]

1. 基本数据结构

NumPy 的核心数据结构是 ndarray(即 n-dimensional array,多维数组)数据结构。这是一个多维度、同质并且大小固定的数组对象。

为了表达记录类型的数据,Numpy又拓展出名为Structured Array的数据结构。


它用一个 void 类型的元组来表示一条记录,从而使得 numpy 也可以用来表达记录型的数据。因此,在 Numpy 中,实际上跟数组有关的数据类型主要是两种。

1. 基本数据结构

前一种数组格式广为人知,我们将以它为例介绍多数 Numpy 操作。而后一种数据格式,在量化中也常常用到,比如,通过聚宽[1]的jqdatasdk获得的行情数据,就允许返回这种数据类型,与 DataFrame 相比,在存取上有不少简便之处。我们将在后面专门用一个章节来介绍。

在使用 Numpy 之前,我们要先安装和导入 Numpy 库:

1
2
# 安装 NUMPY
pip install numpy

一般地,我们通过别名np来导入和使用 numpy:

1
import numpy as np

为了在 Notebook 中运行这些示例时,能更加醒目地显示结果,我们首先定义一个 cprint 函数,它将原样输出提示信息,但对变量使用红色字体来输出,以示区别:


1
2
3
4
5
6
7
8
from termcolor import colored

def cprint(formatter: str, *args):
    colorful = [colored(f"{item}", 'red') for item in args]
    print(formatter.format(*colorful))

# 测试一下 CPRINT
cprint("这是提示信息,后接红色字体输出的变量值:{}", "hello!")

接下来,我们将介绍基本的增删改查操作。

1.1. 创建数组

1.1.1. 通过 Python List 创建

我们可以通过np.array的语法来创建一个简单的数组,在这个语法中,我们可以提供 Python 列表,或者任何具有 Iterable 接口的对象,比如元组。

1
2
arr = np.array([1, 2, 3])
cprint("create a simple numpy array: {}", arr)
1.1.2. 预置特殊数组

很多时候,我们希望 Numpy 为我们创建一些具有特殊值的数组。Numpy 也的确提供了这样的支持,比如:


函数 描述
zeros
zeros_like
创建全 0 的数。zeros_like 接受另一个数组,并生成相同形状和数据类型的 zeros 数组。常用于初始化。以下*_like 类推。
ones
ones_like
创建全 1 的数组
full
full_like
创建一个所有元素都填充为n的数组
empty
empty_like
创建一个空数组
eye
identity
创建单位矩阵
random.random 创建一个随机数组
random.normal 创建一个符合正态分布的随机数组
random.dirichlet 创建一个符合狄利克雷分布的随机数组
arange 创建一个递增数组
linspace 创建一个线性增长数组。与 arange 的区别在于,此方法默认生成全闭区间数组。并且,它的元素之间的间隔可以为浮点数。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 创建特殊类型的数组
cprint("全 0 数组:\n{}", np.zeros(3))
cprint("全 1 数组:\n{}", np.ones((2, 3)))
cprint("单位矩阵:\n{}", np.eye(3))
cprint("由数字 5 填充的矩阵:\n{}", np.full((3,2), 5))

cprint("空矩阵:\n{}", np.empty((2, 3)))
cprint("随机矩阵:\n{}",np.random.random(10))
cprint("正态分布的数组:\n{}",np.random.normal(10))
cprint("狄利克雷分布的数组:\n{}",np.random.dirichlet(np.ones(10)))
cprint("顺序增长的数组:\n{}", np.arange(10))
cprint("线性增长数组:\n{}", np.linspace(0, 2, 9))

Warning

尽管 empty 函数的名字暗示它应该生成一个空数组,但实际上生成的数组,每个元素都是有值的,只不过这些值既不是 np.nan,也不是 None,而是随机值。我们在使用 empty 生成的数组之前,一定要对它进行初始化,处理掉这些随机值。

生成正态分布数组很有用。我们在做一些研究时,常常需要生成满足某种条件的价格序列,再进一步研究和比较它的特性。

比如,如果我们想研究上升趋势和下降趋势下的某些指标,就需要有能力先构建出符合趋势的价格序列。下面的例子就演示了如何生成这样的序列,并且绘制图形:

1
2
import numpy as np
import matplotlib.pyplot as plt

1
2
3
4
5
6
7
8
9
returns = np.random.normal(0, 0.02, size=100)

fig, axes = plt.subplots(1, 3, figsize=(12,4))
c0 = np.random.randint(5, 50)

for i, alpha in enumerate((-0.01, 0, 0.01)):
    r = returns + alpha
    close = np.cumprod(1 + r) * c0
    axes[i].plot(close)

绘制的图形如下:


示例中还提到了 Dirichlet(狄利克雷)分布数组。这个数组具有这样的特点,它的所有元素加起来会等于 1。比如,在现代投资组合理论中的有效前沿优化中,我们首先需要初始化各个资产的权重(随机值),并且满足资产权重之和等于 1 的约束(显然!),此时我们就可以使用 Dirichlet[2] 分布。


1.1.3. 通过已有数组转换

我们还可以从已有的数组中,通过复制、切片、重复等方法,创建新的数组:

1
2
3
4
5
6
7
8
# 复制一个数组
cprint("通过 np.copy 创建:{}", np.copy(np.arange(5)))

# 复制数组的另一种方法
cprint("通过 arr.copy: {}", np.arange(5).copy())

# 使用切片,提取原数组的一部分
cprint("通过切片:{}", np.arange(5)[:2])

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 合并两个数组
arr = np.concatenate((np.arange(3), np.arange(2)))
cprint("通过 concatenate 合并:{}", arr)

# 重复一个数组
arr = np.repeat(np.arange(3), 2)
cprint("通过 repeat 重复原数组:{}", arr)

# 重复一个数组,注意与 NP.REPEAT 的差异
# NP.TILE 的语义类似于 PYTHON 的 LIST 乘法
arr = np.tile(np.arange(3), 2)
cprint("通过 tile 重复原数组:{}", arr)

Question

np.copy 与 arr.copy 有何不同?在 Numpy 中还有哪些类似函数对,有何规律?


注意在 concatenate 函数中,axis 的作用:

1
2
3
4
5
6
arr = np.arange(6).reshape((3,2))

# 在 ROW 方向上拼接,相当于增加行,默认行为
cprint("按 axis=0 拼接:\n{}", np.concatenate((arr, arr), axis=0))
# 在 COL 方向上拼接,相当于扩展列
cprint("按 axis=1 拼接:\n{}", np.concatenate((arr, arr), axis=1))

1.2. 增加/删除和修改元素

Numpy 数组是固定大小的,一般我们不推荐频繁地往数组中增加或者删除元素。


但如果确实有这种需求,我们可以使用下面的方法来实现增加或者删除:

函数 使用说明
append values添加到arr的末尾。
insert obj(可以是下标、slicing)指定的位置处,插入数值value(可以是标量,也可以是数组)
delete 删除指定下标处的元素

示例如下:

1
2
3
arr = np.arange(6).reshape((3,2))
np.append(arr, [[7,8]], axis=0)
cprint("指定在行的方向上操作、n{}", arr)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
arr = np.arange(6).reshape((3,2))
arr = np.insert(arr.reshape((3,2)), 1, -10)
cprint("不指定 axis,数组被扁平化:\n{}", arr)

arr = np.arange(6).reshape((3,2))
arr = np.insert(arr, 1, (-10, -10), axis=0)
cprint("np.insert:\n{}", arr)

arr = np.delete(arr, [1], axis=1)
cprint("deleting col 1:\n{}", arr)

Tip

请一定运行一下这里的代码,特别是关于 insert 的部分,了解所谓的扁平化是怎么回事。

有时候我们需要修改个别元素的值,应该这样操作:


1
2
3
arr = np.arange(6).reshape(2,3)

arr[0,2] = 3

这里涉及到如何定位一个数组元素的问题,也正是我们下一节的内容。

1.3. 定位、读取和搜索

1.3.1. 索引和切片

Numpy 中索引和切片语法大致类似于 Python,主要区别在于对多维数组的支持:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
arr = np.arange(6).reshape((3,2))
cprint("原始数组:\n{}", arr)

# 切片语法
cprint("按行切片:{}", arr[1, :])
cprint("按列切片:{}", arr[:, -1])
cprint("逆排数组:\n {}", arr[: : -1])

# FANCY INDEXING
cprint("fancy index: 使用下标数组:\n {}", arr[[2, 1, 0]])

上述切片语法在 Python 中也存在,但只能支持到一维,因此,对下面的 Python 数组,类似操作会出错:


1
2
3
arr = np.arange(6).reshape((3,2)).tolist()

arr[1, :]

提示 list indices must be integers or slices, not tuple。

1.3.2. 查找、筛选和替换

在上一节中,我们是通过索引来定位一个数组元素。但很多时候,我们得先通过条件运算把符合要求的索引找出来。这一节将介绍相关方法。

函数 使用说明
np.searchsorted 在有序数组中搜索指定的数值,返回索引。
np.nonzero 返回非零元素的索引,用以查找数组中满足条件的元素。
np.flatnonzero 同 nonzero,但返回输入数组的展平版本中非零的索引。
np.argwere 返回满足条件的元素的索引,相当于 nonzero 的转置版本
np.argmin 返回数组中最小元素的索引(注意不是返回满足条件的最小索引)
np.argmax 返回数组中最大元素的索引
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 查找
arr = [0, 2, 2, 2, 3]
pos = np.searchsorted(arr, 2, 'right')
cprint("在数组 {} 中寻找等于 2 的位置,返回 {}, 数值是 {}", 
        arr, pos, arr[pos - 1])

arr = np.arange(6).reshape((2, 3))
cprint("arr[arr > 1]: {}", arr[arr > 1])

# NONZERO 的用法
mask = np.nonzero(arr > 1)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
cprint("nonzero 返回结果是:{}", mask)
cprint("筛选后的数组是:{}", arr[mask])

# ARGWHERE 的用法
mask = np.argwhere(arr > 1)
cprint("argwere 返回的结果是:{}", mask)

# 多维数组不能直接使用 ARGWHERE 结果来筛选
# 下面的语句不能得到正确结果,一般会出现 INDEXERROR
arr[mask]

# 但对一维数组筛选我们可以用:
arr = np.arange(6)
mask = np.argwhere(arr > 1)
arr[mask.flatten()[0]]

# 寻找最大值的索引
arr = [1, 2, 2, 1, 0]
cprint("最大值索引是:{}", np.argmax(arr))

使用 searchsorted 要注意,数组本身一定是有序的,不然不会得出正确结果。

第 10 行到第 21 行代码,显示了如何查找一个数组中符合条件的数据,并且返回它的索引。

argwhere 返回值相当于 nonzero 的转置,在多维数组的情况下,它不能直接用作数组的索引。请自行对比 nonzero 与 argwhere 的用法。

在量化中,有很多情况需要实现筛选功能。比如,在计算上下影线时,我们是用公式(highmax(open,close))/(highlow)(high - max(open, close))/(high - low)来进行计算的。


如果我们要一次性地计算过去 n 个周期的所有上影线,并且不使用循环的话,那么我们就要使用 np.where, np.select 等筛选功能。

下面的例子显示了如何使用 np.select 来计算上影线:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import pandas as pd
import numpy as np

bars = pd.DataFrame({
    "open": [10, 10.2, 10.1],
    "high": [11, 10.5, 9.3],
    "low": [9.8, 9.8, 9.25],
    "close": [10.1, 10.2, 10.05]
})

max_oc = np.select([bars.close > bars.open, 
                    bars.close <= bars.open], 
                    [bars.close, bars.open])
print(max_oc)

shadow = (bars.high - max_oc)/(bars.high - bars.low)
print(shadow)

np.where 是与 np.select 相近的一个函数,不过它只接受一个条件。

1
2
arr = np.arange(6)
cprint("np.where: {}", np.where(arr > 3, 3, arr))

这段代码实现了将 3 以上的数字截断为 3 的功能。


这种功能被称为 clip,在因子预处理中是非常常用的一个技巧,用来处理异常值 (outlier)。

但它没有办法实现两端截断。此时,但 np.select 能做到,这是 np.where 与 np.select 的主要区别:

1
2
arr = np.arange(6)
cprint("np.select: {}", np.select([arr<2, arr>4], [2, 4], arr))
其结果是,生成的数组,小于 2 的被替换成 2,大于 4 的被替换成 4,其它的保持不变。

1.4. 审视 (inspecting) 数组

当我们调用其它人的库时,往往需要与它们交换数据。这时就可能出现数据格式不兼容的问题。为了有能力进行查错,我们必须掌握查看 Numpy 数组特性的一些方法。

我们先如下生成一个简单的数组,再查看它的各种特性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
arr = np.ones((3,2))
cprint("dtype is: {}", arr.dtype)
cprint("shape is: {}", arr.shape)
cprint("ndim is: {}", arr.ndim)
cprint("size is: {}", arr.size)
cprint("'len' is also available: {}", len(arr))

# DTYPE
dt = np.dtype('>i4')
cprint("byteorder is: {}", dt.byteorder)
cprint("name of the type is: {}", dt.name)
cprint('is ">i4" a np.int32?: {}', dt.type is np.int32)

1
2
3
4
5
# 复杂的 DTYPE
complex = np.dtype([('name', 'U8'), ('score', 'f4')])
arr = np.array([('Aaron', 85), ('Zoe', 90)], dtype=complex)
cprint("A structured Array: {}", arr)
cprint("Dtype of structured array: {}", arr.dtype)

正如 Python 对象都有自己的数据类型一样,Numpy 数组也有自己的数据类型。我们可以通过arr.dtype来查看数组的数据类型。

从第 3 行到第 6 行,我们分别输出了数组的 shape, ndim, size 和 len 等属性。ndim 告诉我们数组的维度。shape 告诉我们每个维度的 size 是多少。shape 本身是一个 tuple, 这个 tuple 的 size,也等于 ndim。

size 在不带参数时,返回的是 shape 各元素取值的乘积。len 返回的是第一维的长度。

2. 数组操作

我们在前面的例子中,已经看到过一些引起数组形状改变的例子。比如,要生成一个3×23×2的数组,我们先用 np.arange(6) 来生成一个一维数组,再将它的形状改变为 (2, 3)。

另一个例子是使用 np.concatenate,从而改变了数组的行或者列。

2.1. 升维

我们可以通过 reshape, hstack, vstack 来改变数组的维度:


1
2
3
4
5
6
7
8
9
cprint("increase ndim with reshape:\n{}", 
        np.arange(6).reshape((3,2)))

# 将两个一维数组,堆叠为 2*3 的二维数组
cprint("createing from stack: {}", 
        np.vstack((np.arange(3), np.arange(4,7))))

# 将两个 (3,1)数组,堆叠为(3,2)数组
np.hstack((np.array([[1],[2],[3]]), np.array([[4], [5], [6]])))

2.2. 降维

通过 ravel, flatten, reshape, *split 等操作对数组进行降维。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
cprint("ravel: {}", arr.ravel())

cprint("flatten: {}", arr.flatten())

# RESHAPE 也可以用做扁平化
cprint("flatten by reshape: {}", arr.reshape(-1,))

# 使用 HSPLIT, VSPLIT 进行降维
x = np.arange(6).reshape((3, 2))
cprint("split:\n{}", np.hsplit(x, 2))

# RAVEL 与 FLATTEN 的区别:RAVEL 可以操作 PYTHON 的 LIST
np.ravel([[1,2,3],[4, 5, 6]])

这里一共介绍了 4 种方法。ravel 与 flatten 用法比较接近。ravel 的行为与 flatten 类似,只不过 ravel 是 np 的一个函数,可作用于 ArrayLike 的数组。


通过 reshape 来进行扁平化也是常用操作。此外,还介绍了 vsplit, hsplit 函数,它们的作用刚好与 vstack,hstack 相反。

2.3. 转置

此外,对数组进行转置也是此类例子中的一个。比如,在前面我们提到,np.argwhere 的结果,实际上是 np.nonzero 的转置,我们来验证一下:

1
2
3
4
5
x = np.arange(6).reshape(2,3)
cprint("argwhere: {}", np.argwhere(x > 1))

# 我们再来看 NP.NONZERO 的转置
cprint("nonzero: {}", np.array(np.nonzero(x > 1)).T)

两次输出结果完全一样。在这里,我们是通过.T来实现的转置,它是一个语法糖,正式的函数是transpose


当然,由于 reshape 函数极其强大,我们也可以使用它来完成转置:

1
2
3
cprint("transposing array from \n{} to \n{}", 
    np.arange(6).reshape((2,3)),
    np.arange(6).reshape((3,2)))

狄利克雷,德国数学家。他对数论、傅里叶级数理论和其他数学分析学领域有杰出贡献,并被认为是最早给出现代函数定义的数学家之一和解析数论创始人之一。Dirichlet 数组可作为 MPT 求解中的初始值。

比Deepseek还要Deep!起底GBDT做回归预测的秘密

决策树是机器学习中一类重要的算法。它本质是这样一种算法,即将由程序hard-coded的各种if-else逻辑,改写成为可以通过数据训练得到的模型,而该模型在效果上等价于硬编码的if-else逻辑。

1
2
3
4
5
for 有房, 年薪 in [("有", "40万"), ("有", "20万"), ("无", "100万")]:
    if 有房 == "有" and 年薪 > "30万":
        print("见家长!")
    else:
        print("下次一定")

这样做的好处是,大大增强了算法的普适性:只要有标注数据,无须编码,都可以转换成为对应的决策树模型,条件越复杂,这种优越性就表现的越明显。此外,在决策树的训练过程中,也自然地考虑了数据分布的统计特征、加入了容错(只要数据标注是正确的)。

单细胞生物: 决策树

比如,假如我是霸总的助理,要根据他的生活习惯来安排明天是否工作。我收集到过往的数据如下:

1
2
3
4
5
6
7
8
data = {
    '天气': ['晴', '晴', '晴', '晴', '阴', '阴', '雨', '雨'],
    '气温': ['高温', '高温', '舒适', '凉爽', '凉爽', '凉爽', '凉爽', '凉爽'],
    '宜工作': [0, 0, 1, 1, 1, 1, 0, 0],
}

df = pd.DataFrame(data)
df
天气 气温 宜工作
0 高温 0
1 高温 0
2 舒适 1
3 凉爽 1
4 凉爽 1
5 凉爽 1
6 凉爽 0
7 凉爽 0

我们就可以用决策树来训练一个模型,从而为他安排明天的出差。如果哪一天他与某个女艺人热恋了,这样会新增一个判断条件,如果头一天晚上学了英语,第二天就不工作了,这样我们就只需要改数据就行了。

1
2
3
4
5
6
7
8
9
data = {
    '天气': ['晴', '晴', '晴', '晴', '阴', '阴', '雨', '雨'],
    '气温': ['高温', '高温', '舒适', '凉爽', '凉爽', '凉爽', '凉爽', '凉爽'],
    '学英语':[0, 1, 0, 0, 1, 0, 0, 1],
    '宜工作': [0, 0, 1, 1, 1, 1, 0, 0],
}

df = pd.DataFrame(data)
df

下面这个决策树模型简单是简单了点,不过,它涉及到了决策树模型构建的全部过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier, plot_tree
import matplotlib.pyplot as plt

# 创建示例数据
data = {
    '天气': ['晴', '晴', '晴', '晴', '阴', '阴', '雨', '雨'],
    '气温': ['高温', '高温', '舒适', '凉爽', '凉爽', '凉爽', '凉爽', '凉爽'],
    '学英语':[0, 0, 0, 0, 1, 0, 0, 1],
    '宜工作': [0, 0, 1, 1, 0, 1, 0, 0],
}

df = pd.DataFrame(data)
df

# 将分类变量转换为数值
df['天气'] = df['天气'].map({'晴': 0, '阴': 1, '雨': 2})
df['气温'] = df['气温'].map({'高温': 0, '舒适': 1, '凉爽': 2})

X = df[['天气', '气温', '学英语']]
y = df['宜工作']

# 创建并训练决策树模型
clf = DecisionTreeClassifier()
clf.fit(X, y)

# 可视化决策树
plt.figure(figsize=(12, 8))
plot_tree(clf, filled=True, feature_names=['天气', '气温', '学英语'], class_names=['诸事不宜', '宜工作'], fontsize=10)
plt.title('霸总工作否?')
plt.show()

# 预测第二天是否工作
weather = "晴"
temp = "高温"
dating=0

sample = pd.DataFrame([(weather, temp, dating)], columns=["天气", "气温", "学英语"])
sample['天气'] = sample['天气'].map({'晴': 0, '阴': 1, '雨': 2})
sample['气温'] = sample['气温'].map({'高温': 0, '舒适': 1, '凉爽': 2})

prediction = clf.predict(sample)
dating_desc = "没学英语" if dating == 0 else "昨晚学了英语"
if prediction[0] == 1:
    print(weather, temp, dating_desc, "宜工作")
else:
    print(weather, temp, dating_desc, "诸事不宜")

增加

Numpy核心语法[3]

在不同的库之间交换数据,常常会遇到格式问题。比如,我们从第三方数据源拿到的行情数据,它们用的时间字段常常会是字符串。有一些库在存储行情时,对 OHLC 这些字段进行了优化,使用了 4 个字节的浮点数,但如果要传给 talib 进行指标计算,就必须先转换成 8 个字节的浮点数,等等,这就有了类型转换的需求。


此外,我们还会遇到需要将 numpy 数据类型转换为 python 内置类型,比如,将 numpy.float64 转换为 float 的情况。

1. 类型转换和 Typing

1.1. Numpy 内部类型转换

Numpy 内部类型转换,我们只需要使用 astype

1
2
3
4
5
6
7
8
x = np.array (['2023-04-01', '2023-04-02', '2023-04-03'])
print (x.astype (dtype='datetime64[D]'))

x = np.array (['2014', '2015'])
print (x.astype (np.int32))

x = np.array ([2014, 2015])
print (x.astype (np.str_))

Tip

如何将 boolean array 转换成整数类型,特别是,将 True 转为 1,False 转为 - 1? 在涉及到阴阳线的相关计算中,我们常常需要将 open > close 这样的条件转换为符号 1 和 - 1,以方便后续计算。这个转换可以用:

1
2
3
>>> x = np.array ([True, False])
>>> x * 2 - 1
... array ([ 1, -1])

1.2. Numpy 类型与 Python 内置类型转换

如果我们要将 Numpy 数组转换成 Python 数组,可以使用 tolist 函数。

1
2
x = np.array ([1, 2, 3])
print (x.tolist ())

我们通过 item () 函数,将 Numpy 数组中的元素转换成 Python 内置类型。

1
2
3
x = np.array (['2023-04-01', '2023-04-02'])
y = x.astype ('M8[s]')
y [0].item ()

Warning

一个容易忽略的事实是,当我们从 Numpy 数组中取出一个标量时,我们都应该把它转换成为 Python 对象后再使用。否则,会发生一些隐藏的错误,比如下面的例子:

1
2
3
4
5
6
import json
x = np.arange (5)
print (json.dumps ([0]))
print (x [0])

json.dumps ([x [0]])

Warning

这里最后一行会出错。提示 type int64 is not JSON serializable。把最后一行换成 json.dumps ([x [0].item ()]) 则可以正常执行。

1.3. Typing

从 Python 3.1 起,就开始引入类型注解 (type annotation),到 Python 3.8,基本上形成了完整的类型注解体系。我们经常看到函数的参数类型注解,比如,下面的代码:

1
2
3
from typing import List
def add (a: List [int], b: int) -> List [int]:
    return [i + b for i in a]

从此,Python 代码也就有了静态类型检查支持。

NumPy 的 Typing 模块提供了一系列类型别名(type aliases)和协议(protocols),使得开发者能够在类型注解中更精确地表达 NumPy 数组的类型信息。这有助于静态分析工具、IDE 以及类型检查器提供更准确的代码补全、类型检查和错误提示。

这个模块提供的主要类型是 ArrayLike, NDArray 和 DType。

1
2
3
import numpy
from numpy.typing import ArrayLike, NDArray, DTypeLike
import numpy as np

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def calculate_mean (data: ArrayLike) -> float:
    """计算输入数据的平均值,数据可以是任何 ArrayLike 类型"""
    return np.mean (data)

def add_one_to_array (arr: NDArray [np.float64]) -> NDArray [np.float64]:
    """向一个浮点数数组的每个元素加 1,要求输入和输出都是 np.float64 类型的数组"""
    return arr + 1

def convert_to_int (arr: NDArray, dtype: DTypeLike) -> NDArray:
    """将数组转换为指定的数据类型"""
    return arr.astype (dtype)

如果你是在像 vscode 这样的 IDE 中使用上述函数,你就可以看到函数的类型提示。如果传入的参数类型不对,还能在编辑期间,就得到错误提示。

2. 拓展阅读

2.1. Numpy 的数据类型

在 Numpy 中,有以下常见数据类型。每一个数字类型都有一个别名。在需要传入 dtype 参数的地方,一般两者都可以使用。另外,别名在字符串类型、时间和日期类型上,支持得更好。比如,'S5' 是 Ascii 码字符串别外,它除了指定数据类型之外,还指定了字符串长度。datetime64 [S] 除了表明数据是时间日期类型之外,还表明它的精度到秒。


类型 别名
np.int8 i1
np.int16 i2
np.int32 i4
np.int64 i8
np.uint8 u1
np.uint16 u2
np.uint32 u4
np.uint64 u8
np.float16 f2
np.float32 f4,还可指定结尾方式,比如 'f4' 表示大端字节序。其它 float 类型同。
np.float64 f8
np.float128 f16
np.bool_ b1
np.str_ U (后接长度,例如 U10)
np.bytes_ S (后接长度,例如 S5)
np.datetime64 M8 和 M8[D] M8[h] M8[m] M8[s],也可写作 datetime64[D] 等
np.timedelta64 m8 和 m8[D] m8[h] m8[m] m8[s] 等

3. 处理包含 np.nan 的数据

在量化分析中,我们常常会遇到数据为 np.nan 情况。比如,某公司上年利润为负数,今年利润实现正增长,请问要如何表示公司的 YoY 的利润增长呢?


Info

np.nan 是 numpy 中的一个特殊值,表示“Not a Number”,即“不是数字”。注意,在 Numpy 中,尽管 np.nan 不是一个数字,但它确实数字类型。确切地说,它是 float 类型。此外,在 float 类型中,还存在 np.inf(正无穷大)和负无穷大 (np.NINF,或者-np.inf)。

又比如,在计算个股的 RSI 或者移动平均线时,最初的几期数据是无法计算出来的(在回测框架 backtrader 中,它把这种现象称之为技术指标的冷启动)。如果不要求返回的技术指标的取值与输入数据长度一致,则会返回短一些、但全部由有效数据组成的数组;否则,此时我们常常使用 np.NaN 或者 None 来进行填充,以确保返回的数据长度与输入数据长度一致。

但是,如果我们要对返回的数组进行统计,比如求均值、最大值、排序,对包含 np.nan 或者 None 的数组,应该如何处理?

3.1. 包含 np.nan 和 np.inf 的数组运算

在 numpy 中,提供了对带 np.nan 的数组进行运算的支持。比如有以下数组:

1
2
3
4
import numpy as np

x = np.array([1, 2, 3, np.nan, 4, 5])
print(x.mean())

我们将得到一个 nan。实际上,多数情况下,我们希望忽略掉 nan,只对有效数据进行运算,此时得到的结果,我们往往仍然认为是有意义的。

因此,Numpy 提供了许多能处理包含 nan 数据的数组输入的运算函数。下面是一个完整的列表:

在这里,我们以输入 np.array([1, 2, 3, np.nan, np.inf, 4, 5]) 为例

函数 nan 处理 inf 处理 输出
nanmin 忽略 inf 1.0
nanmax 忽略 inf inf
nanmean 忽略 inf inf
nanmedian 忽略 inf 3.5
nanstd 传递 inf nan
nanvar 传递 inf nan
nansum 忽略 inf inf
nanquantile 忽略 inf 2.25
nancumsum 忽略 inf inf
nancumprod 忽略 inf inf

对 np.nan 的处理中,主要是三类,一类是传递,其结果导致最终结果也是 nan,比如,在计算方差和标准差时;一类是忽略,比如在找最小值时,忽略掉 np.nan,在余下的元素中进行运算;但在计算 cumsum 和 cumprod 时,"忽略"意味着在该元素的位置上,使用前值来填充。我们看一个不包含 np.inf 的示例:


1
2
3
x = np.array([1, 2, 3, np.nan, 4, 5])
np.nancumprod(x)
np.nancumsum(x)

输出结果是:

1
2
3
array([  1.,   2.,   6.,   6.,  24., 120.])

array([ 1.,  3.,  6.,  6., 10., 15.])

结果中的第 4 个元素都是由第 3 个元素复制而来的。

如果一个数组中包含 inf,则在任何涉及到排序的操作(比如 max, median, quantile)中,这些元素总是被置于数组的最右侧;如果是代数运算,则结果会被传导为 inf。这些地方,Numpy 的处理方式与我们的直觉是一致的。

除了上述函数,np.isnan 和 np.isinf 函数,也能处理包含 np.nan/np.inf 元素的数组。它们的作用是判断数组中的元素是否为 nan/inf,返回值是一个 bool 数组。

3.2. 包含 None 的数组运算

在上一节中,我们介绍的函数能够处理包含 np.nan 和 np.inf 的数组。但是,在 Python 中,None 是任何类型的一个特殊值,如果一个数组包含 None 元素,我们常常仍然会期望能对它进行 sum, mean, max 等运算。但是,Numpy 并没有专门为此准备对应的函数。


但是,我们可以通过 astype 将数组转换为 float 类型,在此过程中,所有的 None 元素都转换为 np.nan,然后就可以进行运算了。

1
2
x = np.array([3,4,None,55])
x.astype(np.float64)

输出为:array([3., 4., nan, 55.])

3.3. 性能提升

当我们调用 np.nan *函数时,它的性能会比普通的函数慢很多。因此,如果性能是我们关注的问题,我们可以使用 bottleneck 这个库中的同名函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from bottleneck import nanstd
import numpy as np
import random

x = np.random.normal(size = 1_000_000)
pos = random.sample(np.arange(1_000_000).tolist(), 5)
x[pos] = np.nan

%timeit nanstd(x)
%timeit np.nanstd(x)

我们担心数组中 np.nan 元素个数会影响到性能,所以,在上面的示例中,在随机生成数组时,我们只生成了 5 个元素。在随后的一次测试中,我们把 nan 元素的个数增加了 10 倍。实验证明,nan 元素的个数对性能没有什么影响。在所有的测试中,bottlenect 的性能比 Numpy 都要快一倍。

Info

根据 bottleneck 的文档,它的许多函数,要比 Numpy 中的同名函数快 10 倍左右。


这是你的量化母语

正如死亡和税收不可避免,Numpy 和 Pandas 对量化人而言,也具有同样的地位 -- 每个量化人都不可避免地要与这两个库打交道。

如果你去研究一些非常重要的量化库,比如 alphalens, empyrical, backtrader, tushare, akshare, jqdatasdk 等,或者一些非常优秀的量化框架比如 quantaxis, zillionare, vnpy 等等,你就会发现它们都依赖于 numpy 和 pandas。实际上,一个库只要依赖于 pandas,它也必将传递依赖到 numpy。

如果说量化人有一种共同的语言的话,它就是 Numpy 和 Pandas。Numpy 和 Pandas 是量化人的母语。


具体地说,Numpy 和 Pandas 不仅为量化人提供了类似于表格的数据结构 -- Numpy structured array 和 Pandas DataFrame -- 这对于包括行情数据在内的诸多数据的中间存储是必不可少的;它还提供了许多基础算法。

比如:

  1. 在配对交易 (pair trade) 中,相关性计算是非常重要的一环。无论是 Numpy 还是 Pandas 都提供了相关性计算函数。
  2. 在 Alpha 101 因子计算中,排序操作是一个基础函数 -- 这是分层回测的基础 -- Pandas 通过 rank 方法来提供这一功能。
  3. Maxdrawdown(最大回测)是衡量策略的重要指标。Numpy 通过 numpy.maximum.accumulate 提供了支持。

类似常用的算法非常多,我们将在本课程中一一介绍它们。

课程编排说明

紧扣量化场景来介绍 Numpy 和 Pandas 是本课的一大特点。我们通过分析重要的、流行度较高的量化库源码,找出其中使用 numpy 和 pandas 的地方,再进行归类的提炼,并结合一些量化社区中常问的相关问题 -- 这些往往是量化人在使用 numpy/pandas 时遇到的困难所在 -- 来进行课程编排,确保既系统讲解这两个重要的库,又保证学员在学习后,能立即将学习到的方法与技巧运用到工作中,迅速提高自己的生产力。

任何高效地学习都离不开高强度的练习。本课程安排了大量的练习。无论是演示代码、还是练习,我们都尽可能安排在量化场景下完成,这样会增强您的代入感。但是,这往往也要求您能理解这些场景和数据。

在编写本课程时,作者阅读了大量书籍、博文、论文和开源项目代码。其中一部分与教材关联度较高的,我们以延伸阅读、脚注的方式提供参考链接。如果学员有时间,也可以阅读这部分内容,以获得跟作者同样的视野景深。但如果你时间紧张,也完全可以跳过这些内容,只关注我们课程内容的主线就好。


本课程是专门为量化交易从业者,比如 quant developer, quant researcher 和 quant pm 等人设计。如果您有基础的金融知识,这门课也适用于其它需要学习 Numpy 和 Pandas 的人。课程内容在丰度和深度上都是市面上少见的。

什么是 Numpy

图片来源:numpy.org

Numpy 是 Python 科学计算中的基础包,它是一款开源软件,允许在保留原有版权声明的前提下,自由使用。它的名字来源于 Numeric Programming(数值编程),其前身是 Numeric 库和 Numarray 库。

Numpy 提供了多维数组对象、各种派生对象(比如掩码数组 -- masked array)以及各种用于数组操作的高性能例程,包括数学、逻辑、形状操作、排序、选择、I/O 、离散傅里叶变换、基本线性代数、基本统计运算、随机模拟等等。下图提供了一个更详细的说明:


Numpy 的底层开发语言是 C 语言,并且进行了相当多的优化,这包括并行化支持、使用 OpenBLAS 和高级 SIMD 指令来优化矩阵操作等等。由于 Python 这种胶水语言的灵活性,使得 Numpy 最终得以作为一个 Python 库发布出来。

Tip

很多人认为要提高量化策略的性能,就必须放弃 Python,改用 C/Rust。这种说法又对又不对。

如果一个 Quanter 不懂得利用 OpenBLAS 和 LAPACK,那么即使用 C 开发出来的算法,也很难比通过 Python 调用 Numpy 来得更快。在 Numpy 中,一个最常见的矩阵乘法,就可能利用到多核机器的并行运算能力(即多线程)和高级 CPU 指令以实现快速的 BLAS/LAPACK 运算。这些知识和技巧,是一般人难以掌握的。

你可以通过下面的方法来查看你的 Numpy 是否利用了 OpenBLAS/LAPACK 及高级 SIMD 指令:

1
2
import numpy as np
np.show_config()

Numpy 广泛运用于学术界、金融界和工业界,具有成熟、快速、稳定和活跃的特点。当前的稳定版本是 2.2.0 版(2025 年 3 月),发布于仅仅 1 个季度之前,这足以说明 Numpy 社区开发的活跃度。

Numpy 还是 Pandas, scipy, statsmodels 和 scikit-learn 等众多知名 Python 库的底层依赖库。

什么是 Pandas

Pandas 是用于数据操作和分析的 Python 软件库。它构造在 Numpy 之上,增加了索引、异构数组等功能(相当于 Numpy 的 Structure Array -- 这个概念我们会在本课程后面详细解释),这使它成为处理表格类数据的有力武器。


Pandas 的名字来源于术语 Panel Data(面板数据)和 Python Data Analysis,前者是计量经济学的一个术语,用以表示对同一个体在多个时期观测的数据集。

自 2010 年成为开源项目以来,pandas 已经发展成为相当大的一个库,开发者社区已发展到超过 2500 名不同的贡献者。

Pandas 提供了 Series, DataFrame 两种数据结构。它曾经还提供了 Panel 这种三维数据结构,但最终放弃了。与 Excel 相比,它能更快速地分析更大的数据(一般小于 1 千万行,主要取决于机器的物理内存)。


延伸阅读

来源:Github readme 项目

如果要推荐一本讲解 Pandas 的书,毫无疑问,没人任何书籍能比 《Python for Data Analysis》 更权威了。因为它是由 Pandas 的创建者 Wes McKinney 撰写的!这本书现在提供有网页版供开放访问。读者也可点击 此链接 阅读。在 2023 年 4 月进行更新后,它现在支持到了 pandas 2.0 版本。

Wes Mckinney 是 Pandas 的创建者和终生仁慈独裁者。他现在居住在田纳西的纳什维尔,是 DataPad 的 CEO 和联合创始人。

Wes Mckinney 本科毕业于 MIT,是数学和统计学博士(杜克大学)。在 AQR 资本管理公司工作期间,学习了 Python 并开始构建 pandas。他同时还是 Apache Arrow 的联合创建者。

从 Pandas 的诞生史来看,毫无疑问,Pandas 就是为金融/量化而生的。Wes Mckinney 创建 Pandas 的初衷,就是要解决用 Microsoft Excel 来进行金融数据分析和统计运算时效率低、并且十分繁琐的问题。在今天,量化巨头 Two Sigma[^two-sigma] 是这个项目的重要赞助商。Pandas 的成功,也促进了 Python 的广泛流行。甚至可以说,Mckinney 以一己之力,开拓了 Python 的生存空间。

创建 Pandas 并没有任何收益,Wes Mckinney 最初主要依靠第一份工作的存款和兼职来生活。这是一个类似于《月亮和六便士》的故事,就连主人公的背景都极其相似,都是金融工作者。幸运地是,Wes Mckinney 获得了成功。如果你对这段故事感兴趣,可以阅读 《可持续发展的开源项目将赢得未来》 这篇文章。

烛台密码 三角形整理如何提示玄机

本文是几个月前《三角形整理检测》的后续篇,改进了算法,增加了应用场景的讨论。

《匡醍.因子分析与机器学习策略》课程的最后一课是关于深度学习框架在量化交易中的应用的。考虑很多技术交易者都会看图操作,比如艾略特浪型、头肩顶、三角形整理等等。正好CNN在图像模式识别能力上超越了人类,所以,就打算拿三角形整理的检测作为例子。

要通过CNN网络来实现三角形整理检测,首先需要做到数据标注。我们在课程中已经实作出来一个标注工具。不过,我更希望能够使用算法自动检测到三角形整理模式。这篇文章就将介绍我的算法。

如果你希望拿到本文源码,可以加入我们的星球。加入星球三天后,更可以获得我们研究平台的账号。在平台中提供了可以运行、验证notebook版本,你可以完全复现本文的结果。

Note

先通过算法对k线图进行标注,再通过CNN网络进行识别,感觉这个有点Matryoshka doll了。于是我在课程中换了另一个例子,通过4 channel的一维卷积,实现了预测误差1%的准确率。这篇文章算是课程的边脚料重新加工了。

算法的示意图如下:

三角形检测示意图

首先,我们得找到k线图中的波峰和波谷。在图中,波峰是点1和2,波谷则是点3和4。然后我们作通过点1和2的直线,得到压力线;作通过点3和4的直线,得到支撑线。

在Python中,计算两点之间的直线可以通过np.polyfit来实现。通过该函数,将获得直线的斜率。通过两条直线之间的斜率关系,我们可以进一步得到三角形的形态。

如果记Sr为压力线的斜率,记Ss为支撑线的斜率,那么,三角形的形态可以由以下表格来定义:

压力线方向 支撑线方向 角度对比 标记 说明
Sr>0 Ss>0 abs(sr) > abs(ss) 1 上升且发散三角
Sr>0 Ss>0 abs(sr) < abs(ss) 2 上升且收敛三角
Sr>0 Ss<0 abs(sr) > abs(ss) 3 发散偏上升三角
Sr>0 Ss<0 abs(sr) < abs(ss) 4 发散偏下降三角
Sr<0 Ss>0 abs(sr) > abs(ss) 5 下降且收敛三角
Sr<0 Ss>0 abs(sr) < abs(ss) 6 上升且收敛三角
Sr<0 Ss<0 abs(sr) > abs(ss) 7 下降且收敛三角
Sr<0 Ss<0 abs(sr) < abs(ss) 8 下降且发散三角

部分形态如下图所示:

识别算法的实现代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from zigzag import peak_valley_pivots

def triangle_flag(df, lock_date=None):
    if lock_date is not None:
        peroid_bars = df.loc[:lock_date]
    else:
        peroid_bars = df

    thresh = peroid_bars.close[-120:].pct_change().std() * 3

    pvs = peak_valley_pivots(peroid_bars.close.astype(np.float64), thresh, -1 * thresh)

    if len(pvs) == 0:
        return 0, None, None

    pvs[0] = pvs[-1] = 0
    pos_peaks = np.argwhere(pvs == 1).flatten()[-2:]
    pos_valleys = np.argwhere(pvs == -1).flatten()[-2:]

    if len(pos_peaks) < 2 or len(pos_valleys) < 2:
        return 0, None, None

    minx = min(pos_peaks[0], pos_valleys[0])
    y = df.close[pos_peaks].values
    p = np.polyfit(x=pos_peaks, y=y, deg=1)
    upper_trendline = np.poly1d(p)(np.arange(0, len(df)))

    y = df.close[pos_valleys].values
    v = np.polyfit(x=pos_valleys, y=y, deg=1)
    lower_trendline = np.poly1d(v)(np.arange(0, len(df)))

    sr, ss = p[0], v[0]

    flags = {
        (True, True, True): 1,
        (True, True, False): 2,
        (True, False, True): 3,
        (True, False, False): 4,
        (False, True, True): 5,
        (False, True, False): 6,
        (False, False, True): 7,
        (False, False, False): 8,
    }

    flag = flags[(sr > 0, ss > 0, abs(sr) > abs(ss))]

    return flag, upper_trendline, lower_trendline
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def show_trendline(asset, df, resist, support, flag, width=600, height=400):
    desc = {
        1: "上升且发散三角",
        2: "上升且收敛三角",
        3: "发散偏上升三角",
        4: "发散偏下降三角",
        5: "下降且收敛三角",
        6: "上升且收敛三角",
        7: "下降且收敛三角",
        8: "下降且发散三角",
    }

    if isinstance(df, pd.DataFrame):
        df = df.reset_index().to_records(index=False)

    title = f"flag: {flag} - {desc[flag]}"
    cs = Candlestick(df, title=title, show_volume=False, show_rsi=False, width=width, height=height)
    cs.add_line("support", np.arange(len(df)), support)
    cs.add_line("resist", np.arange(len(df)), resist)
    cs.plot()


np.random.seed(78)
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 4)

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag)

最后,我们来研究一支个股的情况,看看这个算法可能有哪些用途:

1
2
3
4
5
6
7
8
9
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9, 11))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

标的从23年4月19日以来,先后出现4次波峰。随着时间的推移,整理形态也不断变化。

7月12,突破之前的形态是发散偏上升三角。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 7,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

7月12日突破之后,支撑和压力线发生变化。此时可以计算出9月7日的压力位是48元。但当天只冲击到45.6,随后收了上影线。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 8,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

此时仍然是上升三角,但9月7日未破压力位后,压力线应该使用最新的两个波峰连线。此时的压力线的斜率比之前的要小,显示后续走势会弱一些。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9,15))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

在9月7日新的波峰形成后,新的压力线在11月20日的值为48.5,当天的最高点为46.4,再次未破压力位。此后压力线需要重新计算。新的压力线的斜率进一步减小。形态也由此前的上升且发散三角形,转换为上升且收敛三角形,表明已经到了退出的时间。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

上述压力线斜率的变化能够表明价格上升是打开了新的通道,还是预期在走弱,这对我们中短线操作很有帮助。

在通过机器学习构建策略时,我们可以把压力线和支撑线斜率的变化(δSr\delta{Sr}, δSs\delta{Ss})、压力线和支撑线预测出来的值(Pt+1P_{t+1}, Vt1V_{t_1})等作为特征,那么,我们就可能更精确地预测未来走势。