Python 生產(chǎn)者消費(fèi)者模型
1. 簡(jiǎn)介
生產(chǎn)者和消費(fèi)者問題是線程模型中的經(jīng)典問題:
- 生產(chǎn)者和消費(fèi)者共享同一個(gè)存儲(chǔ)空間
- 生產(chǎn)者往存儲(chǔ)空間中添加產(chǎn)品,消費(fèi)者從存儲(chǔ)空間中取走產(chǎn)品
- 當(dāng)存儲(chǔ)空間為空時(shí),消費(fèi)者阻塞,當(dāng)存儲(chǔ)空間滿時(shí),生產(chǎn)者阻塞
Python 的內(nèi)置模塊 queue 提供了對(duì)生產(chǎn)者和消費(fèi)者模型的支持,模塊 queue 定義了類 Queue,類 Queue 表示一個(gè)被生產(chǎn)者和消費(fèi)者共享的隊(duì)列,類 Queue 提供如下常用方法:
方法 | 功能 |
---|---|
get() | 從隊(duì)列中取走數(shù)據(jù),如果隊(duì)列為空,則阻塞 |
put(item) | 向隊(duì)列中放置數(shù)據(jù),如果隊(duì)列為慢,則阻塞 |
join() | 如果隊(duì)列不為空,則等待隊(duì)列變?yōu)榭?/td> |
task_done() | 消費(fèi)者從隊(duì)列中取走一項(xiàng)數(shù)據(jù),當(dāng)隊(duì)列變?yōu)榭諘r(shí),喚醒調(diào)用 join() 的線程 |
2. 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
創(chuàng)建生產(chǎn)者線程和消費(fèi)者線程,使用一個(gè)共享隊(duì)列連接這兩個(gè)線程,代碼如下:
import threading
import queue
q = queue.Queue()
- 導(dǎo)入 threading 模塊和 queue 模塊
- 創(chuàng)建共享隊(duì)列 q
def produce():
for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
q.put(item)
print('produce %s' % item)
- 創(chuàng)建生產(chǎn)者線程的入口函數(shù) produce
- 生產(chǎn)者生產(chǎn) 8 個(gè)數(shù)據(jù)
- 調(diào)用 q.put(item) 將生產(chǎn)的數(shù)據(jù)放入到共享隊(duì)列 q 中
def consume():
for i in range(8):
item = q.get()
print(' consume %s' % item)
- 創(chuàng)建消費(fèi)者線程的入口函數(shù) consume
- 消費(fèi)者消費(fèi) 8 個(gè)數(shù)據(jù)
- 調(diào)用 q.get() 從共享隊(duì)列 q 中取走數(shù)據(jù)
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
producer.join()
consumer.join()
- 創(chuàng)建生產(chǎn)者線程 producer,線程入口為 produce
- 創(chuàng)建消費(fèi)者線程 consumer,線程入口為 consume
- 啟動(dòng)生產(chǎn)者線程和消費(fèi)者線程,并等待它們結(jié)束
運(yùn)行程序,輸出結(jié)果如下:
produce a
produce b
consume a
produce c
consume b
consume c
produce d
consume d
produce e
consume e
produce f
consume f
produce g
consume g
produce h
consume h
- 生產(chǎn)者生產(chǎn)了 8 個(gè)數(shù)據(jù):a、b、c、d、e、f、g、h
- 消費(fèi)者取走了 8 個(gè)數(shù)據(jù):a、b、c、d、e、f、g、h
3. 實(shí)現(xiàn)生產(chǎn)者、計(jì)算者、消費(fèi)者模型
創(chuàng)建生產(chǎn)者、計(jì)算者、消費(fèi)者線程:
- 生產(chǎn)者生產(chǎn) 8 個(gè)數(shù)據(jù)
- 計(jì)算者對(duì)生產(chǎn)者輸出的數(shù)據(jù)進(jìn)行加工,將加工后的數(shù)據(jù)送往消費(fèi)者
- 消費(fèi)者取走計(jì)算者輸出的數(shù)據(jù)
import threading
import queue
q0 = queue.Queue()
q1 = queue.Queue()
- 導(dǎo)入模塊 threading 和模塊 queue
- 使用兩個(gè)共享隊(duì)列連接這三個(gè)線程
- 共享隊(duì)列 q0 連接生產(chǎn)者和計(jì)算者
- 共享隊(duì)列 q1 連接計(jì)算者和消費(fèi)者
def produce():
for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
q0.put(item)
print('produce %s' % item)
- 創(chuàng)建生產(chǎn)者線程的入口函數(shù) produce
- 生產(chǎn)者生產(chǎn) 8 個(gè)數(shù)據(jù)
- 調(diào)用 q0.put(item) 將生產(chǎn)的數(shù)據(jù)放入到共享隊(duì)列 q0 中
def compute():
for i in range(8):
item = q0.get()
item = item.upper()
q1.put(item)
- 創(chuàng)建計(jì)算者線程的入口函數(shù) compute
- 調(diào)用 q0.get() 讀取生產(chǎn)者輸出數(shù)據(jù),并進(jìn)行加工
- 調(diào)用 q1.put(item) 將加工后的數(shù)據(jù)放入到共享隊(duì)列 q1 中
def consume():
for i in range(8):
item = q1.get()
print(' consume %s' % item)
- 創(chuàng)建消費(fèi)者線程的入口函數(shù) consume
- 消費(fèi)者消費(fèi) 8 個(gè)數(shù)據(jù)
- 調(diào)用 q1.get() 從共享隊(duì)列 q1 中取走數(shù)據(jù)
producer = threading.Thread(target=produce, args=())
computer = threading.Thread(target=compute, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
computer.start()
consumer.start()
producer.join()
computer.join()
consumer.join()
- 創(chuàng)建生產(chǎn)者線程 producer,線程入口為 produce
- 創(chuàng)建計(jì)算者線程 computer,線程入口為 compute
- 創(chuàng)建消費(fèi)者線程 consumer,線程入口為 consume
- 啟動(dòng)生產(chǎn)者線程、計(jì)算者線程、消費(fèi)者線程,并等待它們結(jié)束
運(yùn)行程序,輸出結(jié)果如下:
produce a
produce b
produce c
consume A
produce d
produce e
consume B
produce f
consume C
produce g
consume D
produce h
consume E
consume F
consume G
consume H
- 生產(chǎn)者生產(chǎn)了 8 個(gè)數(shù)據(jù):a、b、c、d、e、f、g、h
- 計(jì)算者將數(shù)據(jù)加工為:A、B、C、D、E、F、G、H
- 消費(fèi)者取走了 8 個(gè)數(shù)據(jù):A、B、C、D、E、F、G、H
4. 同步生產(chǎn)者與消費(fèi)者的推進(jìn)速度
在生產(chǎn)者、消費(fèi)者模型中,可能會(huì)存在兩者推進(jìn)速度不匹配的問題:生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度較快,但是,消費(fèi)者取走數(shù)據(jù)的速度較慢。
可以使用 queue 的 task_done() 方法和 join() 方法同步生產(chǎn)者與消費(fèi)者的推進(jìn)速度:
- 生產(chǎn)者調(diào)用 join() 方法,等待隊(duì)列中所有的數(shù)據(jù)被取走
- 消費(fèi)者調(diào)用 task_done() 方法,表示取走了隊(duì)列中的一項(xiàng)數(shù)據(jù),當(dāng)隊(duì)列為空時(shí),喚醒阻塞在 join() 方法中的生產(chǎn)者
import threading
import queue
q = queue.Queue()
- 導(dǎo)入 threading 模塊和 queue 模塊
- 創(chuàng)建共享隊(duì)列 q
def produce():
for item in ['A', 'B', 'C', 'D']:
q.put(item)
print('produce %s' % item)
q.join()
print('------------ q is empty')
for item in ['E', 'F', 'G', 'H']:
q.put(item)
print('produce %s' % item)
q.join()
print('------------ q is empty')
- 創(chuàng)建生產(chǎn)者線程的入口函數(shù) produce
- 首先,生產(chǎn) 4 個(gè)數(shù)據(jù):A、B、C、D
- 調(diào)用 q.put(item) 將它們放入到隊(duì)列 q 中
- 調(diào)用 q.join() 等待消費(fèi)者將它們?nèi)咳∽?/li>
- 然后,生產(chǎn) 4 個(gè)數(shù)據(jù):E、F、G、G
- 調(diào)用 q.put(item) 將它們放入到隊(duì)列 q 中
- 調(diào)用 q.join() 等待消費(fèi)者將它們?nèi)咳∽?/li>
def consume():
for i in range(8):
item = q.get()
print(' consume %s' % item)
q.task_done()
- 創(chuàng)建消費(fèi)者線程的入口函數(shù) consume
- 調(diào)用 q.get() 從隊(duì)列 q 中取走一個(gè)數(shù)據(jù)
- 調(diào)用 q.task_done(),表示已經(jīng)從隊(duì)列 q 中取走了一個(gè)數(shù)據(jù),當(dāng)隊(duì)列為空時(shí),喚醒生產(chǎn)者
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
- 創(chuàng)建生產(chǎn)者線程 producer,線程入口為 produce
- 創(chuàng)建消費(fèi)者線程 consumer,線程入口為 consume
- 啟動(dòng)生產(chǎn)者線程和消費(fèi)者線程,并等待它們結(jié)束
運(yùn)行程序,輸出結(jié)果如下:
produce A
produce B
consume A
consume B
produce C
consume C
produce D
consume D
------------ q is empty
produce E
consume E
produce F
consume F
produce G
produce H
consume G
consume H
------------ q is empty
- 生產(chǎn)者生產(chǎn)第一批數(shù)據(jù) A、B、C、D,消費(fèi)者將其取走
- 當(dāng)?shù)谝慌鷶?shù)據(jù)完全被消費(fèi)者取走后,生產(chǎn)者才開始生產(chǎn)第二批數(shù)據(jù)
- 生產(chǎn)者生產(chǎn)第二批數(shù)據(jù) E、F、G、H,消費(fèi)者將其取走