1 回答

TA貢獻1821條經(jīng)驗 獲得超6個贊
您的代碼中有錯誤。實際上,queue.task_done()應(yīng)該只在從隊列中取出元素時調(diào)用,而不是在將它們放入隊列時調(diào)用。
但是您的中間件類正在它剛剛使用的隊列上調(diào)用它.put(),用于self._out列表中的最后一個隊列;queue.task_done()從DistributorMiddleware.distribute()以下位置刪除呼叫:
async def distribute(self):
while True:
ele = await self._in.get()
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
queue.task_done()
# ^^^^^ you didn't take anything from the queue here!
當(dāng)您刪除該行時,您的測試就通過了。
您在測試中看到異常的原因是因為只有這樣隊列才知道task_done()被調(diào)用得太頻繁了。該queue.task_done()呼叫DistributorMiddleware.distribute()減1,未完成的任務(wù)計數(shù)器,但只有當(dāng)該計數(shù)器下降到低于零能的異常進行檢測。只有當(dāng)最后一個任務(wù)從 中的隊列中取出時,您才會到達那個點test_distribution(),此時未完成的任務(wù)計數(shù)器至少提前一步達到 0。
也許那是為了改為調(diào)用self._in.task_done()?您只是在該while循環(huán)中從該隊列中獲取了一個元素:
async def distribute(self):
while True:
ele = await self._in.get()
# getting an element from self._in
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
self._in.task_done()
# done with ele, so decrement the self._in unfinished tasks counter
添加回答
舉報