上一篇文章里,我们整理了 yfinance 最常见的 12 类问题,比如空 DataFrame、时区错误、MultiIndex 列处理、分钟数据的历史深度限制等等。如果你照着方案改,能解决单只股票的数据下载问题,但一到批量场景就又崩了,因为yfinance的429限速问题,始终是绕不过的门槛。

这篇文章专门讲这个问题:当你需要同时下载几十支、上百支股票的历史 K 线时,yfinance 为什么一定会被限速,以及真正可靠的解决方案是什么。

为什么 yfinance 会被限速

在上一篇文章中我们已经介绍过,yfinance 并不是雅虎财经的官方 API,它只是一个开源库,原理是通过浏览器请求来抓取雅虎财经的网页数据。

也就是说 yfinance 其实并不是正规军,它没有正经的授权,而雅虎财经本身就有反爬的机制,一旦发现高频的请求(非人类正常操作),马上会触发速率限制。如果雅虎财经对接口的结构做了修改,立马会导致yfinance失效。

这是 yfinance 的先天局限,不是你代码的问题。

限速之后,你会看到哪些报错

1. HTTP 429 Too Many Requests

最常见的限速信号:

Python
HTTPError: 429 Client Error: Too Many Requests

或者 yfinance 在底层自动重试几次后,以更含糊的错误返回:

Python
Failed to get ticker 'AAPL' reason: HTTPError 429

2. Connection Reset / ConnectionError

限速更严重时,Yahoo Finance 会直接断开连接:

Python
ConnectionError: ('Connection aborted.', ConnectionResetError(10054, ...))

这种报错出现时,通常再等几分钟也无法恢复,必须更换 IP 或等待更长时间的封锁解除。

3. curl_cffi session 报错(yfinance 1.x 新版本特有)

如果你使用的是 yfinance 1.0 及以上版本,有时会看到:

Python
yahoo api requires curl_cffi session not <class 'requests.sessions.session'>.
please stop passing session (or switch to curl_cffi)

这是 yfinance 1.x 切换底层 HTTP 库引入的破坏性变更。解决方式是不要手动传入 session 参数,让 yfinance 自己处理:

Python
# ❌ 旧写法,1.x 版本会报错
import requests
session = requests.Session()
df = yf.download("AAPL", session=session)

# ✅ 1.x 版本的正确写法
import yfinance as yf
df = yf.download("AAPL")  # 不传 session

4. 返回空 DataFrame,无任何报错

最难排查的情况是 yfinance 根本没有抛出异常,但返回的 DataFrame 是空的。这通常是在限速接近边界时 Yahoo Finance 返回了空响应,而 yfinance 没有把它识别为错误:

Python
df = yf.download("TSLA", period="1y")<br>print(df.shape)   # (0, 5)  ← 空数据

临时缓解方案(能用,但不治本)

如果你只是偶尔需要批量下载,以下方法可以减少触发限速的概率。

方案 A:请求之间加随机延迟

Python
import yfinance as yf<br>import time<br>import random<br><br>symbols = ["AAPL", "TSLA", "NVDA", "MSFT", "GOOGL"]<br><br>results = {}<br>for sym in symbols:<br>    try:<br>        df = yf.download(sym, period="1y", progress=False)<br>        if not df.empty:<br>            results[sym] = df<br>            print(f"✓ {sym}: {len(df)} 行")<br>        else:<br>            print(f"✗ {sym}: 返回空数据")<br>    except Exception as e:<br>        print(f"✗ {sym}: {e}")<br>    # 每次请求之间等待 2~5 秒,随机化避免被识别<br>    time.sleep(random.uniform(2, 5))

方案 B:分批 + 指数退避重试

Python
import yfinance as yf
import time

def download_with_retry(symbol: str, period: str = "1y", max_retries: int = 3) -> object:
    for attempt in range(max_retries):
        try:
            df = yf.download(symbol, period=period, progress=False)
            if not df.empty:
                return df
        except Exception as e:
            if "429" in str(e):
                wait = (2 ** attempt) * 10  # 10s → 20s → 40s
                print(f"  限速,等待 {wait} 秒后重试...")
                time.sleep(wait)
            else:
                raise
    return None

symbols = ["AAPL", "TSLA", "NVDA", "MSFT", "GOOGL",
           "AMZN", "META", "JPM", "BAC", "WMT"]

# 每批 5 支,批次间停 30 秒
BATCH_SIZE = 5
BATCH_WAIT = 30

for i in range(0, len(symbols), BATCH_SIZE):
    batch = symbols[i:i + BATCH_SIZE]
    print(f"\n--- 批次 {i // BATCH_SIZE + 1}: {batch} ---")
    for sym in batch:
        df = download_with_retry(sym)
        if df is not None:
            print(f"  ✓ {sym}: {len(df)} 行")
    if i + BATCH_SIZE < len(symbols):
        print(f"  批次结束,等待 {BATCH_WAIT} 秒...")
        time.sleep(BATCH_WAIT)

这两个方案的局限是显而易见的:下载100支股票可能需要一个小时以上 😢,且在网络环境差或请求较集中时仍然会失败。如果是每天需要自动更新数据的生产任务,这套方案的可靠性不足。

根本解决方案:切换到稳定的行情 API

yfinance 香是真的香,毕竟免费,但限速问题本质上是无解的,只能不同方案进行测试,想长期稳定从yfinance获取数据还是有点困难。

真正的解决方式,是使用有 API Key 认证、有明确速率限制文档、为开发者设计的行情数据接口。

Infoway API 是专为量化和金融开发者设计的行情数据服务,提供 A 股、港股、美股、日股、韩股等多市场的历史 K 线和实时行情数据。认证方式简单:在请求头加一个 apiKey 字段,就可以开始调用。

用 Infoway API 批量导出 K 线数据

K 线接口说明

接口地址: POST https://data.infoway.io/stock/v2/batch_kline

支持 A 股、港股、美股的混合查询,也支持日本、韩国和印度股票、外汇、期货,但在不同的端点,具体可以看这份文档

请求参数:

参数类型说明
codesstring股票代码,多个用逗号分隔
klineTypeintK 线周期(见下表)
klineNumint每次返回的 K 线根数,建议 ≤ 500
timestampint可选,Unix 秒,用于翻页获取更早数据

klineType 取值:

周期周期
11 分钟62 小时
25 分钟74 小时
315 分钟8日 K
430 分钟9周 K
51 小时10月 K

基础版:下载多支股票日 K 线

Python
import requests
import pandas as pd
import json

API_KEY  = "YOUR_API_KEY"
ENDPOINT = "https://data.infoway.io/stock/v2/batch_kline"
HEADERS  = {
    "Content-Type": "application/json",
    "apiKey": API_KEY,
}

def fetch_klines(codes: list[str], kline_type: int = 8, num: int = 250) -> dict[str, pd.DataFrame]:
    """
    批量获取多支股票的 K 线数据,返回以股票代码为键的 DataFrame 字典。
    kline_type=8 为日 K,num=250 约为一年交易日。
    """
    payload = {
        "codes": ",".join(codes),
        "klineType": kline_type,
        "klineNum": num,
    }
    resp = requests.post(ENDPOINT, headers=HEADERS, data=json.dumps(payload))
    resp.raise_for_status()
    data = resp.json().get("data", [])

    result = {}
    for item in data:
        symbol = item["s"]
        candles = item.get("respList", [])
        if not candles:
            continue
        df = pd.DataFrame(candles)
        df.columns = ["time", "open", "high", "low", "close", "volume", "amount", "pct_chg", "chg"]
        df[["open", "high", "low", "close", "volume", "amount"]] = \
            df[["open", "high", "low", "close", "volume", "amount"]].astype(float)
        df["datetime"] = pd.to_datetime(df["time"].astype(int), unit="s")
        df = df.sort_values("datetime").reset_index(drop=True)
        result[symbol] = df[["datetime", "open", "high", "low", "close", "volume", "amount"]]

    return result

# 示例:美股 + 港股 + A 股混合查询
symbols = ["AAPL.US", "TSLA.US", "NVDA.US", "00700.HK", "600519.SH"]
data = fetch_klines(symbols, kline_type=8, num=250)

for sym, df in data.items():
    print(f"{sym}: {len(df)} 根日 K  最新收盘价: {df['close'].iloc[-1]}")

进阶版:批量下载 + 自动保存 CSV

Python
import requests
import pandas as pd
import json
import os
from datetime import datetime

API_KEY  = "YOUR_API_KEY"
ENDPOINT = "https://data.infoway.io/stock/v2/batch_kline"
HEADERS  = {"Content-Type": "application/json", "apiKey": API_KEY}

OUTPUT_DIR = "kline_export"
os.makedirs(OUTPUT_DIR, exist_ok=True)


def fetch_klines_batch(codes: list[str], kline_type: int = 8, num: int = 500) -> dict[str, pd.DataFrame]:
    payload = {"codes": ",".join(codes), "klineType": kline_type, "klineNum": num}
    resp = requests.post(ENDPOINT, headers=HEADERS, data=json.dumps(payload), timeout=30)
    resp.raise_for_status()

    result = {}
    for item in resp.json().get("data", []):
        candles = item.get("respList", [])
        if not candles:
            continue
        df = pd.DataFrame(candles,
                          columns=["time", "open", "high", "low", "close",
                                   "volume", "amount", "pct_chg", "chg"])
        df[["open", "high", "low", "close", "volume", "amount"]] = \
            df[["open", "high", "low", "close", "volume", "amount"]].astype(float)
        df["datetime"] = pd.to_datetime(df["time"].astype(int), unit="s")
        df.sort_values("datetime", inplace=True)
        result[item["s"]] = df.reset_index(drop=True)
    return result


def export_to_csv(symbol: str, df: pd.DataFrame, kline_label: str = "daily") -> str:
    filename = f"{symbol.replace('.', '_')}_{kline_label}.csv"
    filepath = os.path.join(OUTPUT_DIR, filename)
    df.to_csv(filepath, index=False)
    return filepath


# ── 主程序 ──────────────────────────────────────────────────────────────────

# 要下载的股票列表(A股/港股/美股均可混合)
WATCHLIST = [
    "AAPL.US", "TSLA.US", "NVDA.US", "MSFT.US", "GOOGL.US",
    "AMZN.US", "META.US", "JPM.US", "V.US", "WMT.US",
    "00700.HK", "09988.HK", "03690.HK",
    "600519.SH", "300750.SZ", "000858.SZ",
]

KLINE_TYPE  = 8    # 日 K
KLINE_NUM   = 500  # 约两年
BATCH_SIZE  = 10   # 每次请求不超过 10 支,控制单次请求体积

print(f"开始批量导出,共 {len(WATCHLIST)} 支股票")
print(f"K 线周期:日K,每支最多 {KLINE_NUM} 根")
print(f"输出目录:{os.path.abspath(OUTPUT_DIR)}\n")

success, failed = [], []

for i in range(0, len(WATCHLIST), BATCH_SIZE):
    batch = WATCHLIST[i:i + BATCH_SIZE]
    print(f"处理批次 {i // BATCH_SIZE + 1}/{-(-len(WATCHLIST) // BATCH_SIZE)}: {batch}")
    try:
        data = fetch_klines_batch(batch, kline_type=KLINE_TYPE, num=KLINE_NUM)
        for sym, df in data.items():
            path = export_to_csv(sym, df)
            print(f"  ✓ {sym}: {len(df)} 根  → {path}")
            success.append(sym)
        missing = [s for s in batch if s not in data]
        for sym in missing:
            print(f"  ✗ {sym}: 接口未返回数据")
            failed.append(sym)
    except requests.HTTPError as e:
        print(f"  请求失败: {e}")
        failed.extend(batch)

print(f"\n导出完成:成功 {len(success)} 支,失败 {len(failed)} 支")
if failed:
    print(f"未能获取数据的标的:{failed}")

翻页版:获取更长历史(3 年以上日 K)

Infoway API 支持通过 timestamp 参数向前翻页,可以拉取完整历史数据:

Python
import requests
import pandas as pd
import json

API_KEY  = "YOUR_API_KEY"
ENDPOINT = "https://data.infoway.io/stock/v2/batch_kline"
HEADERS  = {"Content-Type": "application/json", "apiKey": API_KEY}


def fetch_full_history(symbol: str, kline_type: int = 8) -> pd.DataFrame:
    """翻页拉取某支股票的全部历史日 K 线。"""
    all_candles = []
    cursor = None

    while True:
        payload = {"codes": symbol, "klineType": kline_type, "klineNum": 500}
        if cursor:
            payload["timestamp"] = cursor

        resp = requests.post(ENDPOINT, headers=HEADERS, data=json.dumps(payload), timeout=30)
        resp.raise_for_status()

        items = resp.json().get("data", [])
        candles = next((it.get("respList", []) for it in items if it["s"] == symbol), [])
        if not candles:
            break

        all_candles.extend(candles)

        # respList 返回顺序为最新在前,取最后一根的时间戳继续往前翻
        oldest_ts = int(candles[-1]["t"])
        if cursor and oldest_ts >= cursor:
            break   # 没有更早的数据了
        cursor = oldest_ts

    if not all_candles:
        return pd.DataFrame()

    df = pd.DataFrame(all_candles,
                      columns=["time", "open", "high", "low", "close",
                               "volume", "amount", "pct_chg", "chg"])
    df[["open", "high", "low", "close", "volume", "amount"]] = \
        df[["open", "high", "low", "close", "volume", "amount"]].astype(float)
    df["datetime"] = pd.to_datetime(df["time"].astype(int), unit="s")
    df.drop_duplicates("datetime", inplace=True)
    df.sort_values("datetime", inplace=True)
    return df.reset_index(drop=True)


# 下载苹果全部历史日 K,并保存
df = fetch_full_history("AAPL.US", kline_type=8)
print(f"AAPL.US: 共 {len(df)} 根日 K,最早 {df['datetime'].iloc[0].date()},最新 {df['datetime'].iloc[-1].date()}")
df.to_csv("AAPL_US_full_history.csv", index=False)

yfinance vs Infoway API:核心对比

对比项yfinanceInfoway API
数据来源爬取 Yahoo Finance 网页直连交易所/行情商
是否有官方授权有(API Key 认证)
限速风险高,批量请求必触发低,有明确速率文档
数据延迟非实时,最快 15 分钟延迟实时(WebSocket < 500ms)
市场覆盖主要是美股,部分 A 股A股 / 港股 / 美股 / 日股 / 韩股等
分钟 K 历史深度1 分钟仅 7 天,5 分钟仅 60 天分钟级数据最长 3 年
批量下载容易触发 429,需要加大量延迟支持单次多标的并发请求
稳定性依赖 Yahoo 接口,随时可能失效SLA 保障,接口版本稳定
费用免费7 天免费试用,后付费

FAQ

yfinance 被限速后等多久可以恢复?

通常等待 15–30 分钟后可以恢复普通请求。如果触发了更严重的封锁(大量请求集中在短时间内),可能需要 1–2 小时,或者更换网络环境(切换 IP)。

yfinance 有没有办法永久解决限速问题?

没有。yfinance 本质上是对 Yahoo Finance 网页数据的抓取,Yahoo Finance 没有为开发者提供官方批量接口,也没有承诺速率限制上限。加延迟、分批、换 IP 都是缓解手段,不能根除问题。

Infoway API 支持哪些 K 线周期?

支持 12 个周期:1/5/15/30 分钟、1/2/4 小时、日、周、月、季、年。分钟级历史数据可追溯 3 年,日线及以上无上限。

用 Infoway API 批量下载 100 支股票的日 K,需要多少时间?

每次请求可以包含多个标的(建议每批 10–20 支),100 支股票分 5–10 批即可完成,整个过程通常在 30 秒内完成,无需担心限速。

Infoway API 支持 A 股历史 K 线吗?包括复权数据吗?

支持 A 股(沪深北三所)历史 K 线。关于复权:接口返回的是未复权原始价格,复权因子需要通过配套的 adjust_factor 字段手动处理,或在策略层自行计算前/后复权价格。

yfinance 获取 A 股数据可靠吗?

可靠性有限。通过 600519.SS(注意 yfinance 用 .SS 而不是 .SH)能获取部分 A 股日线历史,但数据完整性和实时性均无保障,且分钟级数据基本不可用。如果业务依赖 A 股数据,建议直接使用专为中国市场设计的数据接口。