ForkJoin 應(yīng)用示例
1. 前言
上一節(jié)我們學(xué)習(xí)了 ForkJoin 的基本概念和核心 API,本節(jié)帶領(lǐng)大家實(shí)現(xiàn)一個(gè)具體的應(yīng)用案例。從實(shí)際應(yīng)用中感受一下 ForkJoin 框架的使用,以及此框架帶來的便利。
本節(jié)先描述待實(shí)現(xiàn)的案例內(nèi)容,接著做編碼實(shí)現(xiàn),然后總結(jié)使用過程中的注意事項(xiàng)。
2. 案例描述
我們?cè)趯?shí)際項(xiàng)目中,常常會(huì)碰到大數(shù)據(jù)集合的處理,如大文件、大表、內(nèi)存中的大數(shù)據(jù)集合。由于數(shù)據(jù)量大,我們常常會(huì)考慮采用多線程的思路提高處理效率。
當(dāng)待處理的數(shù)據(jù)集的每一部分的數(shù)據(jù)處理邏輯基本一致,且可以很好拆分成小的數(shù)據(jù)集進(jìn)行處理時(shí),使用 ForkJoin 進(jìn)行處理比好合適。
我們還是采用 Executor 應(yīng)用示例中的場(chǎng)景:需要對(duì)某個(gè)目錄下的所有文件(成百上千)進(jìn)行加密并用文件的 MD5 串修改文件名稱。
在開始動(dòng)手實(shí)現(xiàn)之前,我們先做一個(gè)簡(jiǎn)單的分析。在這個(gè)案例中,我們將 “對(duì)文件進(jìn)行加密、生成 MD5 串、修改文件名稱” 作為待執(zhí)行任務(wù)的內(nèi)容。所有文件形成的列表就是我們待處理的數(shù)據(jù)范圍。為了校驗(yàn)整個(gè)處理過程是否有文件遺漏,我們最終需要核對(duì)處理結(jié)果,所以我們用 FileForkJoinTask 類(繼承 RecursiveTask)封裝我們的任務(wù)邏輯。為了方便演示,下面編碼中部分?jǐn)?shù)據(jù)采用了模擬的方式生成。
3. 編碼實(shí)現(xiàn)
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
public class ForkJoinTest {
// 模擬待處理的文件列表
private static int fileListSize = new Random().nextInt(15);
private static String[] fileList = new String[fileListSize];
static {
for(int i=0; i<fileListSize; i++) {
fileList[i] = "fileName" + i;
}
}
// 主線程
public static void main(String[] args) throws Exception {
// 創(chuàng)建用于處理任務(wù)的線程池
// ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); 這種創(chuàng)建方式可最大化使用全局系統(tǒng)資源
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交待處理的總?cè)蝿?wù)
Future result = forkJoinPool.submit(new FileDealTask(0, fileListSize, fileList));
// 獲取任務(wù)執(zhí)行結(jié)果
System.out.println("預(yù)備處理的文件個(gè)數(shù)" + fileListSize + ",總共處理的文件個(gè)數(shù):" + result.get());
// 關(guān)閉線程池
forkJoinPool.shutdown();
}
}
上面代碼注釋已經(jīng)很清楚了,我們觀察下面的代碼,看看任務(wù)是怎么切分的,以及子任務(wù)的結(jié)果是怎么做匯總的。
import lombok.SneakyThrows;
import java.util.Random;
import java.util.concurrent.RecursiveTask;
public class FileDealTask extends RecursiveTask<Integer> {
private String[] fileList;
// 當(dāng)子任務(wù)劃分到只需要處理最多10個(gè)文件時(shí),停止分割任務(wù)
private final int threshold = 2;
private int first;
private int last;
public FileDealTask(int first, int last, String[] fileList) {
this.fileList = fileList;
this.first = first;
this.last = last;
}
@SneakyThrows
@Override
protected Integer compute() {
// 執(zhí)行結(jié)果
int result = 0;
// 任務(wù)足夠小則直接處理(對(duì)文件進(jìn)行加密、生成MD5串、修改文件名稱)
if (last - first <= threshold) {
for (int i = first; i < last; i++) {
result = result + 1;
Thread.sleep(new Random().nextInt(2000));
System.out.println(Thread.currentThread().getName() + ":文件" + fileList[i] + "已處理完畢");
}
System.out.println(Thread.currentThread().getName() + ":總共處理的文件數(shù) (" + first + "," + last + ")" + result);
} else {
// 拆分成小任務(wù)
int middle = first + (last - first) / 2;
// 創(chuàng)建兩個(gè)子任務(wù)
FileDealTask leftTask = new FileDealTask(first, middle, fileList);
FileDealTask rightTask = new FileDealTask(middle, last, fileList);
// 觸發(fā)兩個(gè)子任務(wù)開始執(zhí)行
invokeAll(leftTask, rightTask);
// 等待兩個(gè)子任務(wù)執(zhí)行結(jié)果并返回
result = leftTask.join() + rightTask.join();
System.out.println(Thread.currentThread().getName() + ":當(dāng)前任務(wù)繼續(xù)拆分 "
+ " (" + first + "," + middle + "), (" + (middle) + "," + last + ")");
}
return result;
}
}
我們通過在 IDE 中運(yùn)行上面這個(gè)示例,看看實(shí)際的運(yùn)行結(jié)果。
【補(bǔ)充視頻】
上面代碼邏輯中有隨機(jī)內(nèi)容,每次運(yùn)行結(jié)果會(huì)有差異,運(yùn)行上面的代碼,我們觀察某次運(yùn)行結(jié)果如下:
ForkJoinPool-1-worker-2:文件fileName3已處理完畢
ForkJoinPool-1-worker-2:總共處理的文件數(shù) (3,4)1
ForkJoinPool-1-worker-1:文件fileName0已處理完畢
ForkJoinPool-1-worker-1:總共處理的文件數(shù) (0,1)1
ForkJoinPool-1-worker-0:文件fileName4已處理完畢
ForkJoinPool-1-worker-3:文件fileName1已處理完畢
ForkJoinPool-1-worker-3:文件fileName2已處理完畢
ForkJoinPool-1-worker-3:總共處理的文件數(shù) (1,3)2
ForkJoinPool-1-worker-1:當(dāng)前任務(wù)繼續(xù)拆分 (0,1), (1,3)
ForkJoinPool-1-worker-0:文件fileName5已處理完畢
ForkJoinPool-1-worker-0:總共處理的文件數(shù) (4,6)2
ForkJoinPool-1-worker-2:當(dāng)前任務(wù)繼續(xù)拆分 (3,4), (4,6)
ForkJoinPool-1-worker-1:當(dāng)前任務(wù)繼續(xù)拆分 (0,3), (3,6)
預(yù)備處理的文件個(gè)數(shù)6,總共處理的文件個(gè)數(shù):6
首先做了 (0,3)~(3,6),之后對(duì) (0,3) 做了 (0,1), (1,3) 的拆分,對(duì) (3,6) 做了 (3,4), (4,6) 的拆分。和我們的預(yù)期一致。
4. 注意事項(xiàng)
- ForkJoinPool 不是為了替代 ExecutorService,其主要用于實(shí)現(xiàn) “分而治之” 的算法,最適合處理計(jì)算密集型的任務(wù)。
- 做好評(píng)估權(quán)衡,當(dāng)需要處理的數(shù)據(jù)量不是特別大時(shí),沒有必要使用 ForkJoin。其底層使用多線程的方式處理任務(wù),涉及到線程上下文的切換,當(dāng)數(shù)據(jù)量不大的時(shí)候使用串行會(huì)比使用多線程快。
- 執(zhí)行子任務(wù)時(shí)候要注意,使用 invokeAll,不能分別對(duì)子任務(wù)調(diào)用 fork。
5. 視頻演示
6. 小結(jié)
本節(jié)通過一個(gè)實(shí)際例子的編碼實(shí)現(xiàn),展示了 ForkJoinPool 的具體用法。當(dāng)然本節(jié)中的用法相對(duì)比較簡(jiǎn)單,更多的用法還需要大家進(jìn)一步學(xué)習(xí),希望大家多思考勤練習(xí),早日掌握之。