印度这个神奇的国度,不仅有干净又卫生的街头美食,它还是亚洲增速最快的资本市场之一,无论是交易量、投资者增长,还是IPO数量,近年来都呈现出明显的上升趋势。针对印度市场的金融交易平台在近几年也越来越多,我们将在本文介绍如何接入印度股票的实时行情,如果您正在开发印度市场的股票交易平台,一定不要错过!

一、印度股票市场总览

我们先来看看印度股票市场的基本情况。

印度证券交易委员会(SEBI)目前认可并监管的交易所总共有9家,涵盖股权、衍生品、商品与国际证券等多个品类:

序号交易所简称主要类型总部/位置活跃度/特色总体市场份额
1NSE股权+衍生品孟买交易量最大,衍生品霸主>90%
2BSE股权+衍生品孟买历史最悠久,上市公司最多<10%
3India INX国际/外币证券GIFT City全球投资者入口,延长交易时段<1%
4NSE IFSC国际衍生品GIFT CityNSE的国际版<1%
5MSEI股权+货币孟买第三大,但份额微小<1%
6CSE股权(有限)加尔各答历史交易所,基本不活跃近0%(几乎无实质交易)
7MCX商品衍生品孟买金属+能源主力商品衍生品市场~97-98%(非股权,不计入股票市场份额)
8NCDEX农产品商品孟买农业期货专营商品衍生品市场~2-3%(农产品主力,非股权)
9ICEX商品衍生品纳维孟买细分商品(如钻石)商品衍生品市场<1%(细分领域,非股权)

对于需要接入印度市场数据的开发者与机构而言,真正值得关注的核心交易所有两家:

NSE(国家证券交易所):印度交易量最大的交易所,衍生品市场份额超过90%,是绝大多数量化策略与实时数据需求的首选数据源。

BSE(孟买证券交易所):成立于1875年,是亚洲历史最悠久的交易所,股权市场份额虽不及NSE,但在部分品种上仍具不可替代性。

截至2025年末,BSENSE的上市公司数量合计超过5,300家,交易量占据了印度接近98%的市场份额。NSE的衍生品日成交量更是长期位居全球前三,妥妥的印度金融霸主。事实上,NSE本身的估值一度高达4.7万亿卢比(约合五百亿美元),是印度最值钱的未上市公司,在全球私人公司中排第10位。

这两家交易所均采用印度标准时间(IST,UTC+5:30),正常交易时段为周一至周五上午9:15至下午3:30,预开盘阶段为9:00至9:15。

二、印度股票实时行情API成本对比

理论上,开发者可以直接向NSE或BSE申请数据授权,获取一手实时行情。但现实情况是:这条路几乎只对大型机构开放。因为实在太贵了!

以下是直连两家交易所的核心费用对比:

费用项目NSEBSE
股票L1实时(年固定费)约 176,640 元约 73,600 元
期货与期权L1(年固定费)约 176,640 元约 36,800 元
L2深度行情(年固定费)约 276,000 元约 73,600 元
终端用户分发费(月/用户)约 603–809 元约 478–846 元
备份链路约 14,720 元/条/年
GST(增值税)额外 18%额外 18%

以上价格均已换算成人民币。

仅NSE股票+衍生品L1基础授权,年度固定成本约在36万至58万元人民币之间,叠加用户费、专线接入、GST及备份冗余,实际全年总成本轻松突破百万元。BSE相对较低,但L1股票实时授权也需7万至18万元起步。

此外,直连还要求申请方具备完整的企业资质、签署复杂的数据协议,并自行搭建专线网络接入交易所数据中心。对于中小型团队或个人开发者而言,这条路在经济上和操作上都几乎不现实。

免费的第三方API(如Yahoo Finance)虽然存在,但数据延迟普遍在10至15分钟以上,而且频繁封IP,根本无法满足实时交易或实时应用的需求。

正是因为直连成本过高、门槛过硬,市场上出现了一批专业的数据服务商,他们统一采购交易所授权数据,再以更合理的价格向下游开发者分发。这种模式在全球主要市场(美股、欧股、港股)早已成熟,印度市场也正在走向同样的路径。

三、Infoway API印度股票实时行情接口

Infoway API面向开发者提供印度股票实时行情数据。可通过接口查询:

特点:

对大多数开发者来说,Infoway API 是性价比最高的选择,能在不承担高额成本的前提下快速搭建实时行情应用。Infoway API印度股票行情接口包含NSE与BSE所有挂牌的股票(约5300只个股)。

除了印度股票以外,Infoway API还提供以下市场的实时数据:

  1. A股、港股、美股、日本股票
  2. 外汇货币对
  3. 加密货币
  4. 商品期货、CFD

四、印度股票API使用教程

Infoway API的接口使用非常简单,先在官网注册账号,注册完自动获得API Key → 点我注册,在查询数据的时候需要带上API Key。下面来看看如何查询印度股票数据。

4.1 成交明细

成交明细(Last trade)查询地址:

Python
https://data.infoway.io/india/batch_trade/{codes}

需要在codes中传入对应的股票代码,建议在我们的官网下载股票代码列表,里面有所有的印度股票代码(包含NSE与BSE)。

下面是成交明细的请求示例:

Python
import requests

api_url = 'https://data.infoway.io/india/batch_trade/INFY.IN'

# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'YOUR_API_KEY_HERE'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

返回示例:

Python
{
  "s": "INFY.IN",
  "respList": [
    {
      "t": "1773037920",
      "h": "1303.40",
      "o": "1301.50",
      "l": "1301.50",
      "c": "1303.00",
      "v": "15615.0",
      "vw": "20342210.20",
      "pc": "0.15%",
      "pca": "1.90"
    }
  ]
}

字段说明:

字段说明
s股票代码
t秒时间戳(UTC+8)
p交易价格
v成交量
vw成交额
td交易方向 1:BUY 2:SELL 0:默认值

4.2 实时K线

K线请求地址:

Python
https://data.infoway.io/india/v2/batch_kline/{klineType}/{klineNum}/{codes}

参数说明:

  • klineType 指的是K线的周期,这里传入1,返回1分钟K,传入2,返回5分钟K,具体请查看K线查询教程
  • klineNum 指的是需要返回的K线数量,单只股票查询最多可一次要求返回500根最近的K线
  • codes 代表股票代码

示例:查询InfoSys(股票代码INFY.IN)1分钟K线,返回最近的10根

Python
import requests

api_url = 'https://data.infoway.io/india/v2/batch_kline/1/10/INFY.IN'

# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'YOUR_API_KEY_HERE'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

返回示例:

Python
{
  "s": "INFY.IN",
  "respList": [
    {
      "t": "1773037920",
      "h": "1303.40",
      "o": "1301.50",
      "l": "1301.50",
      "c": "1303.00",
      "v": "15615.0",
      "vw": "20342210.20",
      "pc": "0.15%",
      "pca": "1.90"
    }
  ]
}

返回字段说明:

字段说明
t秒时间戳(UTC+8)
h最高价
o开盘价
l最低价
C收盘价
v成交量
vw成交额
pc涨跌幅
pca涨跌额

4.3 盘口深度

Infoway API提供全印度一档盘口深度,请求地址:

Python
https://data.infoway.io /india/batch_depth/{codes}

同样在cdoes中传入股票代码即可:

Python
import requests

api_url = 'https://data.infoway.io /india/batch_depth/INFY.IN'

# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'YOUR_API_KEY_HERE'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

盘口返回示例:

Python
{
  "s": "INFY.IN",
  "t": 1773037524808,
  "a": [
    [
      "1302.0",
    ],
    [
      "1.0",
    ]
  ],
  "b": [
    [
      "1301.9",
    ],
    [
      "172.0",
    ]
  ]
}

a为买盘,包含买一价和买一量,b为卖盘,包含卖一价和卖一量。

4.4 WebSocket订阅全印度股票实时行情

Infoway API支持WebSocket订阅,可订阅全印度市场的实时K线、实时成交、以及实时盘口,非常适合交易所用户。

下面WebSocket代码示例,包含断线重连机制:

Python
import asyncio
import json
import uuid
import logging
from typing import Optional
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("websocket-client")

class CryptoWebsocketClient:
    """加密货币行情WebSocket客户端(对等Java版本逻辑)"""
    
    def __init__(self, api_key: str, business: str = "crypto"):
        # WebSocket连接配置
        self.ws_url = f"wss://data.infoway.io/ws?business=india&apikey=YourAPIKey"
        # 核心状态
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.is_connected = False
        self.reconnect_interval = 10  # 重连间隔(秒)
        self.heartbeat_interval = 30  # 心跳间隔(秒)
        # 任务对象(用于管理异步任务)
        self.reconnect_task: Optional[asyncio.Task] = None
        self.heartbeat_task: Optional[asyncio.Task] = None

    async def connect(self) -> None:
        """建立WebSocket连接"""
        try:
            # 关闭旧连接(防止连接泄露)
            if self.ws and not self.ws.closed:
                await self.ws.close()
            
            # 建立新连接
            self.ws = await websockets.connect(self.ws_url)
            self.is_connected = True
            logger.info(f"WebSocket连接成功,地址: {self.ws_url}")
            
            # 连接成功后启动订阅和心跳
            await self._send_all_subscribe_requests()
            self._start_heartbeat_task()
        
        except WebSocketException as e:
            self.is_connected = False
            logger.error(f"WebSocket连接失败: {str(e)}")
            raise

    async def _send_all_subscribe_requests(self) -> None:
        """发送所有订阅请求(成交明细、盘口、K线)"""
        if not self.ws or self.ws.closed:
            logger.error("连接未建立,无法发送订阅请求")
            return
        
        try:
            # 1. 订阅实时成交明细(协议号10000)
            await self._send_trade_subscribe()
            await asyncio.sleep(5)  # 间隔5秒
            
            # 2. 订阅实时盘口数据(协议号10003)
            await self._send_depth_subscribe()
            await asyncio.sleep(5)  # 间隔5秒
            
            # 3. 订阅1分钟K线数据(协议号10006)
            await self._send_kline_subscribe()
            
        except Exception as e:
            logger.error(f"发送订阅请求失败: {str(e)}")
            raise

    def _generate_trace_id(self) -> str:
        """生成唯一trace ID(对等Java的UUID)"""
        return str(uuid.uuid4())

    async def _send_trade_subscribe(self) -> None:
        """发送实时成交明细订阅请求"""
        subscribe_msg = {
            "code": 10000,
            "trace": self._generate_trace_id(),
            "data": {"codes": "INFY.IN"}
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送成交明细订阅请求: {subscribe_msg}")

    async def _send_depth_subscribe(self) -> None:
        """发送实时盘口数据订阅请求"""
        subscribe_msg = {
            "code": 10003,
            "trace": self._generate_trace_id(),
            "data": {"codes": "INFY.IN"}
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送盘口数据订阅请求: {subscribe_msg}")

    async def _send_kline_subscribe(self) -> None:
        """发送1分钟K线数据订阅请求"""
        subscribe_msg = {
            "code": 10006,
            "trace": self._generate_trace_id(),
            "data": {
                "arr": [
                    {"type": 1, "codes": "INFY.IN"}  # type=1 表示1分钟K线
                ]
            }
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送K线数据订阅请求: {subscribe_msg}")

    async def _send_heartbeat(self) -> None:
        """发送心跳包(协议号10010)"""
        if not self.ws or self.ws.closed:
            return
        
        heartbeat_msg = {
            "code": 10010,
            "trace": self._generate_trace_id()
        }
        try:
            await self.ws.send(json.dumps(heartbeat_msg))
            logger.debug(f"发送心跳包: {heartbeat_msg}")
        except Exception as e:
            logger.error(f"发送心跳包失败: {str(e)}")
            raise

    def _start_heartbeat_task(self) -> None:
        """启动心跳任务(后台定时发送)"""
        if self.heartbeat_task and not self.heartbeat_task.done():
            self.heartbeat_task.cancel()
        
        async def heartbeat_loop():
            while self.is_connected and self.ws and not self.ws.closed:
                try:
                    await self._send_heartbeat()
                    await asyncio.sleep(self.heartbeat_interval)
                except Exception as e:
                    logger.error(f"心跳任务异常: {str(e)}")
                    break
        
        self.heartbeat_task = asyncio.create_task(heartbeat_loop())

    async def _message_listener(self) -> None:
        """监听服务端推送的消息"""
        while self.is_connected and self.ws and not self.ws.closed:
            try:
                # 阻塞等待接收消息
                message = await self.ws.recv()
                logger.info(f"收到服务端消息: {message}")
                
                # 解析消息并处理(对等Java的OnMessage逻辑)
                self._handle_received_message(message)
                
            except ConnectionClosed:
                logger.warning("WebSocket连接已关闭,停止消息监听")
                self.is_connected = False
                break
            except Exception as e:
                logger.error(f"接收/处理消息异常: {str(e)}")

    def _handle_received_message(self, message: str) -> None:
        """处理接收到的消息(按协议号分类)"""
        try:
            msg_data = json.loads(message)
            code = msg_data.get("code")
            trace = msg_data.get("trace")
            data = msg_data.get("data", {})
            
            if code == 10000:
                logger.info(f"处理成交明细数据 [trace={trace}]: {data}")
            elif code == 10003:
                logger.info(f"处理盘口数据 [trace={trace}]: {data}")
            elif code == 10006:
                logger.info(f"处理K线数据 [trace={trace}]: {data}")
            elif code == 10010:
                logger.debug(f"收到心跳响应 [trace={trace}]")
            else:
                logger.warning(f"未知协议号消息 [code={code}]: {message}")
                
        except json.JSONDecodeError:
            logger.error(f"消息格式错误,无法解析JSON: {message}")
        except Exception as e:
            logger.error(f"处理消息异常: {str(e)}")

    async def _reconnect_loop(self) -> None:
        """自动重连循环(对等Java的startReconnection)"""
        while True:
            if not self.is_connected:
                logger.info(f"尝试重连WebSocket(间隔{self.reconnect_interval}秒)...")
                try:
                    await self.connect()
                except Exception:
                    # 重连失败,等待后重试
                    await asyncio.sleep(self.reconnect_interval)
            else:
                # 连接正常,短暂等待后再次检查
                await asyncio.sleep(1)

    async def start(self) -> None:
        """启动客户端(主入口)"""
        # 启动自动重连任务
        self.reconnect_task = asyncio.create_task(self._reconnect_loop())
        
        try:
            # 首次连接
            await self.connect()
            
            # 启动消息监听
            await self._message_listener()
            
        finally:
            # 清理资源
            self.is_connected = False
            if self.heartbeat_task:
                self.heartbeat_task.cancel()
            if self.reconnect_task:
                self.reconnect_task.cancel()
            if self.ws and not self.ws.closed:
                await self.ws.close()
            logger.info("WebSocket客户端已停止")

async def main():
    """主函数"""
    # 替换为你的实际API Key
    API_KEY = "yourApikey"
    client = CryptoWebsocketClient(api_key=API_KEY)
    await client.start()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("用户中断,退出客户端")
    except Exception as e:
        logger.error(f"客户端运行异常: {str(e)}")

以上仅展示订阅一只股票的方法,实际可同时订阅多个。 如需帮助请联系客服Telegram

五、常见问题

印度股票市场的交易时间是什么?

正常交易日(周一至周五,非印度公众假期):预开盘9:00–9:15,正常交易9:15–15:30,收盘竞价15:40–16:00。NSE与BSE时间完全一致,均为印度标准时间(IST,UTC+5:30)。

有没有免费的NSE/BSE实时行情API?

严格意义上没有官方免费的实时API。Yahoo Finance等免费数据源延迟10–15分钟或更长,不适用于实时应用。

什么是Muhurat Trading?

排灯节(Diwali)当晚的1小时特殊交易时段,象征吉祥开市、新财年开始。印度独有文化+金融结合,许多人视作买股票求好运。

什么是Nifty 50和Sensex?

Nifty 50是NSE的旗舰指数,追踪印度市值最大的50家上市公司;Sensex是BSE的基准指数,追踪30家最具代表性的蓝筹公司。两者走势高度相关,是观察印度股市整体表现的核心指标。

WebSocket接口适合什么场景?

适合需要低延迟实时推送的场景,例如实时行情展示、价格预警、自动化交易信号触发等。相比REST轮询,WebSocket可将有效延迟大幅压缩,同时降低服务器请求压力。

Infoway API支持试用吗?

支持。只需要在我们官网注册账号,自动获得7天试用,无需申请,无需绑定信用卡,无需实名认证。