# 阿昌浅谈ForkJoin
分治算法思想
Hi! 阿昌我又来了。
这次要聊ForkJoin
,那必然要说一说分治算法思想。
简单来说就说一句话分而治之
,将一个任务拆分成一个一个小任务,如果小任务还是很大,就再继续拆分,直到能够处理。
正文
ForkJoin是Java1.7时引入的多线程框架。
它可以进行特殊的任务:分治任务
(把一个大任务拆分成多个小任务并行执行)
举个例子:
现在要计算一个大型数组的乘积,最简单的方式时,用一个遍历For循环在一个线程内完成:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
还有一个方式,把数组拆成两个部分,分开计算,最后加起来就是最终结果,用两个线程并行执行:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
如果拆开的两部分还是很大,那么就继续拆,用4个线程并行执行:
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
Fork/Join任务的原理:判断一个任务是否足够的小,如果是,直接计算,否则,就分拆成几个小任务分别计算。
这个过程可以反复“裂变
”成一系列小任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask;
class Scratch { public static void main(String[] args) throws InterruptedException, ExecutionException { long startTime1 = System.currentTimeMillis(); long[] array = new long[1000]; long expectedSum = 0; for (int i = 0; i < array.length; i++) { array[i] = random(); expectedSum += array[i]; } ForkJoinTask<Long> task = new SumTask(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = ForkJoinPool.commonPool().invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); } static Random random = new Random(0);
static long random() { return random.nextInt(10000); } }
class SumTask extends RecursiveTask<Long> { static final int THRESHOLD = 500; long[] array; int start; int end;
SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override protected Long compute() { if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) { sum *= this.array[i]; try {Thread.sleep(3);} catch (InterruptedException ignored) {} System.out.println(i); } return sum; } int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } }
|
观察上述代码的执行过程,一个大的计算任务01000首先分裂为两个小任务01000和10002000,这两个小任务仍然太大,继续分裂为更小的0500,5001000,10001500,1500~2000,最后,计算结果被依次合并,得到最终结果。
核心代码SumTask
继承自RecursiveTask
,在compute()
方法中,关键是如何“分裂”出子任务并且提交子任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class SumTask extends RecursiveTask<Long> { protected Long compute() { SumTask subtask1 = new SumTask(...); SumTask subtask2 = new SumTask(...); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); return subresult1 + subresult2; } }
|
主要执行方法
submit
异步开始执行任务同时返回这个任务
,并非直接结果;
这个过程中没有join,所以不会直到任务完成,当获取ForkJoinTask任务对象引用后,.get()可获取到直接结果
1
| ForkJoinTask<Long> submit = ForkJoinPool.commonPool().submit(task);
|
invoke
异步开始执行任务,直接返回直接结果
,这个过程中有join直到任务完成
1
| Long result = ForkJoinPool.commonPool().invoke(task);
|
execute
异步执行这个任务,但是没有任何返回
,这个过程中没有join,所以不会直到任务完成
1
| ForkJoinPool.commonPool().execute(task);
|
代码涉及应用
- Java标准库提供的
java.util.Arrays.parallelSort(array)
可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
- CompletableFuture中也使用到了ForkJoin,当不给定线程池时,就会去使用ForkJoin中的ForkJoinPool
小结
Fork/Join是一种基于“分治
”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
ForkJoinPool
线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask
或RecursiveAction
。
使用Fork/Join模式可以进行并行计算以提高效率
。