生產(chǎn)者與消費(fèi)者案例
1. 前言
本節(jié)內(nèi)容是通過之前學(xué)習(xí)的 synchronized 關(guān)鍵字,實(shí)現(xiàn)多線程并發(fā)編程中最經(jīng)典的生產(chǎn)者與消費(fèi)者模式,這是本節(jié)課程的核心內(nèi)容,所有的知識(shí)點(diǎn)都是圍繞這一經(jīng)典模型展開的。本節(jié)有如下知識(shí)點(diǎn):
- 生產(chǎn)者與消費(fèi)者模型介紹,這是打開本節(jié)知識(shí)大門的鑰匙,也是本節(jié)內(nèi)容的基礎(chǔ);
- 了解生產(chǎn)者與消費(fèi)者案例實(shí)現(xiàn)的三種方式,我們本節(jié)以 synchronized 關(guān)鍵字聯(lián)合 wait/notify 機(jī)制進(jìn)行實(shí)現(xiàn);
- wait 方法和 notify 方法介紹,這是我們實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者案例的技術(shù)基礎(chǔ);
- 生產(chǎn)者與消費(fèi)者案例代碼實(shí)現(xiàn),這是我們本節(jié)內(nèi)容的核心,一定要對(duì)此知識(shí)點(diǎn)進(jìn)行深入的學(xué)習(xí)和掌握。
2. 生產(chǎn)者與消費(fèi)者模型介紹
定義: 生產(chǎn)者消費(fèi)者模式是一個(gè)十分經(jīng)典的多線程并發(fā)協(xié)作的模式。
意義:弄懂生產(chǎn)者消費(fèi)者問題能夠讓我們對(duì)并發(fā)編程的理解加深。
介紹:所謂生產(chǎn)者 - 消費(fèi)者問題,實(shí)際上主要是包含了兩類線程,一種是生產(chǎn)者線程用于生產(chǎn)數(shù)據(jù),另一種是消費(fèi)者線程用于消費(fèi)數(shù)據(jù),為了解耦生產(chǎn)者和消費(fèi)者的關(guān)系,通常會(huì)采用共享的數(shù)據(jù)區(qū)域。
共享的數(shù)據(jù)區(qū)域就像是一個(gè)倉(cāng)庫(kù),生產(chǎn)者生產(chǎn)數(shù)據(jù)之后直接放置在共享數(shù)據(jù)區(qū)中,并不需要關(guān)心消費(fèi)者的行為。而消費(fèi)者只需要從共享數(shù)據(jù)區(qū)中去獲取數(shù)據(jù),就不再需要關(guān)心生產(chǎn)者的行為。
3. 生產(chǎn)者與消費(fèi)者三種實(shí)現(xiàn)方式
在實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題時(shí),可以采用三種方式:
- 使用 Object 的 wait/notify 的消息通知機(jī)制,本節(jié)課程我們采用該方式結(jié)合 synchronized 關(guān)鍵字進(jìn)行生產(chǎn)者與消費(fèi)者模式的實(shí)現(xiàn);
- 使用 Lock 的 Condition 的 await/signal 的消息通知機(jī)制;
- 使用 BlockingQueue 實(shí)現(xiàn)。本文主要將這三種實(shí)現(xiàn)方式進(jìn)行總結(jié)歸納。
4. wait 與 notify
Java 中,可以通過配合調(diào)用 Object 對(duì)象的 wait () 方法和 notify () 方法或 notifyAll () 方法來(lái)實(shí)現(xiàn)線程間的通信。
wait 方法:我們之前對(duì) wait 方法有了基礎(chǔ)的了解,在線程中調(diào)用 wait () 方法,將阻塞當(dāng)前線程,并且釋放鎖,直至等到其他線程調(diào)用了調(diào)用 notify () 方法或 notifyAll () 方法進(jìn)行通知之后,當(dāng)前線程才能從 wait () 方法出返回,繼續(xù)執(zhí)行下面的操作。
notify 方法:即喚醒,notify 方法使原來(lái)在該對(duì)象上 wait 的線程退出 waiting 狀態(tài),使得該線程從等待隊(duì)列中移入到同步隊(duì)列中去,等待下一次能夠有機(jī)會(huì)獲取到對(duì)象監(jiān)視器鎖。
notifyAll 方法:即喚醒全部 waiting 線程,與 notify 方法在效果上一致。
5. 生產(chǎn)者與消費(fèi)者案例
為了更好地理解并掌握生產(chǎn)者與消費(fèi)者模式的實(shí)現(xiàn),我們先來(lái)進(jìn)行場(chǎng)景設(shè)計(jì),然后再通過實(shí)例代碼進(jìn)行實(shí)現(xiàn)并觀察運(yùn)行結(jié)果。
場(chǎng)景設(shè)計(jì):
- 創(chuàng)建一個(gè)工廠類 ProductFactory,該類包含兩個(gè)方法,produce 生產(chǎn)方法和 consume 消費(fèi)方法;
- 對(duì)于 produce 方法,當(dāng)沒有庫(kù)存或者庫(kù)存達(dá)到 10 時(shí),停止生產(chǎn)。為了更便于觀察結(jié)果,每生產(chǎn)一個(gè)產(chǎn)品,sleep 5000 毫秒;
- 對(duì)于 consume 方法,只要有庫(kù)存就進(jìn)行消費(fèi)。為了更便于觀察結(jié)果,每消費(fèi)一個(gè)產(chǎn)品,sleep 5000 毫秒;
- 庫(kù)存使用 LinkedList 進(jìn)行實(shí)現(xiàn),此時(shí) LinkedList 即共享數(shù)據(jù)內(nèi)存;
- 創(chuàng)建一個(gè) Producer 生產(chǎn)者類,用于調(diào)用 ProductFactory 的 produce 方法。生產(chǎn)過程中,要對(duì)每個(gè)產(chǎn)品從 0 開始進(jìn)行編號(hào);
- 創(chuàng)建一個(gè) Consumer 消費(fèi)者類,用于調(diào)用 ProductFactory 的 consume 方法;
- 創(chuàng)建一個(gè)測(cè)試類,main 函數(shù)中創(chuàng)建 2 個(gè)生產(chǎn)者和 3 個(gè)消費(fèi)者,運(yùn)行程序進(jìn)行結(jié)果觀察。
實(shí)例:創(chuàng)建一個(gè)工廠類 ProductFactory
class ProductFactory {
private LinkedList<String> products; //根據(jù)需求定義庫(kù)存,用 LinkedList 實(shí)現(xiàn)
private int capacity = 10; // 根據(jù)需求:定義最大庫(kù)存 10
public ProductFactory() {
products = new LinkedList<String>();
}
// 根據(jù)需求:produce 方法創(chuàng)建
public synchronized void produce(String product) {
while (capacity == products.size()) { //根據(jù)需求:如果達(dá)到 10 庫(kù)存,停止生產(chǎn)
try {
System.out.println("警告:線程("+Thread.currentThread().getName() + ")準(zhǔn)備生產(chǎn)產(chǎn)品,但產(chǎn)品池已滿");
wait(); // 庫(kù)存達(dá)到 10 ,生產(chǎn)線程進(jìn)入 wait 狀態(tài)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(product); //如果沒有到 10 庫(kù)存,進(jìn)行產(chǎn)品添加
try {
Thread.sleep(5000); //根據(jù)需求為了便于觀察結(jié)果,每生產(chǎn)一個(gè)產(chǎn)品,sleep 5000 ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程("+Thread.currentThread().getName() + ")生產(chǎn)了一件產(chǎn)品:" + product+";當(dāng)前剩余商品"+products.size()+"個(gè)");
notify(); //生產(chǎn)了產(chǎn)品,通知消費(fèi)者線程從 wait 狀態(tài)喚醒,進(jìn)行消費(fèi)
}
// 根據(jù)需求:consume 方法創(chuàng)建
public synchronized String consume() {
while (products.size()==0) { //根據(jù)需求:沒有庫(kù)存消費(fèi)者進(jìn)入wait狀態(tài)
try {
System.out.println("警告:線程("+Thread.currentThread().getName() + ")準(zhǔn)備消費(fèi)產(chǎn)品,但當(dāng)前沒有產(chǎn)品");
wait(); //庫(kù)存為 0 ,無(wú)法消費(fèi),進(jìn)入 wait ,等待生產(chǎn)者線程喚醒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String product = products.remove(0) ; //如果有庫(kù)存則消費(fèi),并移除消費(fèi)掉的產(chǎn)品
try {
Thread.sleep(5000);//根據(jù)需求為了便于觀察結(jié)果,每消費(fèi)一個(gè)產(chǎn)品,sleep 5000 ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程("+Thread.currentThread().getName() + ")消費(fèi)了一件產(chǎn)品:" + product+";當(dāng)前剩余商品"+products.size()+"個(gè)");
notify();// 通知生產(chǎn)者繼續(xù)生產(chǎn)
return product;
}
}
實(shí)例:Producer 生產(chǎn)者類創(chuàng)建
class Producer implements Runnable {
private ProductFactory productFactory; //關(guān)聯(lián)工廠類,調(diào)用 produce 方法
public Producer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
int i = 0 ; // 根據(jù)需求,對(duì)產(chǎn)品進(jìn)行編號(hào)
while (true) {
productFactory.produce(String.valueOf(i)); //根據(jù)需求 ,調(diào)用 productFactory 的 produce 方法
i++;
}
}
}
實(shí)例:Consumer 消費(fèi)者類創(chuàng)建
class Consumer implements Runnable {
private ProductFactory productFactory;
public Consumer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
while (true) {
productFactory.consume();
}
}
}
實(shí)例: 創(chuàng)建測(cè)試類,2 個(gè)生產(chǎn)者,3 個(gè)消費(fèi)者
public class DemoTest extends Thread{
public static void main(String[] args) {
ProductFactory productFactory = new ProductFactory();
new Thread(new Producer(productFactory),"1號(hào)生產(chǎn)者"). start();
new Thread(new Producer(productFactory),"2號(hào)生產(chǎn)者"). start();
new Thread(new Consumer(productFactory),"1號(hào)消費(fèi)者"). start();
new Thread(new Consumer(productFactory),"2號(hào)消費(fèi)者"). start();
new Thread(new Consumer(productFactory),"3號(hào)消費(fèi)者"). start();
}
}
結(jié)果驗(yàn)證:
線程(1號(hào)生產(chǎn)者)生產(chǎn)了一件產(chǎn)品:0;當(dāng)前剩余商品1個(gè)
線程(3號(hào)消費(fèi)者)消費(fèi)了一件產(chǎn)品:0;當(dāng)前剩余商品0個(gè)
警告:線程(2號(hào)消費(fèi)者)準(zhǔn)備消費(fèi)產(chǎn)品,但當(dāng)前沒有產(chǎn)品
警告:線程(1號(hào)消費(fèi)者)準(zhǔn)備消費(fèi)產(chǎn)品,但當(dāng)前沒有產(chǎn)品
線程(2號(hào)生產(chǎn)者)生產(chǎn)了一件產(chǎn)品:0;當(dāng)前剩余商品1個(gè)
線程(2號(hào)消費(fèi)者)消費(fèi)了一件產(chǎn)品:0;當(dāng)前剩余商品0個(gè)
警告:線程(1號(hào)消費(fèi)者)準(zhǔn)備消費(fèi)產(chǎn)品,但當(dāng)前沒有產(chǎn)品
線程(2號(hào)生產(chǎn)者)生產(chǎn)了一件產(chǎn)品:1;當(dāng)前剩余商品1個(gè)
線程(3號(hào)消費(fèi)者)消費(fèi)了一件產(chǎn)品:1;當(dāng)前剩余商品0個(gè)
線程(1號(hào)生產(chǎn)者)生產(chǎn)了一件產(chǎn)品:1;當(dāng)前剩余商品1個(gè)
線程(3號(hào)消費(fèi)者)消費(fèi)了一件產(chǎn)品:1;當(dāng)前剩余商品0個(gè)
線程(2號(hào)生產(chǎn)者)生產(chǎn)了一件產(chǎn)品:2;當(dāng)前剩余商品1個(gè)
線程(1號(hào)消費(fèi)者)消費(fèi)了一件產(chǎn)品:2;當(dāng)前剩余商品0個(gè)
警告:線程(2號(hào)消費(fèi)者)準(zhǔn)備消費(fèi)產(chǎn)品,但當(dāng)前沒有產(chǎn)品
線程(2號(hào)生產(chǎn)者)生產(chǎn)了一件產(chǎn)品:3;當(dāng)前剩余商品1個(gè)
...
...
結(jié)果分析:
從結(jié)果來(lái)看,生產(chǎn)者線程和消費(fèi)者線程合作無(wú)間,當(dāng)沒有產(chǎn)品時(shí),消費(fèi)者線程進(jìn)入等待;當(dāng)產(chǎn)品達(dá)到 10 個(gè)最大庫(kù)存是,生產(chǎn)者進(jìn)入等待。這就是經(jīng)典的生產(chǎn)者 - 消費(fèi)者模型。
6. 小結(jié)
實(shí)現(xiàn)多線程并發(fā)編程中最經(jīng)典的生產(chǎn)者與消費(fèi)者模式,這是本節(jié)課程的核心內(nèi)容,所有的知識(shí)點(diǎn)都是圍繞這一經(jīng)典模型展開的。 在掌握 synchronized 關(guān)鍵字,wait 方法和 notify 方法的基礎(chǔ)上,理解并掌握生產(chǎn)者與消費(fèi)者模式是本節(jié)課程的最終目標(biāo)。