配对交易(Pairs Trading)作为一种经典的市场中性策略,已有数十年的应用历史。它最早可以追溯到1980年代,由摩根士丹利的量化团队发明,后来被海内外的对冲基金广泛采用。

配对交易的核心思路是基于两个历史价格高度相关的资产进行来回套利。当这两只资产的价格关系发生暂时性偏离时,配对交易策略会介入,做空涨幅较大的资产,同时做多跌幅较大的资产,期望价格关系最终回归,从中获得利润。

在 2006 年,Evan Gatev、William N. Goetzmann 和 K. Geert Rouwenhorst 发表了论文《Pairs Trading: Performance of a Relative-Value Arbitrage Rule》,这篇论文系统地探讨了配对交易策略在美国股市的表现。他们通过对 1962 年至 2002 年间的数据进行回测,作者发现配对交易策略在市场中能提供显著的超额收益,年化回报率高达 11%。同时,这一策略具有较低的波动性,并且回撤控制较好,尤其适合在波动较大的市场环境下执行。

那么这个策略今天还能用吗?我打算复现论文中的方法,回测周期设置在1990到2025年,看看它的收益如何。想自己动手的同学建议先看论文原文。也可以看看我们前两期的回测文章:

复现思路

1. 通过计算最小历史价格距离选择配对

在我的回测中,首先需要通过历史数据计算出所有股票之间的相似度。这里的相似度采用了标准化的价格差异,也就是价格序列的距离。在论文中,作者使用了最小距离原则来筛选出最相关的股票对。在我的实现中,我通过遍历所有可能的股票配对,并计算它们在一定窗口期内的标准化价格差(通过欧氏距离来度量),然后选择出最小距离的配对。

2. 交易信号

在交易信号的生成方面,论文提出,当两个股票的价格差偏离历史均值超过两倍标准差时,触发开仓信号。这一策略符合市场中常见的均值回归思维,假设价格差的波动是暂时的,最终会回归。因此,我在代码中也使用了相似的逻辑:通过计算价格差的历史均值和标准差,判断是否出现过度偏离。如果价格差大于均值加两倍标准差或小于均值减两倍标准差,就会开仓。

3. 资金分配

资金分配是量化策略中至关重要的部分。论文中提到,配对交易策略的资金应根据可交易配对数和风险承受能力进行分配。为了控制风险,我在回测代码中加入了资金杠杆控制,并确保每个交易对的仓位在合理范围内。具体来说,我根据每个配对的市场价值和杠杆比率来决定每个配对的仓位大小,这样可以有效分散风险,避免单一配对造成的过大损失。

4. 动态调整持仓

在策略执行方面,论文中提到,当价格差回归到均值时,应及时平仓。我在代码中也设置了一个平仓机制:一旦配对的价格差回到正常范围,即可平掉相应的仓位。此部分主要依赖于动态持仓的思想,确保在市场波动的不同阶段都能做出及时的调整。

回测结果

回测区间1990 – 2025
夏普比率1.03
年化收益率8.04%
年化波动率7.79%
最大回撤22.99%

完整回测代码

Python
import itertools as it
import collections as col
from typing import Dict, List, Tuple

import backtrader as bt
import numpy as np
import pwb_toolbox.backtesting as pwb_bt


class PairsTradingStrategy(pwb_bt.BaseStrategy):
    params = (
        ("period", 12 * 21),  # 滚动窗口的周期(12个月的交易日)
        ("leverage", 0.8),  # 应用的杠杆倍数
        ("selection_month", 6),  # 股票选择的月份
        ("max_traded_pairs", 8),  # 最大交易配对数
        ("total_days", 0),
        ("indicator_cls", None),
        ("indicator_kwargs", {}),
    )

    def __init__(self):
        super().__init__()
        self.history_price = {d._name: [] for d in self.datas}
        self.traded_pairs = []
        self.traded_quantity = {}
        self.sorted_pairs = []
        self.symbols = [d._name for d in self.datas]
        self.add_timer(when=bt.Timer.SESSION_START, monthdays=[1], monthcarry=True)
        self.broker.set_coc(True)

    def notify_timer(self, timer, when, *args, **kwargs):
        if self.datas[0].datetime.date(0).month == self.params.selection_month:
            self.update_pairs()

    def update_pairs(self):
        self.symbol_pairs = list(it.combinations(self.symbols, 2))

        distances = {}
        for pair in self.symbol_pairs:
            is_tradable_a = (
                len(self.history_price[pair[0]]) == self.params.period
                and self.history_price[pair[0]][-1] != self.history_price[pair[0]][-3]
            )
            is_tradable_b = (
                len(self.history_price[pair[1]]) == self.params.period
                and self.history_price[pair[1]][-1] != self.history_price[pair[1]][-3]
            )
            if is_tradable_a and is_tradable_b:
                distance = self.distance(
                    self.history_price[pair[0]], self.history_price[pair[1]]
                )
                if distance > 0:  # 避免没有数据的配对
                    distances[pair] = distance

        if distances:
            self.sorted_pairs = [
                x[0] for x in sorted(distances.items(), key=lambda x: x[1])
            ]
        # 平掉所有仓位
        for d in self.datas:
            self.order_target_percent(d, target=0)
        self.traded_pairs.clear()
        self.traded_quantity.clear()

    def next(self):
        super().next()
        self.rebalance()

    def rebalance(self):
        for d in self.datas:
            self.history_price[d._name].append(d.close[0])
            if len(self.history_price[d._name]) > self.params.period:
                self.history_price[d._name].pop(0)

        pairs_to_remove = []

        for pair in self.sorted_pairs:
            price_a = list(self.history_price[pair[0]])
            price_b = list(self.history_price[pair[1]])
            norm_a = np.array(price_a) / price_a[0]
            norm_b = np.array(price_b) / price_b[0]

            spread = norm_a - norm_b
            mean = np.mean(spread)
            std = np.std(spread)
            actual_spread = spread[-1]

            traded_portfolio_value = (
                self.broker.getvalue()
                / self.params.max_traded_pairs
                * self.params.leverage
            )
            if actual_spread > mean + 2 * std or actual_spread < mean - 2 * std:
                if pair not in self.traded_pairs:
                    if len(self.traded_pairs) < self.params.max_traded_pairs:
                        symbol_a = pair[0]
                        symbol_b = pair[1]
                        a_price = price_a[-1]
                        b_price = price_b[-1]

                        if norm_a[-1] > norm_b[-1]:
                            long_q = int(traded_portfolio_value / b_price)
                            short_q = -int(traded_portfolio_value / a_price)
                            if (
                                self.getpositionbyname(symbol_a).size == 0
                                and self.getpositionbyname(symbol_b).size == 0
                            ):
                                self.sell(
                                    self.getdatabyname(symbol_a),
                                    size=abs(short_q),
                                    exectype=bt.Order.Market,
                                )
                                self.buy(
                                    self.getdatabyname(symbol_b),
                                    size=abs(long_q),
                                    exectype=bt.Order.Market,
                                )

                                self.traded_quantity[pair] = (short_q, long_q)
                                self.traded_pairs.append(pair)
                        else:
                            long_q = int(traded_portfolio_value / a_price)
                            short_q = -int(traded_portfolio_value / b_price)
                            if (
                                self.getpositionbyname(symbol_a).size == 0
                                and self.getpositionbyname(symbol_b).size == 0
                            ):
                                self.buy(
                                    self.getdatabyname(symbol_a),
                                    size=abs(long_q),
                                    exectype=bt.Order.Market,
                                )
                                self.sell(
                                    self.getdatabyname(symbol_b),
                                    size=abs(short_q),
                                    exectype=bt.Order.Market,
                                )

                                self.traded_quantity[pair] = (long_q, short_q)
                                self.traded_pairs.append(pair)
            else:
                if pair in self.traded_pairs and pair in self.traded_quantity:
                    if self.traded_quantity[pair][0] > 0:
                        self.sell(
                            self.getdatabyname(pair[0]),
                            size=abs(self.traded_quantity[pair][0]),
                            exectype=bt.Order.Market,
                        )
                        self.buy(
                            self.getdatabyname(pair[1]),
                            size=abs(self.traded_quantity[pair][1]),
                            exectype=bt.Order.Market,
                        )
                    else:
                        self.buy(
                            self.getdatabyname(pair[0]),
                            size=abs(self.traded_quantity[pair][0]),
                            exectype=bt.Order.Market,
                        )
                        self.sell(
                            self.getdatabyname(pair[1]),
                            size=abs(self.traded_quantity[pair][1]),
                            exectype=bt.Order.Market,
                        )
                    pairs_to_remove.append(pair)

        for pair in pairs_to_remove:
            self.traded_pairs.remove(pair)
            del self.traded_quantity[pair]

    def distance(self, price_a, price_b):
        norm_a = np.array(price_a) / price_a[0]
        norm_b = np.array(price_b) / price_b[0]
        return sum((norm_a - norm_b) ** 2)


def run_strategy():
    symbols = ["sp500"]
    strategy = pwb_bt.run_strategy(
        indicator_cls=None,
        indicator_kwargs={},
        strategy_cls=PairsTradingStrategy,
        strategy_kwargs={
            "leverage": 0.8,
        },
        symbols=symbols,
        start_date="1990-01-01",
        cash=1_000_000.0,
    )
    return strategy


if __name__ == "__main__":
    strategy = run_strategy()