1 回答

TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超7個(gè)贊
您的第一個(gè)問(wèn)題是您已經(jīng)在MainThreadwith 調(diào)用中調(diào)用了您的函數(shù):
pool.apply_async(test_post())
...而不是test_post作為在工作線程中執(zhí)行的調(diào)用的參數(shù)傳遞:
pool.apply_async(test_post)
OP:我有一個(gè)非常好的使用線程的腳本,但后來(lái)我讀到它需要手動(dòng)編碼來(lái)維護(hù) n 個(gè)并發(fā)線程(意思是,舊線程完成后立即啟動(dòng)新線程)......
您需要區(qū)分工作單元(作業(yè)、任務(wù))和線程。首先使用池的重點(diǎn)是重用執(zhí)行器,無(wú)論是線程還是進(jìn)程。在實(shí)例化池時(shí)已經(jīng)創(chuàng)建了工作線程,只要您不關(guān)閉池,所有初始線程都會(huì)保持活動(dòng)狀態(tài)。所以你不關(guān)心重新創(chuàng)建線程,你只需調(diào)用現(xiàn)有池的池方法,只要你有一些要分發(fā)的工作。Pool 接受這個(gè)作業(yè)(一個(gè)池方法調(diào)用)并從中創(chuàng)建任務(wù)。這些任務(wù)被放在一個(gè)無(wú)界隊(duì)列中。每當(dāng)工作人員完成任務(wù)時(shí),它都會(huì)阻塞地嘗試get()從這樣的inqueue.
OP:Pool 只執(zhí)行一個(gè)線程而不是 4 個(gè)......我嘗試了不同的方法,但它仍然只執(zhí)行一次。
pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
...是一個(gè)單一調(diào)用、單一任務(wù)的生產(chǎn)工作。如果您想要多次執(zhí)行func,則必須pool.apply_async()多次調(diào)用,或者使用映射池方法,例如
pool.map(func, iterable, chunksize=None)
...,它將一個(gè)函數(shù)映射到一個(gè)可迭代對(duì)象上。pool.apply_async是非阻塞的,這就是為什么它是“異步的”。它立即返回一個(gè)AsyncResult您可以(阻塞地)調(diào)用.wait()或調(diào)用.get()的對(duì)象。
通過(guò)評(píng)論很明顯,要沒(méi)完沒(méi)了并立即對(duì)已完成的任務(wù)(個(gè)體經(jīng)營(yíng)產(chǎn)生的輸入流)的替代品...和程序應(yīng)停止在一個(gè)KeyboardInterrupt或者當(dāng)結(jié)果不具有一定的價(jià)值。
您可以使用 -callback參數(shù)在任何舊apply_async任務(wù)完成后立即安排新任務(wù)。困難在于在 MainThread 的同時(shí)如何防止整個(gè)腳本過(guò)早結(jié)束,同時(shí)保持它對(duì) KeyboardInterrupt 的響應(yīng)。讓 MainThread 在循環(huán)中休眠使其仍然可以立即對(duì) KeyboardInterrupt 做出反應(yīng),同時(shí)防止提前退出。如果結(jié)果應(yīng)該停止程序,您可以讓回調(diào)終止池。然后 MainThread 只需要在他的睡眠循環(huán)中包含對(duì)池狀態(tài)的檢查。
import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool
def test_post(post_id):
time.sleep(randint(1, 3))
status_code = choice([200] * 9 + [404])
return "{} {} Message no.{}: {}".format(
datetime.now(), current_thread().name, post_id, status_code
), status_code
def handle_result(result):
msg, code = result
print(msg)
if code != 200:
print("terminating")
pool.terminate()
else:
pool.apply_async(
test_post, args=(next(post_cnt),), callback=handle_result
)
if __name__ == '__main__':
N_WORKERS = 4
post_cnt = count()
pool = ThreadPool(N_WORKERS)
# initial distribution
for _ in range(N_WORKERS):
pool.apply_async(
test_post, args=(next(post_cnt),), callback=handle_result
)
try:
while pool._state == 0: # check if pool is still alive
time.sleep(1)
except KeyboardInterrupt:
print(" got interrupt")
帶有鍵盤中斷的示例輸出:
$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt
由于不需要的返回值而終止的示例輸出:
$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating
請(qǐng)注意,在您的場(chǎng)景中,您還可以apply_async更頻繁地調(diào)用N_WORKERS-times 為您的初始分發(fā)提供一些緩沖區(qū)以減少延遲。
添加回答
舉報(bào)