第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Queue.asyncio ValueError: task_done() 調(diào)用了太多次

Queue.asyncio ValueError: task_done() 調(diào)用了太多次

qq_笑_17 2021-08-11 22:09:41
我實現(xiàn)了一段代碼,它從一個隊列中獲取一個元素,并將相同的對象從隊列列表中放入每個隊列中。問題是,當(dāng)我運行特定測試時,出現(xiàn)ValueError: task_done() called too many times異常。這個錯誤發(fā)生在測試代碼中,而不是在被測試的代碼中。我正在asyncio.Queue使用協(xié)程和編程。我將每Queue.get一個都與一個準確的Queue.task_done電話相匹配。我正在用pytest測試代碼。我正在使用以下庫:蟒蛇 3.7pytest==3.10.0pytest-asyncio==0.9.0我有兩個文件:middleware.py包含我的類實現(xiàn)和test_middleware.py實現(xiàn)pytest測試。文件middlware.py:import asyncioclass DistributorMiddleware:    def __init__(self, in_queue, out_list_queue):        self._in = in_queue        self._out = out_list_queue    async def distribute(self):        while True:            ele = await self._in.get()            count=0            for queue in self._out:                await queue.put(ele)                count+=1                print(f'inserted ele in {count}')            queue.task_done()            if ele == None:                break        for queue in self._out:            await queue.join()文件test_middleware.py:import pytestimport asyncio                from asyncio import Queuefrom middleware import DistributorMiddlewareimport randomimport os@pytest.mark.asyncio                                                                                     async def test_distribution(request, event_loop):                                                            q_list = [ Queue() for _ in range(10) ]                                                                  _in = Queue()    distrib = DistributorMiddleware(_in, q_list)                                                             event_loop.create_task(distrib.distribute())                                                             num_ele = random.randint(1, 10)    ele_set = set()    for _ in range(num_ele):                                                                                     ele = os.urandom(4)                                                                                      ele_set.add(ele)        await _in.put(ele)    await _in.put(None)                                                                                  
查看完整描述

1 回答

?
達令說

TA貢獻1821條經(jīng)驗 獲得超6個贊

您的代碼中有錯誤。實際上,queue.task_done()應(yīng)該只在從隊列中取出元素時調(diào)用,而不是在將它們放入隊列時調(diào)用。


但是您的中間件類正在它剛剛使用的隊列上調(diào)用它.put(),用于self._out列表中的最后一個隊列;queue.task_done()從DistributorMiddleware.distribute()以下位置刪除呼叫:


async def distribute(self):


    while True:

        ele = await self._in.get()

        count=0

        for queue in self._out:

            await queue.put(ele)

            count+=1

            print(f'inserted ele in {count}')

        queue.task_done()

        # ^^^^^ you didn't take anything from the queue here!

當(dāng)您刪除該行時,您的測試就通過了。


您在測試中看到異常的原因是因為只有這樣隊列才知道task_done()被調(diào)用得太頻繁了。該queue.task_done()呼叫DistributorMiddleware.distribute()減1,未完成的任務(wù)計數(shù)器,但只有當(dāng)該計數(shù)器下降到低于零能的異常進行檢測。只有當(dāng)最后一個任務(wù)從 中的隊列中取出時,您才會到達那個點test_distribution(),此時未完成的任務(wù)計數(shù)器至少提前一步達到 0。


也許那是為了改為調(diào)用self._in.task_done()?您只是在該while循環(huán)中從該隊列中獲取了一個元素:


async def distribute(self):


    while True:

        ele = await self._in.get()

        # getting an element from self._in

        count=0

        for queue in self._out:

            await queue.put(ele)

            count+=1

            print(f'inserted ele in {count}')

        self._in.task_done()

        # done with ele, so decrement the self._in unfinished tasks counter


查看完整回答
反對 回復(fù) 2021-08-11
  • 1 回答
  • 0 關(guān)注
  • 887 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號