1 回答
TA貢獻(xiàn)1851條經(jīng)驗(yàn) 獲得超4個(gè)贊
您不能將 a 傳遞threading.Lock給async with,因?yàn)樗皇菫楫惒绞褂枚O(shè)計(jì)的,它是一個(gè)阻塞原語(yǔ)。更重要的是,async with threading.Lock()即使它確實(shí)有效也沒(méi)有意義,因?yàn)槟鷮@得一把全新的鎖,它總是會(huì)成功。為了使鎖定有意義,您必須在多個(gè)線程之間共享一個(gè)鎖,例如存儲(chǔ)在對(duì)象的屬性中,或以另一種方式與對(duì)象相關(guān)聯(lián)。這個(gè)答案的其余部分將假設(shè)您在threading.Lock線程之間共享。
由于threading.Lock總是阻塞,因此您可以從 asyncio 使用它的唯一方法是在專用線程中獲取它,暫停當(dāng)前協(xié)程的執(zhí)行,直到獲取鎖。此功能已包含在run_in_executor事件循環(huán)方法中,您可以應(yīng)用該方法:
_pool = concurrent.futures.ThreadPoolExecutor()
async def work(lock, other_args...):
? ? # lock is a threading.Lock shared between threads
? ? loop = asyncio.get_event_loop()
? ? # Acquire the lock in a worker thread, suspending us while waiting.
? ? await loop.run_in_executor(_pool, lock.acquire)
? ? ... access the object with the lock held ...
? ? # Can release directly because release() doesn't block and a
? ? # threading.Lock can be released from any thread.
? ? lock.release()
您可以通過(guò)創(chuàng)建異步上下文管理器來(lái)使其使用起來(lái)更優(yōu)雅(并且異常安全):
_pool = concurrent.futures.ThreadPoolExecutor()
@contextlib.asynccontextmanager
async def async_lock(lock):
? ? loop = asyncio.get_event_loop()
? ? await loop.run_in_executor(_pool, lock.acquire)
? ? try:
? ? ? ? yield? # the lock is held
? ? finally:
? ? ? ? lock.release()
然后你可以按如下方式使用它:
# lock is a threading.Lock shared between threads
async with async_lock(lock):
? ? ... access the object with the lock held ...
當(dāng)然,在 asyncio 之外你不會(huì)使用其中的任何一個(gè),你只是直接獲取鎖:
# lock is a threading.Lock shared between threads
with lock:
? ?... access the object ...
請(qǐng)注意,我們使用單獨(dú)的線程池而不是傳遞None給run_in_executor()重用默認(rèn)池。這是為了避免在持有鎖的函數(shù)本身需要訪問(wèn)線程池以供其他用途的情況下出現(xiàn)死鎖run_in_executor()。通過(guò)保持線程池私有,我們避免了因其他人使用同一池而導(dǎo)致死鎖的可能性。
添加回答
舉報(bào)
