你好,我是吴计可师,一个工作十多年的后端开发,曾就职京东、阿里等多家互联网头部企业。
文章可能会比较长,主要解析的非常详解,或涉及一些底层知识,供面试高阶难度用。可以根据自己实际理解情况合理取舍阅读
Fork/Join 是 Java 提供的一种用于并行任务处理的框架,最初在 Java 7 的 java.util.concurrent 包中引入,旨在利用多核处理器的计算能力,帮助开发者更高效地处理可以被拆分成子任务的计算任务。
任务分解(Fork):将一个大任务分解成若干个子任务,直到子任务足够小且易于计算。
任务合并(Join):将各个子任务的计算结果合并,得到最终的任务结果。
工作窃取(Work-Stealing)算法:线程池中的线程如果完成了自己的任务,会主动尝试窃取其他线程的任务队列,以最大化 CPU 的利用率。
ForkJoinPool:
Fork/Join 框架的线程池,负责任务的调度和执行。
使用 ForkJoinPool 时,不需要手动管理线程,它内部会为每个任务分配线程。
RecursiveTask:
用于有返回值的任务。
需要实现 compute() 方法,定义任务的分解逻辑。
RecursiveAction:
用于无返回值的任务。
也需要实现 compute() 方法。
创建任务:将待执行的任务创建为 RecursiveTask 或 RecursiveAction 的子类。
任务分解:在 compute() 方法中递归拆分任务。
提交任务:使用 ForkJoinPool 提交任务。
任务执行与结果合并:子任务被线程池执行,执行结果通过 join() 合并。
import java.util.concurrent.RecursiveTask;import java.util.concurrent.ForkJoinPool;public class ForkJoinExample {// RecursiveTask to compute the sum of an arraystatic class SumTask extends RecursiveTask<Long> {private static final int THRESHOLD = 10;private final int[] numbers;private final int start;private final int end;public SumTask(int[] numbers, int start, int end) {this.numbers = numbers;this.start = start;this.end = end;}protected Long compute() {if (end - start <= THRESHOLD) {long sum = 0;for (int i = start; i < end; i++) {sum += numbers[i];}return sum;} else {int mid = (start + end) / 2;SumTask leftTask = new SumTask(numbers, start, mid);SumTask rightTask = new SumTask(numbers, mid, end);leftTask.fork(); // Split task into leftlong rightResult = rightTask.compute(); // Compute right task directlylong leftResult = leftTask.join(); // Wait for left task resultreturn leftResult + rightResult;}}}public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool();int[] numbers = new int[100];for (int i = 0; i < numbers.length; i++) {numbers[i] = i + 1;}SumTask task = new SumTask(numbers, 0, numbers.length);long result = pool.invoke(task); // Start the ForkJoinTaskSystem.out.println("Sum: " + result);}}
import java.util.concurrent.RecursiveAction;import java.util.concurrent.ForkJoinPool;public class ForkJoinActionExample {static class PrintTask extends RecursiveAction {private static final int THRESHOLD = 5;private final int start;private final int end;public PrintTask(int start, int end) {this.start = start;this.end = end;}protected void compute() {if (end - start <= THRESHOLD) {for (int i = start; i < end; i++) {System.out.println("Printing number: " + i);}} else {int mid = (start + end) / 2;PrintTask leftTask = new PrintTask(start, mid);PrintTask rightTask = new PrintTask(mid, end);invokeAll(leftTask, rightTask);}}}public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool();PrintTask task = new PrintTask(0, 20);pool.invoke(task);}}
优势:
充分利用多核 CPU:通过任务分解和工作窃取,最大化 CPU 核心利用率。
简化并行任务的实现:无需手动管理线程,开发者专注于任务的逻辑。
局限性:
任务分解开销:任务分解和合并本身会带来额外的开销,适用于大任务的并行处理。
适用场景有限:更适合 CPU 密集型任务,而不是 I/O 密集型任务。
Fork/Join 框架适用于需要将大任务拆分为多个独立小任务并行处理的场景,比如数组求和、矩阵计算、递归任务等。
5.1. 什么是 Fork/Join 框架?适用于哪些场景?
定义:Fork/Join 是 Java 7 引入的一种用于并行任务执行的框架,能够充分利用多核 CPU 的处理能力。
核心思想:将大任务拆分成多个子任务(Fork),子任务并行处理后合并结果(Join)。
适用场景:
计算密集型任务,例如大数据量的数组求和、矩阵计算。
可递归拆分的任务,例如递归算法的优化(如归并排序、斐波那契数列计算)。
不适用于 I/O 密集型任务或非可分解任务。
ForkJoinPool:提供任务的调度和管理,内部使用工作窃取算法。
ForkJoinTask:
RecursiveTask(有返回值)。
RecursiveAction(无返回值)。
Fork/Join 的基础任务类,分为两种:
工作窃取算法:当线程完成自己的任务时,会尝试从其他线程的任务队列中窃取任务,提高资源利用率。
每个线程有一个双端队列(Deque),用于存储要处理的任务。
线程从队列的尾部(LIFO)获取任务进行处理。
如果线程队列为空,则从其他线程的队列头部(FIFO)窃取任务执行。
工作窃取减少了线程间的锁竞争,能更高效地利用 CPU。
| 特性 | Fork/Join 框架 | 普通线程池 |
|---|---|---|
| 适用场景 | 递归分解任务、计算密集型任务 | 并发任务处理 |
| 任务拆分 | 支持任务分解(Fork) | 不支持任务分解 |
| 工作窃取 | 支持,线程空闲时从其他线程队列窃取任务 | 不支持工作窃取 |
| 任务管理 | 基于 ForkJoinTask | 基于 Runnable 或 Callable |
使用任务分解的阈值(THRESHOLD)。
在任务规模小于阈值时,直接进行计算,不再进一步分解。
合理设置阈值大小,以平衡分解开销和并行计算的收益。
compute() 是 Fork/Join 框架中定义任务行为的核心方法。
主要作用是判断任务是否足够小:
如果是,直接执行任务。
如果不是,将任务拆分成多个子任务,并递归调用 fork() 和 compute()。
使用 join() 方法等待子任务完成,并获取其结果。
将多个子任务的结果进行合并,形成最终结果。
long leftResult = leftTask.join(); // 获取左子任务结果long rightResult = rightTask.compute(); // 直接计算右子任务return leftResult + rightResult; // 合并结果
Fork/Join 框架的任务管理不是基于传统方法调用栈,而是基于工作队列。
任务的递归调用实际上是将子任务放入线程的双端队列中,避免了传统递归方法的栈深度限制。
提高线程利用率,减少空闲线程。
缓解负载不均的情况,提升任务执行效率。
缺点:
窃取任务有额外的开销(如任务调度和线程上下文切换)。
窃取任务可能导致线程竞争,降低性能。
今天的内容就分享到这儿,喜欢的朋友可以关注,点赞。有什么不足的地方欢迎留言指出,您的关注是我前进的动力!