移相器 Phaser
1. 前言
本節(jié)帶領(lǐng)大家認(rèn)識(shí)第四個(gè)常用的 Java 并發(fā)工具類之 Phaser。
本節(jié)先介紹 Phaser 工具類表達(dá)的概念和最基本用法,接著通過一個(gè)生活中的例子為大家解釋 Phaser 工具類的使用場(chǎng)合,然后通過簡(jiǎn)單的編碼實(shí)現(xiàn)此場(chǎng)景,最后帶領(lǐng)大家熟悉 Phaser 工具類的其他重要方法。
下面我們正式開始介紹吧。
2. 概念解釋
Phaser 表示 “階段器”,一個(gè)可重用的同步 barrier,與 CyclicBarrier 相比,Phaser 更靈活,而且側(cè)重于 “重用”。Phaser 中允許 “注冊(cè)的同步者(parties)” 隨時(shí)間而變化。Phaser 可以通過構(gòu)造器初始化 parties 個(gè)數(shù),也可以在 Phaser 運(yùn)行期間隨時(shí)加入新的 parties,以及在運(yùn)行期間注銷 parties。
是不是又強(qiáng)大又抽象,沒關(guān)系,我們通過一張圖可以直白了解其提供的邏輯模型。
概念已經(jīng)了解了,Phaser 工具類最基本的用法是怎樣的呢?看下面。
3. 基本用法
// 創(chuàng)建一個(gè) Phaser 對(duì)象
Phaser phaser = new Phaser();
// 將線程 m 作為同步者之一進(jìn)行同步控制注冊(cè)
phaser.register();
// 線程 m 開始處理邏輯
...
// 線程 m 等待同一周期內(nèi),其他線程到達(dá),然后進(jìn)入新的周期,并繼續(xù)同步進(jìn)行
phaser.arriveAndAwaitAdvance();
...
// 線程 m 執(zhí)行完畢后做同步控制注銷
phaser.arriveAndDeregister();
這個(gè)工具類相對(duì)而言比較復(fù)雜,大家不要著急,結(jié)合后面的案例仔細(xì)體會(huì)。Phaser 應(yīng)用在哪些場(chǎng)合比較合適呢?下面我們給出最常用的場(chǎng)景說明。
4. 常用場(chǎng)景
Phaser 適合用于具有多階段處理的任務(wù),在每個(gè)階段有多個(gè)線程并行處理的場(chǎng)景。這樣描述很抽象,我們舉一個(gè)生活中的例子:有一個(gè)開發(fā)小組總共 4 個(gè)人,約定一起去旅游。計(jì)劃一起出發(fā),先去景點(diǎn) A 自有活動(dòng),3 個(gè)小時(shí)后去景點(diǎn) B 自有活動(dòng),2 個(gè)小時(shí)候后活動(dòng)結(jié)束統(tǒng)一集合。這個(gè)場(chǎng)景中 4 個(gè)人相當(dāng)于 4 個(gè)線程,分了 4 個(gè)階段完成了整個(gè)計(jì)劃。像類似這樣的場(chǎng)景很適合用 Phaser 解決。請(qǐng)看下面代碼。
5. 場(chǎng)景案例
public class PhaserTest {
// 先構(gòu)建一個(gè)階段器對(duì)象
private static TravelPhaser travelPhaser = new TravelPhaser();
// 主邏輯
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建 5 個(gè)線程代表每一位同事
for (int i = 1; i < 5; i++) {
// 對(duì)每一個(gè)需要同步控制的線程進(jìn)行同步控制注冊(cè)
travelPhaser.register();
// 模擬每一位同事開始旅游行動(dòng)
Thread thread = new Thread(new Colleague(travelPhaser), "同事" + i);
thread.start();
}
}
}
上述代碼在注冊(cè)好需要同步控制的所有線程之后,開啟了每一個(gè)線程(每位同事)的處理。每一個(gè)線程(每位同事)如何行動(dòng)呢,代碼如下:
import java.util.Random;
/**
* 模擬人以及旅游的各類狀態(tài)
*/
public class Colleague implements Runnable {
private TravelPhaser travelPhaser;
public Colleague(TravelPhaser travelPhaser) {
this.travelPhaser = travelPhaser;
}
/**
* 模擬每位同事的動(dòng)作
*/
@Override
public void run() {
doAnything();
System.out.println(Thread.currentThread().getName() + "到達(dá)出發(fā)集合地");
travelPhaser.arriveAndAwaitAdvance();
doAnything();
System.out.println(Thread.currentThread().getName() + "已經(jīng)在景點(diǎn) A 自由活動(dòng)結(jié)束");
travelPhaser.arriveAndAwaitAdvance();
doAnything();
System.out.println(Thread.currentThread().getName() + "已經(jīng)在景點(diǎn) B 自由活動(dòng)結(jié)束");
travelPhaser.arriveAndAwaitAdvance();
doAnything();
System.out.println(Thread.currentThread().getName() + "到達(dá)返程集合地");
travelPhaser.arriveAndAwaitAdvance();
}
/**
* 模擬用時(shí)
*/
private void doAnything() {
try {
Thread.sleep(new Random().nextInt(10000));
} catch (Exception e) {}
}
}
上述代碼模擬了每位同事的旅游過程。代碼中使用了 arriveAndAwaitAdvance () 進(jìn)行每個(gè)旅游階段的控制。我們?cè)俳又磳?duì)旅游各個(gè)階段的自定義控制:
import java.util.concurrent.Phaser;
/**
* 對(duì)每一個(gè)階段進(jìn)行自定義控制
*/
public class TravelPhaser extends Phaser {
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
// 第1階段,旅游前的集合
case 0:
System.out.println("出發(fā)前小組人員集合完畢,總?cè)藬?shù):"+getRegisteredParties());
return false;
// 第2階段,景點(diǎn) A 游玩
case 1:
System.out.println("景點(diǎn) A 游玩結(jié)束");
return false;
// 第3階段,景點(diǎn) B 游玩
case 2:
System.out.println("景點(diǎn) B 游玩結(jié)束");
return false;
// 第4階段,旅游結(jié)束返程集合
case 3:
System.out.println("所有活動(dòng)結(jié)束后小組人員集合完畢,總?cè)藬?shù):"+getRegisteredParties());
return true;
default:
return true;
}
}
}
上述代碼只是在各個(gè)階段打印了一些描述信息,實(shí)際中可以做更多的邏輯控制。運(yùn)行上面代碼,我們觀察一下運(yùn)行結(jié)果。
同事1到達(dá)出發(fā)集合地
同事4到達(dá)出發(fā)集合地
同事2到達(dá)出發(fā)集合地
同事3到達(dá)出發(fā)集合地
出發(fā)前小組人員集合完畢,總?cè)藬?shù):4
同事3已經(jīng)在景點(diǎn) A 自由活動(dòng)結(jié)束
同事2已經(jīng)在景點(diǎn) A 自由活動(dòng)結(jié)束
同事1已經(jīng)在景點(diǎn) A 自由活動(dòng)結(jié)束
同事4已經(jīng)在景點(diǎn) A 自由活動(dòng)結(jié)束
景點(diǎn) A 游玩結(jié)束
同事4已經(jīng)在景點(diǎn) B 自由活動(dòng)結(jié)束
同事2已經(jīng)在景點(diǎn) B 自由活動(dòng)結(jié)束
同事1已經(jīng)在景點(diǎn) B 自由活動(dòng)結(jié)束
同事3已經(jīng)在景點(diǎn) B 自由活動(dòng)結(jié)束
景點(diǎn) B 游玩結(jié)束
同事2到達(dá)返程集合地
同事3到達(dá)返程集合地
同事1到達(dá)返程集合地
同事4到達(dá)返程集合地
所有活動(dòng)結(jié)束后小組人員集合完畢,總?cè)藬?shù):4
觀察結(jié)果,和我們的預(yù)期一致。注意體會(huì) Phaser 提供的多線程共同協(xié)作的模型。
6. 其他方法介紹
除過上面代碼中使用的最基本的 register ()、arriveAndAwaitAdvance ()、arriveAndDeregister ()、getRegisteredParties () 方法之外,還有下面幾個(gè)方法大家可以了解一下。
-
awaitAdvance (int phase) 方法。
具有阻塞功能,等待 phase 周期數(shù)下其他所有的 parties 都到達(dá)后返回。如果指定的 phase 與當(dāng)前的 phase 不一致,則立即返回。 -
awaitAdvanceInterruptibly (int phase) 方法。
同 awaitAdvance 類似,但支持中斷響應(yīng),即 waiter 線程如果被外部中斷,則此方法立即返回。 -
forceTermination () 方法。
用于強(qiáng)制終止 phase,此后 Phaser 對(duì)象將不可用,即 register 等將不再有效。
7. 小結(jié)
本節(jié)介紹了 Phaser 基本概念原理,并且通過一個(gè)簡(jiǎn)單的例子,介紹了其使用場(chǎng)景和基本用法。希望大家在學(xué)習(xí)過程中,多思考勤練習(xí),早日掌握之。