阿昌浅谈ForkJoin
阿昌 Java小菜鸡
# 阿昌浅谈ForkJoin

分治算法思想

Hi! 阿昌我又来了。

这次要聊ForkJoin,那必然要说一说分治算法思想

简单来说就说一句话分而治之,将一个任务拆分成一个一个小任务,如果小任务还是很大,就再继续拆分,直到能够处理。

正文

ForkJoin是Java1.7时引入的多线程框架。

它可以进行特殊的任务:分治任务(把一个大任务拆分成多个小任务并行执行)

举个例子

现在要计算一个大型数组的乘积,最简单的方式时,用一个遍历For循环在一个线程内完成:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

还有一个方式,把数组拆成两个部分,分开计算,最后加起来就是最终结果,用两个线程并行执行:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

如果拆开的两部分还是很大,那么就继续拆,用4个线程并行执行:

┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘

Fork/Join任务的原理:判断一个任务是否足够的小,如果是,直接计算,否则,就分拆成几个小任务分别计算。

这个过程可以反复“裂变”成一系列小任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class Scratch {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建1000个随机数组成的数组:
long startTime1 = System.currentTimeMillis();
long[] array = new long[1000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
static Random random = new Random(0);

static long random() {
return random.nextInt(10000);
}
}

class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500;
long[] array;
int start;
int end;

SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算:
long sum = 0;
for (int i = start; i < end; i++) {
sum *= this.array[i];
// 故意放慢计算速度:
try {Thread.sleep(3);} catch (InterruptedException ignored) {}
System.out.println(i);
}
return sum;
}
// 任务太大,一分为二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}

观察上述代码的执行过程,一个大的计算任务01000首先分裂为两个小任务01000和10002000,这两个小任务仍然太大,继续分裂为更小的0500,5001000,10001500,1500~2000,最后,计算结果被依次合并,得到最终结果。

核心代码SumTask继承自RecursiveTask,在compute()方法中,关键是如何“分裂”出子任务并且提交子任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class SumTask extends RecursiveTask<Long> {
protected Long compute() {
// “分裂”子任务:
SumTask subtask1 = new SumTask(...);
SumTask subtask2 = new SumTask(...);
// invokeAll会并行运行两个子任务:
invokeAll(subtask1, subtask2);
// 获得子任务的结果:
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
// 汇总结果:
return subresult1 + subresult2;
}
}

主要执行方法

  • submit

    异步开始执行任务同时返回这个任务,并非直接结果;

    这个过程中没有join,所以不会直到任务完成,当获取ForkJoinTask任务对象引用后,.get()可获取到直接结果

    1
    ForkJoinTask<Long> submit = ForkJoinPool.commonPool().submit(task);
  • invoke

    异步开始执行任务,直接返回直接结果,这个过程中有join直到任务完成

    1
    Long result = ForkJoinPool.commonPool().invoke(task);
  • execute

    异步执行这个任务,但是没有任何返回,这个过程中没有join,所以不会直到任务完成

    1
    ForkJoinPool.commonPool().execute(task);

代码涉及应用

  • Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
  • CompletableFuture中也使用到了ForkJoin,当不给定线程池时,就会去使用ForkJoin中的ForkJoinPool

小结

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率

 请作者喝咖啡