我是異步新手。嘗試創(chuàng)建一個異步裝飾器來阻塞 I/O 代碼。def singletonAsyncMaker(func): async def inner(obj, ): loop = asyncio.get_event_loop() tasks = (loop.run_in_executor(None, func, i) for i in obj) return await asyncio.gather(*tasks) def main(obj): return asyncio.run(inner(obj)) return main@singletonAsyncMakerdef sleeper(obj): sleep(2) return objx = sleeper([8, 5, 6, 6, 4645, 63, 4, 6, 1, 64, 614, 24, 65, ])print(x)這通常運行良好,并且在正常的非異步函數(shù)中也運行良好,但是當(dāng)通過 Sanic 服務(wù)器的路由函數(shù)調(diào)用時,它會拋出此錯誤。Traceback (most recent call last): File "/app.py", line 99, in extractImageResponse result = perform_textract_bytestream(images) File "async_experiment.py", line 51, in main return asyncio.run(inner(obj)) File "env/lib/python3.8/asyncio/runners.py", line 33, in run raise RuntimeError(RuntimeError: asyncio.run() cannot be called from a running event loop@app.route("/v0/xxxxxxxxxxxxxxx", methods=['POST'])def function_name(request): ----------------- x = sleeper(list) ----------------- return jsonify(json_op)嘗試將async添加到 sanicfunction_name定義并在調(diào)用之前等待sleeper(),但沒有成功。我知道 sanic 可以與 asyncio 一起使用,這對此有幫助嗎?有什么修復(fù)或解決方法嗎?
1 回答

九州編程
TA貢獻(xiàn)1785條經(jīng)驗 獲得超4個贊
無需asyncio.run在 Sanic 內(nèi)部調(diào)用。
@app.route("/v0/xxxxxxxxxxxxxxx", methods=['POST'])
async def function_name(request):
await call_to_some_coroutine()
添加回答
舉報
0/150
提交
取消