今天和大家分享一个最近做的小项目,通过Websocket获取实时股票价格,并结合 Telegram Bot 实现实时价格预警。你可能会问,为什么监控特斯拉的股票?好吧,因为我套在里面了。。。当然这个脚本可以根据你自己的喜好来调整,只要上游接口支持的数据,都可以监控。
下面是代码:
import asyncio
import json
import websockets
import requests
from telegram import Bot
# WebSocket的订阅地址,这里使用的是infoway.io的数据,可以根据你自己的情况调整
WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
# Telegram Bot Token 和 Chat ID
TELEGRAM_TOKEN = 'your_telegram_bot_token'
CHAT_ID = 'your_telegram_chat_id'
# 设置预警价格阈值(根据你的实际情况调整)
ALERT_THRESHOLD = 700 # 当特斯拉股价超过 700 时发送预警
# 创建 Telegram Bot 实例
bot = Bot(token=TELEGRAM_TOKEN)
# 发送 Telegram 消息
def send_telegram_message(message):
bot.send_message(chat_id=CHAT_ID, text=message)
# WebSocket 连接和接收数据
async def connect_and_receive():
async with websockets.connect(WS_URL) as websocket:
# 发送初始订阅请求
init_message = {
"code": 10000,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "TSLA.US"} # 订阅特斯拉股票
}
await websocket.send(json.dumps(init_message))
# 设置 ping 心跳任务,防止ws断开
async def send_ping():
while True:
await asyncio.sleep(30)
ping_message = {
"code": 10010,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
}
await websocket.send(json.dumps(ping_message))
# 启动 ping 心跳任务
ping_task = asyncio.create_task(send_ping())
try:
# 持续接收股票数据
while True:
message = await websocket.recv()
data = json.loads(message)
# 从上游接口返回的数据中提取价格信息
if "data" in data and "price" in data["data"]:
price = data["data"]["price"]
print(f"Current TSLA Price: {price}")
# 检查是否超过预警阈值
if price > ALERT_THRESHOLD:
alert_message = f"⚠️ Pre-warning: TSLA stock price exceeded {ALERT_THRESHOLD}. Current price: ${price}."
send_telegram_message(alert_message)
else:
print("Error: Data format is not as expected")
except websockets.exceptions.ConnectionClosedOK:
print("Connection closed normally")
finally:
# 取消 ping 任务
ping_task.cancel()
# 运行主函数
asyncio.run(connect_and_receive())
點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優(yōu)質文章
正在加載中
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦