ForkJoin 應用示例
1. 前言
上一節(jié)我們學習了 ForkJoin 的基本概念和核心 API,本節(jié)帶領大家實現(xiàn)一個具體的應用案例。從實際應用中感受一下 ForkJoin 框架的使用,以及此框架帶來的便利。
本節(jié)先描述待實現(xiàn)的案例內容,接著做編碼實現(xiàn),然后總結使用過程中的注意事項。
2. 案例描述
我們在實際項目中,常常會碰到大數(shù)據(jù)集合的處理,如大文件、大表、內存中的大數(shù)據(jù)集合。由于數(shù)據(jù)量大,我們常常會考慮采用多線程的思路提高處理效率。
當待處理的數(shù)據(jù)集的每一部分的數(shù)據(jù)處理邏輯基本一致,且可以很好拆分成小的數(shù)據(jù)集進行處理時,使用 ForkJoin 進行處理比好合適。
我們還是采用 Executor 應用示例中的場景:需要對某個目錄下的所有文件(成百上千)進行加密并用文件的 MD5 串修改文件名稱。
在開始動手實現(xiàn)之前,我們先做一個簡單的分析。在這個案例中,我們將 “對文件進行加密、生成 MD5 串、修改文件名稱” 作為待執(zhí)行任務的內容。所有文件形成的列表就是我們待處理的數(shù)據(jù)范圍。為了校驗整個處理過程是否有文件遺漏,我們最終需要核對處理結果,所以我們用 FileForkJoinTask 類(繼承 RecursiveTask)封裝我們的任務邏輯。為了方便演示,下面編碼中部分數(shù)據(jù)采用了模擬的方式生成。
3. 編碼實現(xiàn)
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
public class ForkJoinTest {
// 模擬待處理的文件列表
private static int fileListSize = new Random().nextInt(15);
private static String[] fileList = new String[fileListSize];
static {
for(int i=0; i<fileListSize; i++) {
fileList[i] = "fileName" + i;
}
}
// 主線程
public static void main(String[] args) throws Exception {
// 創(chuàng)建用于處理任務的線程池
// ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); 這種創(chuàng)建方式可最大化使用全局系統(tǒng)資源
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交待處理的總任務
Future result = forkJoinPool.submit(new FileDealTask(0, fileListSize, fileList));
// 獲取任務執(zhí)行結果
System.out.println("預備處理的文件個數(shù)" + fileListSize + ",總共處理的文件個數(shù):" + result.get());
// 關閉線程池
forkJoinPool.shutdown();
}
}
上面代碼注釋已經很清楚了,我們觀察下面的代碼,看看任務是怎么切分的,以及子任務的結果是怎么做匯總的。
import lombok.SneakyThrows;
import java.util.Random;
import java.util.concurrent.RecursiveTask;
public class FileDealTask extends RecursiveTask<Integer> {
private String[] fileList;
// 當子任務劃分到只需要處理最多10個文件時,停止分割任務
private final int threshold = 2;
private int first;
private int last;
public FileDealTask(int first, int last, String[] fileList) {
this.fileList = fileList;
this.first = first;
this.last = last;
}
@SneakyThrows
@Override
protected Integer compute() {
// 執(zhí)行結果
int result = 0;
// 任務足夠小則直接處理(對文件進行加密、生成MD5串、修改文件名稱)
if (last - first <= threshold) {
for (int i = first; i < last; i++) {
result = result + 1;
Thread.sleep(new Random().nextInt(2000));
System.out.println(Thread.currentThread().getName() + ":文件" + fileList[i] + "已處理完畢");
}
System.out.println(Thread.currentThread().getName() + ":總共處理的文件數(shù) (" + first + "," + last + ")" + result);
} else {
// 拆分成小任務
int middle = first + (last - first) / 2;
// 創(chuàng)建兩個子任務
FileDealTask leftTask = new FileDealTask(first, middle, fileList);
FileDealTask rightTask = new FileDealTask(middle, last, fileList);
// 觸發(fā)兩個子任務開始執(zhí)行
invokeAll(leftTask, rightTask);
// 等待兩個子任務執(zhí)行結果并返回
result = leftTask.join() + rightTask.join();
System.out.println(Thread.currentThread().getName() + ":當前任務繼續(xù)拆分 "
+ " (" + first + "," + middle + "), (" + (middle) + "," + last + ")");
}
return result;
}
}
我們通過在 IDE 中運行上面這個示例,看看實際的運行結果。
【補充視頻】
上面代碼邏輯中有隨機內容,每次運行結果會有差異,運行上面的代碼,我們觀察某次運行結果如下:
ForkJoinPool-1-worker-2:文件fileName3已處理完畢
ForkJoinPool-1-worker-2:總共處理的文件數(shù) (3,4)1
ForkJoinPool-1-worker-1:文件fileName0已處理完畢
ForkJoinPool-1-worker-1:總共處理的文件數(shù) (0,1)1
ForkJoinPool-1-worker-0:文件fileName4已處理完畢
ForkJoinPool-1-worker-3:文件fileName1已處理完畢
ForkJoinPool-1-worker-3:文件fileName2已處理完畢
ForkJoinPool-1-worker-3:總共處理的文件數(shù) (1,3)2
ForkJoinPool-1-worker-1:當前任務繼續(xù)拆分 (0,1), (1,3)
ForkJoinPool-1-worker-0:文件fileName5已處理完畢
ForkJoinPool-1-worker-0:總共處理的文件數(shù) (4,6)2
ForkJoinPool-1-worker-2:當前任務繼續(xù)拆分 (3,4), (4,6)
ForkJoinPool-1-worker-1:當前任務繼續(xù)拆分 (0,3), (3,6)
預備處理的文件個數(shù)6,總共處理的文件個數(shù):6
首先做了 (0,3)~(3,6),之后對 (0,3) 做了 (0,1), (1,3) 的拆分,對 (3,6) 做了 (3,4), (4,6) 的拆分。和我們的預期一致。
4. 注意事項
- ForkJoinPool 不是為了替代 ExecutorService,其主要用于實現(xiàn) “分而治之” 的算法,最適合處理計算密集型的任務。
- 做好評估權衡,當需要處理的數(shù)據(jù)量不是特別大時,沒有必要使用 ForkJoin。其底層使用多線程的方式處理任務,涉及到線程上下文的切換,當數(shù)據(jù)量不大的時候使用串行會比使用多線程快。
- 執(zhí)行子任務時候要注意,使用 invokeAll,不能分別對子任務調用 fork。
5. 視頻演示
6. 小結
本節(jié)通過一個實際例子的編碼實現(xiàn),展示了 ForkJoinPool 的具體用法。當然本節(jié)中的用法相對比較簡單,更多的用法還需要大家進一步學習,希望大家多思考勤練習,早日掌握之。