一、准备工作
秘钥申请:infoway.io
接口类型: 实时行情接口
支持品种:A股、港股、美股、贵金属期货、外汇、加密货币
请求方式:HTTP、WebSocket低延时推送
二、WebSocket查询美股实时行情
import asyncio import json import websockets # 美股行情的websocket订阅地址 WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey" # 请先在官网https://infoway.io 申请免费API key async def connect_and_receive(): async with websockets.connect(WS_URL) as websocket: # 发送初始化消息,这里订阅的是苹果股票的1分钟K线数据 init_message = { "code": 10004, # K线请求协议号 "trace": "423afec425004bd8a5e02e1ba5f9b2b0", # 可追溯ID(随机字符串) "data": { "arr": [ { "type": 1, # 1分钟K线 "codes": "AAPL" # 订阅的股票代码 } ] } } await websocket.send(json.dumps(init_message)) # 设置ping任务 async def send_ping(): while True: await asyncio.sleep(30) ping_message = { "code": 10010, "trace": "423afec425004bd8a5e02e1ba5f9b2b0" } await websocket.send(json.dumps(ping_message)) # 启动ping任务协程 ping_task = asyncio.create_task(send_ping()) try: # 持续接收消息 while True: message = await websocket.recv() print(f"Message received: {message}") except websockets.exceptions.ConnectionClosedOK: print("Connection closed normally") finally: # 取消ping任务 ping_task.cancel() # 运行主函数 asyncio.run(connect_and_receive())
返回示例
{ "c": "150.00", // 当前价格 "h": "150.20", // 最高价 "l": "149.80", // 最低价 "o": "149.90", // 开盘价 "pca": "0.00", // 价格变化 "pfr": "0.00%", // 价格变化百分比 "s": "AAPL", // 股票代码 "t": 1747550648097, // 时间戳 "ty": 1, // K线类型:1 表示1分钟K线 "v": "0.34716", // 交易量 "vw": "35923.5149678" // 加权平均价格 }
三、查询盘口数据
import asyncio import json import websockets WS_URL = "wss://data.infoway.io/ws?business=stock&apikey=yourApiKey" # 请先在官网https://infoway.io 申请免费API key async def connect_and_receive(): async with websockets.connect(WS_URL) as websocket: # 发送初始化消息,订阅的是苹果股票的盘口数据 init_message = { "code": 10002, # 盘口订阅的请求协议号 "trace": "423afec425004bd8a5e02e1ba5f9b2b0", # 可追溯ID(随机字符串) "data": { "codes": "AAPL" # 订阅的股票代码,这里是苹果股票AAPL } } await websocket.send(json.dumps(init_message)) # 设置ping任务 async def send_ping(): while True: await asyncio.sleep(30) ping_message = { "code": 10010, "trace": "423afec425004bd8a5e02e1ba5f9b2b0" } await websocket.send(json.dumps(ping_message)) # 启动ping任务协程 ping_task = asyncio.create_task(send_ping()) try: # 持续接收消息 while True: message = await websocket.recv() print(f"Message received: {message}") except websockets.exceptions.ConnectionClosedOK: print("Connection closed normally") finally: # 取消ping任务 ping_task.cancel() # 运行主函数 asyncio.run(connect_and_receive())
返回示例
{ "a": [ [ "150.00", // 卖盘价格1 "150.10", // 卖盘价格2 "150.20", // 卖盘价格3 "150.30", // 卖盘价格4 "150.40" // 卖盘价格5 ], [ "10.0000", // 卖盘数量1 "5.0000", // 卖盘数量2 "3.0000", // 卖盘数量3 "1.0000", // 卖盘数量4 "2.0000" // 卖盘数量5 ] ], "b": [ [ "149.90", // 买盘价格1 "149.80", // 买盘价格2 "149.70", // 买盘价格3 "149.60", // 买盘价格4 "149.50" // 买盘价格5 ], [ "8.0000", // 买盘数量1 "4.0000", // 买盘数量2 "2.5000", // 买盘数量3 "1.5000", // 买盘数量4 "3.0000" // 买盘数量5 ] ], "s": "AAPL", // 股票代码 "t": 1747553102161 // 时间戳 }
點(diǎn)擊查看更多內(nèi)容
為 TA 點(diǎn)贊
評(píng)論
評(píng)論
共同學(xué)習(xí),寫下你的評(píng)論
評(píng)論加載中...
作者其他優(yōu)質(zhì)文章
正在加載中
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦