1 回答

TA貢獻(xiàn)1936條經(jīng)驗(yàn) 獲得超7個(gè)贊
AnExecutorService
不會(huì)僅僅因?yàn)橐豁?xiàng)作業(yè)(及其子作業(yè))完成而自行終止。線程池背后的整個(gè)想法是可重用。
因此,只有當(dāng)應(yīng)用程序調(diào)用shutdown()
它時(shí),它才會(huì)終止。
您可以使用它isQuiescent()
來查找是否沒有待處理的作業(yè),這僅在所有提交的作業(yè)都屬于您的特定任務(wù)時(shí)才有效。使用返回的 future 來submit
檢查實(shí)際工作的完成情況要簡潔得多。
在任何一種情況下,排隊(duì)任務(wù)的完成狀態(tài)都不會(huì)說明您正在輪詢的隊(duì)列。當(dāng)您了解提交結(jié)束時(shí),您仍然需要檢查隊(duì)列中是否有未決元素。
此外,建議使用線程安全BlockingQueue
實(shí)現(xiàn)而不是裝飾LinkedList
withsynchronized
塊。加上一些需要清理的其他內(nèi)容,代碼將如下所示:
public class TestForkJoinPoolEnd {
private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
ForkJoinTask<?> future = customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
QUEUE.offer("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!future.isDone()) try {
String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);
if (s != null) {
System.out.println(s);
counter--;
}
} catch (InterruptedException e) {}
for(;;) {
String s = QUEUE.poll();
if (s == null) break;
System.out.println(s);
counter--;
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
customThreadPool.shutdown();
}
static List<String> makeList() {
return IntStream.range(0, MAX_SIZE)
.mapToObj(i -> makeString())
.collect(Collectors.toList());
}
static String makeString() {
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
return s.chars().sum() / SPEED_UP;
}
static void process(String s) {
long start = System.nanoTime();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.nanoTime();
QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");
}
}
如果您sleep在接收端的呼叫應(yīng)該模擬一些工作量而不是等待新項(xiàng)目,您也可以使用
int counter = MAX_SIZE + 1;
while (!future.isDone()) {
String s = QUEUE.poll();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {}
}
但邏輯沒有改變。future.isDone()返回后true,我們必須重新檢查隊(duì)列中待處理的元素。我們只保證不會(huì)有新的項(xiàng)目到達(dá),而不保證隊(duì)列已經(jīng)空了。
作為旁注,該makeString()方法可以進(jìn)一步改進(jìn)
static String makeString() {
int targetStringLength = 10;
ThreadLocalRandom random = ThreadLocalRandom.current();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('a', 'z' + 1);
buffer.append((char)randomLimitedInt);
}
return buffer.toString();
}
甚至
static String makeString() {
int targetStringLength = 10;
return ThreadLocalRandom.current()
.ints(targetStringLength, 'a', 'z'+1)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
添加回答
舉報(bào)