第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

ForkJoin 應用示例

1. 前言

上一節(jié)我們學習了 ForkJoin 的基本概念和核心 API,本節(jié)帶領大家實現(xiàn)一個具體的應用案例。從實際應用中感受一下 ForkJoin 框架的使用,以及此框架帶來的便利。

本節(jié)先描述待實現(xiàn)的案例內容,接著做編碼實現(xiàn),然后總結使用過程中的注意事項。

2. 案例描述

我們在實際項目中,常常會碰到大數(shù)據(jù)集合的處理,如大文件、大表、內存中的大數(shù)據(jù)集合。由于數(shù)據(jù)量大,我們常常會考慮采用多線程的思路提高處理效率。
當待處理的數(shù)據(jù)集的每一部分的數(shù)據(jù)處理邏輯基本一致,且可以很好拆分成小的數(shù)據(jù)集進行處理時,使用 ForkJoin 進行處理比好合適。

我們還是采用 Executor 應用示例中的場景:需要對某個目錄下的所有文件(成百上千)進行加密并用文件的 MD5 串修改文件名稱。

在開始動手實現(xiàn)之前,我們先做一個簡單的分析。在這個案例中,我們將 “對文件進行加密、生成 MD5 串、修改文件名稱” 作為待執(zhí)行任務的內容。所有文件形成的列表就是我們待處理的數(shù)據(jù)范圍。為了校驗整個處理過程是否有文件遺漏,我們最終需要核對處理結果,所以我們用 FileForkJoinTask 類(繼承 RecursiveTask)封裝我們的任務邏輯。為了方便演示,下面編碼中部分數(shù)據(jù)采用了模擬的方式生成。

3. 編碼實現(xiàn)

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class ForkJoinTest {

    // 模擬待處理的文件列表
    private static int fileListSize = new Random().nextInt(15);
    private static String[] fileList = new String[fileListSize];
    static {
        for(int i=0; i<fileListSize; i++) {
            fileList[i] = "fileName" + i;
        }
    }

    // 主線程
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建用于處理任務的線程池
        // ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); 這種創(chuàng)建方式可最大化使用全局系統(tǒng)資源
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交待處理的總任務
        Future result = forkJoinPool.submit(new FileDealTask(0, fileListSize, fileList));
        // 獲取任務執(zhí)行結果
        System.out.println("預備處理的文件個數(shù)" + fileListSize + ",總共處理的文件個數(shù):" + result.get());
        // 關閉線程池
        forkJoinPool.shutdown();
    }
}

上面代碼注釋已經很清楚了,我們觀察下面的代碼,看看任務是怎么切分的,以及子任務的結果是怎么做匯總的。

import lombok.SneakyThrows;
import java.util.Random;
import java.util.concurrent.RecursiveTask;

public class FileDealTask extends RecursiveTask<Integer> {

    private String[] fileList;
    // 當子任務劃分到只需要處理最多10個文件時,停止分割任務
    private final int threshold = 2;
    private int first;
    private int last;

    public FileDealTask(int first, int last, String[] fileList) {
        this.fileList = fileList;
        this.first = first;
        this.last = last;
    }

    @SneakyThrows
    @Override
    protected Integer compute() {
        // 執(zhí)行結果
        int result = 0;

        // 任務足夠小則直接處理(對文件進行加密、生成MD5串、修改文件名稱)
        if (last - first <= threshold) {
            for (int i = first; i < last; i++) {
                result = result + 1;
                Thread.sleep(new Random().nextInt(2000));
                System.out.println(Thread.currentThread().getName() + ":文件" + fileList[i] + "已處理完畢");
            }
            System.out.println(Thread.currentThread().getName() + ":總共處理的文件數(shù) (" + first + "," + last + ")" + result);
        } else {
            // 拆分成小任務
            int middle = first + (last - first) / 2;
            // 創(chuàng)建兩個子任務
            FileDealTask leftTask = new FileDealTask(first, middle, fileList);
            FileDealTask rightTask = new FileDealTask(middle, last, fileList);
            // 觸發(fā)兩個子任務開始執(zhí)行
            invokeAll(leftTask, rightTask);
            // 等待兩個子任務執(zhí)行結果并返回
            result = leftTask.join() + rightTask.join();
            System.out.println(Thread.currentThread().getName() + ":當前任務繼續(xù)拆分 "
                    + " (" + first + "," + middle + "), (" + (middle) + "," + last + ")");
        }
        return result;
    }
}

我們通過在 IDE 中運行上面這個示例,看看實際的運行結果。
【補充視頻】

上面代碼邏輯中有隨機內容,每次運行結果會有差異,運行上面的代碼,我們觀察某次運行結果如下:

ForkJoinPool-1-worker-2:文件fileName3已處理完畢
ForkJoinPool-1-worker-2:總共處理的文件數(shù) (3,4)1
ForkJoinPool-1-worker-1:文件fileName0已處理完畢
ForkJoinPool-1-worker-1:總共處理的文件數(shù) (0,1)1
ForkJoinPool-1-worker-0:文件fileName4已處理完畢
ForkJoinPool-1-worker-3:文件fileName1已處理完畢
ForkJoinPool-1-worker-3:文件fileName2已處理完畢
ForkJoinPool-1-worker-3:總共處理的文件數(shù) (1,3)2
ForkJoinPool-1-worker-1:當前任務繼續(xù)拆分  (0,1), (1,3)
ForkJoinPool-1-worker-0:文件fileName5已處理完畢
ForkJoinPool-1-worker-0:總共處理的文件數(shù) (4,6)2
ForkJoinPool-1-worker-2:當前任務繼續(xù)拆分  (3,4), (4,6)
ForkJoinPool-1-worker-1:當前任務繼續(xù)拆分  (0,3), (3,6)
預備處理的文件個數(shù)6,總共處理的文件個數(shù):6

首先做了 (0,3)~(3,6),之后對 (0,3) 做了 (0,1), (1,3) 的拆分,對 (3,6) 做了 (3,4), (4,6) 的拆分。和我們的預期一致。

4. 注意事項

  1. ForkJoinPool 不是為了替代 ExecutorService,其主要用于實現(xiàn) “分而治之” 的算法,最適合處理計算密集型的任務。
  2. 做好評估權衡,當需要處理的數(shù)據(jù)量不是特別大時,沒有必要使用 ForkJoin。其底層使用多線程的方式處理任務,涉及到線程上下文的切換,當數(shù)據(jù)量不大的時候使用串行會比使用多線程快。
  3. 執(zhí)行子任務時候要注意,使用 invokeAll,不能分別對子任務調用 fork。

5. 視頻演示

6. 小結

本節(jié)通過一個實際例子的編碼實現(xiàn),展示了 ForkJoinPool 的具體用法。當然本節(jié)中的用法相對比較簡單,更多的用法還需要大家進一步學習,希望大家多思考勤練習,早日掌握之。