量化交易的本质,其实就是利用代码,把投资逻辑变成可执行的规则。但在策略、模型、执行这三个环节里,数据源往往是被低估的那一环,数据质量的高低,对模型的影响很少有人提及。很多团队花了大量精力打磨信号模型,最后因为数据的各种质量问题(延迟、丢数据、格式不统一),导致回测结论在实盘中完全失效。

本文系统梳理量化交易中三类核心数据源:股票数据、外汇数据、期货数据的特性、坑点以及接入方案,下文中的所有代码示例都基于 Infoway API 提供的 Python SDK

一、数据源的评判标准

量化策略的有效性依赖两个前提:回测数据与实盘数据一致,以及实盘数据的延迟足够低。如果这两点能同时满足,我们可以称之为高质量的数据源。

在众多数据质量问题中,以下几点是比较普遍的:

  • 没有 Tick 数据:如果只有日线的高开低收,短线策略根本无从验证
  • 复权处理不一致:回测用前复权数据,实盘拿到的是不复权数据,导致跨越除权日的信号全部偏移
  • 时区与时间戳处理错误:这种仅出现在多市场联动策略中,比如把东京时间和纽约时间混用
  • 数据格式不统一:股票、外汇、期货来自不同来源,字段命名和精度各异,合并时出现隐性错误

选择一个覆盖广、标准化程度高的数据接口,可以从源头规避掉上述大多数问题。

二、股票数据源

2.1 量化场景对股票数据的要求

股票是量化策略最集中的战场,从趋势跟踪、统计套利到事件驱动,几乎所有主流策略类型都是围绕股票市场展开的。量化场景通常需要以下几类数据:

数据类型用途举例
实时成交(Tick)高频监控、入场时机判断
盘口深度(Order Book)市场冲击估算、流动性判断
K 线历史(OHLCV)技术指标计算、回测
复权因子跨分红/拆股日的正确计算
基本面数据(估值/评级)多因子模型、价值筛选
板块与行业分类轮动策略、板块热度监控

2.2 多市场接入的复杂性

最近这几年,押注单一市场的策略边际收益正在递减,越来越多的量化团队在同时跑港股、美股、A股,甚至日本韩国股票的多因子模型。但多市场意味着多套数据格式和交易规则:

  • A股:T+1 限制、±10% 涨跌停(ST 股 ±5%)、集合竞价、复权因子必须处理
  • 港股:无涨跌停限制、T+0(当日可卖)、部分股票不足 100 股手数不一
  • 美股:T+2 结算、有盘前盘后时段(价格波动显著)、不同市场的 LULD 熔断机制

用统一 SDK 接入多市场,可以在代码层面屏蔽掉这些差异,只保留策略逻辑本身。

2.3 代码示例:批量获取多市场股票数据

Python
from infoway import InfowayClient, KlineType

client = InfowayClient(api_key="YOUR_API_KEY")

# 同时查询美股 + 港股 + A股的实时成交
codes = {
    "US":  ["AAPL.US", "TSLA.US", "NVDA.US"],
    "HK":  ["700.HK", "9988.HK", "3690.HK"],
    "A":   ["600519.SH", "000858.SZ"],
}

all_codes = ",".join(c for market in codes.values() for c in market)
trades = client.stock.get_trade(all_codes)

for item in trades["data"]:
    print(f"{item['s']:15s}  价格: {item['p']:>10s}  涨幅: {item['td']:+.2f}%")
Python
# 获取腾讯 60 根日 K 线,用于计算均线
klines = client.stock.get_kline("700.HK", kline_type=KlineType.DAY, count=60)

import pandas as pd
df = pd.DataFrame(klines["data"]["kLine"])
df.columns = ["time", "open", "high", "low", "close", "volume"]
df[["open", "high", "low", "close"]] = df[["open", "high", "low", "close"]].astype(float)

df["ma5"]  = df["close"].rolling(5).mean()
df["ma20"] = df["close"].rolling(20).mean()

# 均线交叉信号
df["signal"] = (df["ma5"] > df["ma20"]).astype(int).diff()
buy_signals  = df[df["signal"] ==  1]
sell_signals = df[df["signal"] == -1]

print(f"买入信号: {len(buy_signals)} 次  卖出信号: {len(sell_signals)} 次")

2.4 复权因子的正确使用

回测时必须对历史 K 线做复权处理,否则价格序列在分红除权日会出现跳缺,导致均线、动量指标严重失真。

Python
# 获取苹果的复权因子列表
factors = client.basic.get_adjustment_factors("AAPL.US")

# 典型返回结构示例:
# [{"date": "2020-08-31", "factor": 0.25}, ...]  # 股票分拆,价格 × 0.25

# 在回测框架中将 K 线价格按因子调整
def apply_adjustment(df, factors):
    factor_df = pd.DataFrame(factors["data"]).sort_values("date")
    df = df.sort_values("time")
    # 将复权因子与 K 线时间对齐,逐段调整价格
    # 具体实现需结合回测框架的时间戳格式
    ...
    return df

2.5 实时盘口监控:用于冲击成本估算

Python
# 获取 5 档盘口,判断当前买入 10 万港元腾讯的滑点
depth = client.stock.get_depth("700.HK")

bids = depth["data"][0]["bids"]  # 买盘 [[price, volume], ...]
asks = depth["data"][0]["asks"]  # 卖盘

target_value = 100000  # 港元
remaining = target_value
slippage_cost = 0
base_price = float(asks[0][0])

for ask_price, ask_vol in asks:
    ask_price = float(ask_price)
    ask_vol   = float(ask_vol)
    fill_value = min(remaining, ask_price * ask_vol)
    slippage_cost += fill_value * (ask_price - base_price) / base_price
    remaining -= fill_value
    if remaining <= 0:
        break

print(f"预估滑点: {slippage_cost:.2f} 港元  ({slippage_cost/target_value*100:.3f}%)")

三、外汇数据源

3.1 外汇市场的特殊性

很多人以为股票是全球最大的金融市场,但其实真正的霸主是外汇。外汇市场日均成交额超过 7.5 万亿美元,而全球股票每天的交易额不到一万亿美元。

外汇和股票市场有几个根本性的不同,直接影响数据接入方式:

7×24 小时运转:外汇市场从周日 22:00 UTC(悉尼开盘)到周五 22:00 UTC(纽约收盘)持续运转,没有明确的收盘价。数据源需要支持持续推送,周末的平仓缺口在周日重新开盘时经常产生显著跳空。

报价货币对影响 P&L 计算:EURUSD 的 1 个 pip 等于多少人民币,取决于当前 USDCNY 的汇率。多货币对的量化策略需要实时维护一张汇率转换表。

点差即交易成本:外汇没有手续费的概念,做市商通过买入和卖出的那一丢丢价差来盈利。回测时必须把点差纳入成本模型,否则高频外汇策略的回测收益会被严重高估。

无中央清算所:外汇是场外市场(OTC),不同数据源的报价可能存在细微差异,特别是在流动性薄弱的货币对上。

3.2 外汇数据覆盖范围

Infoway API 通过 /common 接口覆盖 80+ 主流外汇货币对:

类别主要货币对
主要货币对EURUSD、GBPUSD、USDJPY、USDCHF、AUDUSD、USDCAD、NZDUSD
交叉盘EURJPY、GBPJPY、EURGBP、AUDCAD 等
人民币相关USDCNY、USDCNH、EURCNY 等
新兴市场货币USDINR、USDKRW、USDTHB、USDSGD 等

3.3 代码示例:外汇多货币对监控

Python
from infoway import InfowayClient, KlineType
import asyncio
from infoway.ws import InfowayWebSocket

client = InfowayClient(api_key="YOUR_API_KEY")

# ─── 实时成交快照 ───
forex_pairs = "EURUSD,GBPUSD,USDJPY,USDCNY,AUDUSD,USDCAD"
quotes = client.common.get_trade(forex_pairs)

for q in quotes["data"]:
    print(f"{q['s']:10s}  {q['p']}")
Python
# ─── 外汇 K 线:获取 EURUSD 最近 200 根 1 小时 K 线 ───
klines = client.common.get_kline("EURUSD", kline_type=KlineType.HOUR_1, count=200)

df = pd.DataFrame(klines["data"]["kLine"])
df.columns = ["time", "open", "high", "low", "close", "volume"]
df[["open", "high", "low", "close"]] = df[["open", "high", "low", "close"]].astype(float)

# 计算 ATR(平均真实波幅),用于仓位管理
df["tr"] = df[["high", "low", "close"]].apply(
    lambda r: max(r["high"] - r["low"],
                  abs(r["high"] - df["close"].shift(1).iloc[df.index.get_loc(r.name)]),
                  abs(r["low"]  - df["close"].shift(1).iloc[df.index.get_loc(r.name)])),
    axis=1
)
df["atr14"] = df["tr"].rolling(14).mean()
print(f"EURUSD 当前 ATR(14): {df['atr14'].iloc[-1]:.5f}")
Python
# ─── WebSocket 实时订阅外汇报价 ───
async def monitor_forex():
    ws = InfowayWebSocket(api_key="YOUR_API_KEY", business="common")

    positions = {"EURUSD": 100000, "GBPUSD": 50000}  # 当前持仓(单位:基础货币)
    entry_prices = {"EURUSD": 1.0850, "GBPUSD": 1.2700}

    async def on_trade(msg):
        for item in msg.get("data", []):
            pair  = item["s"]
            price = float(item["p"])
            if pair in positions:
                pnl = (price - entry_prices[pair]) * positions[pair]
                print(f"{pair}  当前: {price:.5f}  持仓盈亏: {pnl:+.2f} USD")

    ws.on_trade = on_trade
    await ws.subscribe_trade("EURUSD,GBPUSD,USDJPY,USDCNY")
    await ws.connect()

asyncio.run(monitor_forex())

3.4 回测中的点差建模

外汇回测需要在每次开仓时扣除点差,以主要货币对为例:

Python
SPREAD_PIPS = {
    "EURUSD": 0.5,   # 0.5 pip ≈ 0.00005
    "GBPUSD": 0.8,
    "USDJPY": 0.6,
    "USDCNY": 15,    # 人民币对,单位不同
}

PIP_VALUE = {
    "EURUSD": 0.0001,
    "GBPUSD": 0.0001,
    "USDJPY": 0.01,
    "USDCNY": 0.0001,
}

def calc_spread_cost(pair: str, lot_size: float) -> float:
    """标准手(100,000 基础货币)下的点差成本(报价货币)"""
    spread = SPREAD_PIPS[pair] * PIP_VALUE[pair]
    return spread * lot_size * 100000

四、期货数据源

4.1 期货数据的核心难点:连续合约

期货和股票、外汇最大的差别,是期货合约有到期日,同一个品种在市场上还会同时存在多个到期月份的合约(近月、次近月、远月)。这给量化接入带来几个独特问题:

换月展期问题:策略需要在合约到期前移仓到下一个月份,如果回测数据用单一合约(如 CL2024M),会在换月时出现价格跳空,形成虚假的极端信号。正确做法是使用主力连续合约 (国内简称 主连),把历史上各近月合约价格拼接成一条价格序列,并做价差调整。

交割与现货价格收敛:临近到期日,期货价格会向现货收敛,价差(Basis)会快速变化。趋势跟踪策略如果持有近月合约太晚,会被这个效应严重干扰。

保证金与杠杆:期货的仓位计算基于合约规格(每手乘数),与股票/外汇完全不同。仓位管理模型必须单独处理。

4.2 Infoway 支持的期货品种

Infoway 通过 /common 接口覆盖中国主要期货交易所(上期所、郑商所、大商所)的主力合约:

类别代码示例主要品种
贵金属AU1!AG1!黄金、白银
有色金属CU1!AL1!ZN1!NI1!PB1!SN1!SS1!铜、铝、锌、镍、铅、锡、不锈钢
黑色金属/建材RB1!HC1!SF1!SM1!螺纹钢、热轧卷板、硅铁、锰硅
化工/能源化工TA1!MA1!BU1!FU1!PX1!PL1!PTA、甲醇、石油沥青、燃料油、对二甲苯、聚丙烯
农产品/软商品CF1!OI1!SR1!RM1!AP1!RS1!棉花、菜籽油、白砂糖、菜籽粕、苹果、油菜籽
橡胶/浆纸RU1!BR1!SP1!天然橡胶、丁二烯橡胶、漂白针叶木浆

国际大宗商品(美元计价)同样通过 /common 接口接入,使用不同 Symbol 格式,例如黄金现货 XAUUSD、WTI 原油 USOIL、布伦特原油 UKOIL,与上方中国期货代码并列,但属于两套独立的报价体系。

4.3 代码示例:期货数据接入

Python
from infoway import InfowayClient, KlineType
import pandas as pd
import numpy as np

client = InfowayClient(api_key="YOUR_API_KEY")

# ─── 中国期货主力合约实时报价快照 ───
# 代码格式:{品种}1!,例如 CU1! = 铜主力,RB1! = 螺纹钢主力
cn_futures = "AU1!,AG1!,CU1!,AL1!,ZN1!,RB1!,HC1!,TA1!,MA1!,RU1!"
quotes = client.common.get_trade(cn_futures)

for q in quotes["data"]:
    print(f"{q['s']:8s}  {q['p']:>12s}  涨跌: {float(q.get('td', 0)):+.2f}%")
Python
# ─── 国际大宗商品(美元计价)实时报价 ───
intl_commodities = "XAUUSD,XAGUSD,USOIL,UKOIL"
quotes = client.common.get_trade(intl_commodities)

for q in quotes["data"]:
    print(f"{q['s']:10s}  {q['p']:>12s}  涨跌: {float(q.get('td', 0)):+.2f}%")
Python
# ─── 铜主力 K 线:计算趋势强度(CTA 策略信号层) ───
klines = client.common.get_kline("CU1!", kline_type=KlineType.DAY, count=120)

df = pd.DataFrame(klines["data"]["kLine"])
df.columns = ["time", "open", "high", "low", "close", "volume"]
df[["open", "high", "low", "close", "volume"]] = \
    df[["open", "high", "low", "close", "volume"]].astype(float)

# ADX 方向性趋势强度指标
def calc_adx(df, period=14):
    df["tr"]   = np.maximum(df["high"] - df["low"],
                 np.maximum(abs(df["high"] - df["close"].shift(1)),
                            abs(df["low"]  - df["close"].shift(1))))
    df["+dm"]  = np.where((df["high"] - df["high"].shift(1)) >
                           (df["low"].shift(1) - df["low"]),
                           np.maximum(df["high"] - df["high"].shift(1), 0), 0)
    df["-dm"]  = np.where((df["low"].shift(1) - df["low"]) >
                           (df["high"] - df["high"].shift(1)),
                           np.maximum(df["low"].shift(1) - df["low"], 0), 0)

    atr   = df["tr"].ewm(span=period, adjust=False).mean()
    plus  = 100 * df["+dm"].ewm(span=period, adjust=False).mean() / atr
    minus = 100 * df["-dm"].ewm(span=period, adjust=False).mean() / atr
    dx    = 100 * abs(plus - minus) / (plus + minus)
    df["adx"] = dx.ewm(span=period, adjust=False).mean()
    return df

df = calc_adx(df)
adx_val = df["adx"].iloc[-1]
trend_label = "强趋势" if adx_val > 25 else ("弱趋势" if adx_val > 15 else "震荡")
print(f"CU1!(铜主力) ADX(14): {adx_val:.2f}  市场状态: {trend_label}")
Python
# ─── 黑色金属产业链相关性分析 ───
# RB1!(螺纹钢) / HC1!(热轧卷板) / SF1!(硅铁) / SM1!(锰硅)
symbols = ["RB1!", "HC1!", "SF1!", "SM1!"]
close_data = {}

for sym in symbols:
    kl = client.common.get_kline(sym, kline_type=KlineType.DAY, count=60)
    prices = [float(row[4]) for row in kl["data"]["kLine"]]  # close price
    close_data[sym] = prices

df_corr = pd.DataFrame(close_data)
corr_matrix = df_corr.pct_change().corr()

print("\n黑色金属产业链 近 60 日收益率相关系数:")
print(corr_matrix.round(3).to_string())
Python
# ─── WebSocket 实时监控:沪铜与沪金价格联动 ───
import asyncio
from infoway.ws import InfowayWebSocket

async def monitor_cn_futures():
    ws = InfowayWebSocket(api_key="YOUR_API_KEY", business="common")
    latest = {}

    async def on_trade(msg):
        for item in msg.get("data", []):
            latest[item["s"]] = float(item["p"])

        # 沪金(AU1!) / 沪银(AG1!) 金银比
        if "AU1!" in latest and "AG1!" in latest:
            gold_silver_ratio = latest["AU1!"] / latest["AG1!"]
            print(f"沪金: {latest['AU1!']:.2f}  "
                  f"沪银: {latest['AG1!']:.2f}  "
                  f"金银比: {gold_silver_ratio:.1f}")

    ws.on_trade = on_trade
    # 同时订阅贵金属 + 有色金属 + 黑色金属主力合约
    await ws.subscribe_trade("AU1!,AG1!,CU1!,RB1!,HC1!")
    await ws.connect()

asyncio.run(monitor_cn_futures())

4.4 那些基于期货数据的量化策略

商品 CTA 策略:这是期货量化中最经典的策略(小白入门策略,但好用),核心是 K 线历史数据和 ATR 仓位管理。通道突破(唐奇安通道)和均线趋势是两种常见信号,铜(CU1!)、螺纹钢(RB1!)等流动性好的品种都需要至少 120 根以上的历史 K 线来做充分回测。

产业链价差套利:黑色金属产业链中,螺纹钢(RB1!)和热轧卷板(HC1!)价差长期维持在一定区间内,偏离时存在统计套利机会;化工板块中 PTA(TA1!)与甲醇(MA1!)的价差也是常见的配对交易标的。贵金属方面,沪金(AU1!)与沪银(AG1!)的金银比同样是经典均值回归策略的标的。

跨品种宏观对冲:铜(CU1!)被称为铜博士,对全球经济预期高度敏感,常被用于对冲工业股票持仓的宏观风险;天然橡胶(RU1!)与汽车板块的联动也是常见的跨市场对冲思路。这类策略需要同时订阅多个品种的实时数据,WebSocket 多品种订阅实时数据必不可少。

五、三类数据源的统一接入架构

在实际的量化系统中,往往需要同时处理股票、外汇、期货三类数据,关键是建立一个统一的数据抽象层,避免上层策略感知到底层来源差异。

Python
from infoway import InfowayClient, KlineType
from infoway.ws import InfowayWebSocket
import asyncio
from dataclasses import dataclass
from typing import Callable

@dataclass
class Quote:
    symbol:   str
    price:    float
    change:   float   # 涨跌幅 %
    timestamp: int    # Unix 毫秒

class UnifiedMarketDataFeed:
    """统一行情数据层,屏蔽股票/外汇/期货的接口差异"""

    BUSINESS_MAP = {
        # 标的后缀 → WebSocket business 参数
        ".US": "stock", ".HK": "stock", ".SH": "stock", ".SZ": "stock",
        # 外汇/商品/期货:无后缀,走 common
    }

    def __init__(self, api_key: str):
        self.client   = InfowayClient(api_key=api_key)
        self.api_key  = api_key
        self._handlers: list[Callable[[Quote], None]] = []

    def on_quote(self, handler: Callable[[Quote], None]):
        self._handlers.append(handler)

    def _dispatch(self, raw_items: list):
        for item in raw_items:
            q = Quote(
                symbol    = item["s"],
                price     = float(item["p"]),
                change    = float(item.get("td", 0)),
                timestamp = item["t"],
            )
            for h in self._handlers:
                h(q)

    def snapshot(self, symbols: list[str]) -> list[Quote]:
        """REST 快照:同时拉取任意品种的当前报价"""
        stock_syms  = [s for s in symbols if "." in s]
        common_syms = [s for s in symbols if "." not in s]
        results = []

        if stock_syms:
            data = self.client.stock.get_trade(",".join(stock_syms))
            results.extend(data["data"])
        if common_syms:
            data = self.client.common.get_trade(",".join(common_syms))
            results.extend(data["data"])

        return [Quote(r["s"], float(r["p"]), float(r.get("td", 0)), r["t"])
                for r in results]

    async def stream(self, stock_symbols: list[str], common_symbols: list[str]):
        """WebSocket 实时推送:分业务类型建立连接"""
        tasks = []

        if stock_symbols:
            ws_stock = InfowayWebSocket(api_key=self.api_key, business="stock")
            ws_stock.on_trade = lambda msg: self._dispatch(msg.get("data", []))
            tasks.append(self._run_ws(ws_stock, ",".join(stock_symbols)))

        if common_symbols:
            ws_common = InfowayWebSocket(api_key=self.api_key, business="common")
            ws_common.on_trade = lambda msg: self._dispatch(msg.get("data", []))
            tasks.append(self._run_ws(ws_common, ",".join(common_symbols)))

        await asyncio.gather(*tasks)

    async def _run_ws(self, ws: InfowayWebSocket, codes: str):
        await ws.subscribe_trade(codes)
        await ws.connect()


# ─── 使用示例 ───
async def main():
    feed = UnifiedMarketDataFeed(api_key="YOUR_API_KEY")

    def strategy_handler(q: Quote):
        print(f"[{q.symbol}] ${q.price:.4f}  ({q.change:+.2f}%)")

    feed.on_quote(strategy_handler)

    # 股票 + 外汇 + 商品期货,统一处理
    await feed.stream(
        stock_symbols  = ["AAPL.US", "700.HK", "600519.SH"],
        common_symbols = ["EURUSD", "USDJPY", "XAUUSD", "USOIL"],
    )

asyncio.run(main())

六、数据质量评估维度

选择量化数据源时,以下几个维度直接影响策略的可靠性:

评估维度说明为什么重要
延迟(Latency)Tick 数据从交易所到 API 的传输时延延迟超过 500ms 会影响日内策略的信号有效性
历史深度K 线数据可回溯多少年短历史无法验证策略在不同市场周期中的表现
品种覆盖支持多少标的,是否包含退市股票仅含活跃股票的数据库存在”幸存者偏差”
复权处理是否提供前复权/后复权/不复权三种数据复权处理错误是回测信号失真的头号原因
WebSocket 稳定性断线重连、心跳机制是否完善连接不稳定会导致实盘策略漏信号
数据标准化多品种是否统一字段格式格式不统一会引入隐性的数据处理错误

Infoway API 的 WebSocket 内置自动重连(指数退避,1 秒起步上限 30 秒)和心跳保活机制,重连后自动恢复全部订阅,对实盘策略的连续性有基本保障。

七、常见问题

回测用历史 K 线,实盘用实时数据,两者的字段格式一样吗?

一样。REST 接口的 get_kline 返回的每根 K 线格式为 [time, open, high, low, close, volume],WebSocket 推送的实时 K 线(PUSH_KLINE,消息码 10008)字段格式相同,可以直接接入同一条数据处理 pipeline。

外汇市场周末休市,数据源还会推送数据吗?

主要货币对的外汇市场在周五纽约收盘(约 22:00 UTC)关闭,周日悉尼开盘时重新开始。这段时间内不会有新的成交数据推送,但 WebSocket 连接本身会保持(心跳继续),重新开盘后会立刻恢复推送。

期货品种的 Symbol 代码是什么格式?

Infoway 对商品和外汇品种统一走 /common 接口,Symbol 直接用交易代码,例如黄金 XAUUSD、WTI 原油 USOIL、布伦特 UKOIL、天然气 NATGAS。可以用 client.basic.get_symbols("common") 获取完整品种列表。

多品种同时订阅 WebSocket,一个连接能承载多少标的?

subscribe_tradecodes 参数支持逗号分隔多个标的,但股票和外汇/商品需要分别建立连接(business 参数不同)。在单个连接内可以订阅数十个标的,具体上限根据套餐级别有所不同。

如何判断一根 K 线是”已完成”还是”进行中”的实时 K 线?

WebSocket 推送的 PUSH_KLINE 消息中包含当前周期内的实时更新,在 REST 接口获取历史 K 线时,最后一根通常是未完成的当前 K 线。策略回测时应跳过最后一根 K 线,只用已完成的历史 K 线生成信号。

股票、外汇、期货三类数据源各有特性:股票需要处理复权和涨跌停规则,外汇需要建模点差和 7×24 小时连续性,期货需要处理连续合约的换月问题。

共同点是:所有量化策略都对数据的一致性实时性高度敏感。选择一个字段格式统一、多市场覆盖完整、WebSocket 连接稳定的数据接口,可以让你把精力集中在策略逻辑本身,而不是数据清洗和格式对齐。

Infoway Python SDK(pip install infoway-sdk)通过统一的 client.stockclient.commonclient.crypto 模块,以相同的方法签名覆盖上述三类市场的 REST 和 WebSocket 接入,适合作为多市场量化策略的统一数据层。