2 回答

TA貢獻(xiàn)1804條經(jīng)驗(yàn) 獲得超8個(gè)贊
我設(shè)法添加了一個(gè) http 服務(wù)器作為協(xié)程。首先我嘗試使用 aiohttp,但最終我找到了 Quart(與 Flask 相同,但它使用 Asyncio)。在 Quart 上運(yùn)行 http 服務(wù)器的示例代碼:
import quart
from quart import request
import json
import time
app = quart.Quart(__name__)
def resp(code, data):
return quart.Response(
status=code,
mimetype="application/json",
response=to_json(data)
)
def to_json(data):
return json.dumps(data) + "\n"
@app.route('/')
def root():
return quart.redirect('/api/status2')
@app.errorhandler(400)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(404)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(405)
def page_not_found(e):
return resp(405, {})
@app.route('/api/status2', methods=['GET'])
def get_status():
timestamp = request.args.get("timestamp")
delay = request.args.get("delay")
if timestamp:
return resp(200, {"time": time.time()})
elif delay:
return resp(200, {"test is:": '1'})
else:
return resp(200, {"", "ask me about time"})
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=5000)
為了將此代碼添加為協(xié)同程序,我使用await asyncio.gather()并使用了 app.run_task 而不是 app.run。像這樣更改了我的問(wèn)題中的代碼:
async def launcher_main():
while True:
ans_inc, ans_out = await get_some_data()
asyncio.ensure_future(process(ans_inc, ans_out))
async def main():
await asyncio.gather(launcher_main(),
restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))
剩下的最后一個(gè)問(wèn)題是使“formatdf”類(lèi)對(duì)象的可用參數(shù)到我的 http 服務(wù)器。我已經(jīng)實(shí)現(xiàn)了Tests.restapi_quart.app.config["formatdf"] = formatdf向 process(...) 函數(shù)添加行。從 quart 調(diào)用它:
elif button:
return resp(200, {"ok": app.config["formatdf"].attr})

TA貢獻(xiàn)1845條經(jīng)驗(yàn) 獲得超8個(gè)贊
我只需要為我的應(yīng)用程序解決這個(gè)問(wèn)題。這是在現(xiàn)有異步應(yīng)用程序中運(yùn)行 aiohttp 服務(wù)器的方法。
https://docs.aiohttp.org/en/stable/web_lowlevel.html
import asyncio
from aiohttp import web
async def web_server():
print(f'Configuring Web Server')
app = web.Application()
app.add_routes([web.get('/hello', web_hello)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner)
print(f'Starting Web Server')
await site.start()
print(f'Web Server Started')
#run forever and ever
await asyncio.sleep(100*3600)
async def web_hello(request):
return web.Response(text="Hello World")
async def main():
tasks = []
#Web Server
tasks.append(asyncio.create_task(web_server()))
results = await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
添加回答
舉報(bào)