2 回答

TA貢獻(xiàn)1802條經(jīng)驗 獲得超10個贊
我遇到了這個答案,并查看了 aiostream 庫。這是我想出的用于合并多個異步生成器的代碼。它不使用任何庫。
async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
pending = gens.copy()
pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
while len(pending_tasks) > 0:
done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
for d in done:
try:
result = d.result()
yield result
dg = pending_tasks[d]
pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
except StopAsyncIteration as sai:
print("Exception in getting result", sai)
finally:
del pending_tasks[d]
希望這對您有所幫助,如果有任何錯誤,請告訴我。
添加回答
舉報