你好,我是吴计可师,一个工作十多年的后端开发,曾就职京东、阿里等多家互联网头部企业。
文章可能会比较长,主要解析的非常详解,或涉及一些底层知识,供面试高阶难度用。可以根据自己实际理解情况合理取舍阅读
Fork/Join 是 Java 提供的一种用于并行任务处理的框架,最初在 Java 7 的 java.util.concurrent 包中引入,旨在利用多核处理器的计算能力,帮助开发者更高效地处理可以被拆分成子任务的计算任务。
任务分解(Fork):将一个大任务分解成若干个子任务,直到子任务足够小且易于计算。
任务合并(Join):将各个子任务的计算结果合并,得到最终的任务结果。
工作窃取(Work-Stealing)算法:线程池中的线程如果完成了自己的任务,会主动尝试窃取其他线程的任务队列,以最大化 CPU 的利用率。
ForkJoinPool:
Fork/Join 框架的线程池,负责任务的调度和执行。
使用 ForkJoinPool 时,不需要手动管理线程,它内部会为每个任务分配线程。
RecursiveTask:
用于有返回值的任务。
需要实现 compute() 方法,定义任务的分解逻辑。
RecursiveAction:
用于无返回值的任务。
也需要实现 compute() 方法。
创建任务:将待执行的任务创建为 RecursiveTask 或 RecursiveAction 的子类。
任务分解:在 compute() 方法中递归拆分任务。
提交任务:使用 ForkJoinPool 提交任务。
任务执行与结果合并:子任务被线程池执行,执行结果通过 join() 合并。
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample {
// RecursiveTask to compute the sum of an array
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10;
private final int[] numbers;
private final int start;
private final int end;
public SumTask(int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(numbers, start, mid);
SumTask rightTask = new SumTask(numbers, mid, end);
leftTask.fork(); // Split task into left
long rightResult = rightTask.compute(); // Compute right task directly
long leftResult = leftTask.join(); // Wait for left task result
return leftResult + rightResult;
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] numbers = new int[100];
for (int i = 0; i < numbers.length; i++) {
numbers[i] = i + 1;
}
SumTask task = new SumTask(numbers, 0, numbers.length);
long result = pool.invoke(task); // Start the ForkJoinTask
System.out.println("Sum: " + result);
}
}
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinActionExample {
static class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 5;
private final int start;
private final int end;
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println("Printing number: " + i);
}
} else {
int mid = (start + end) / 2;
PrintTask leftTask = new PrintTask(start, mid);
PrintTask rightTask = new PrintTask(mid, end);
invokeAll(leftTask, rightTask);
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
PrintTask task = new PrintTask(0, 20);
pool.invoke(task);
}
}
优势:
充分利用多核 CPU:通过任务分解和工作窃取,最大化 CPU 核心利用率。
简化并行任务的实现:无需手动管理线程,开发者专注于任务的逻辑。
局限性:
任务分解开销:任务分解和合并本身会带来额外的开销,适用于大任务的并行处理。
适用场景有限:更适合 CPU 密集型任务,而不是 I/O 密集型任务。
Fork/Join 框架适用于需要将大任务拆分为多个独立小任务并行处理的场景,比如数组求和、矩阵计算、递归任务等。
5.1. 什么是 Fork/Join 框架?适用于哪些场景?
定义:Fork/Join 是 Java 7 引入的一种用于并行任务执行的框架,能够充分利用多核 CPU 的处理能力。
核心思想:将大任务拆分成多个子任务(Fork),子任务并行处理后合并结果(Join)。
适用场景:
计算密集型任务,例如大数据量的数组求和、矩阵计算。
可递归拆分的任务,例如递归算法的优化(如归并排序、斐波那契数列计算)。
不适用于 I/O 密集型任务或非可分解任务。
ForkJoinPool:提供任务的调度和管理,内部使用工作窃取算法。
ForkJoinTask:
RecursiveTask(有返回值)。
RecursiveAction(无返回值)。
Fork/Join 的基础任务类,分为两种:
工作窃取算法:当线程完成自己的任务时,会尝试从其他线程的任务队列中窃取任务,提高资源利用率。
每个线程有一个双端队列(Deque),用于存储要处理的任务。
线程从队列的尾部(LIFO)获取任务进行处理。
如果线程队列为空,则从其他线程的队列头部(FIFO)窃取任务执行。
工作窃取减少了线程间的锁竞争,能更高效地利用 CPU。
特性 | Fork/Join 框架 | 普通线程池 |
---|---|---|
适用场景 | 递归分解任务、计算密集型任务 | 并发任务处理 |
任务拆分 | 支持任务分解(Fork) | 不支持任务分解 |
工作窃取 | 支持,线程空闲时从其他线程队列窃取任务 | 不支持工作窃取 |
任务管理 | 基于 ForkJoinTask | 基于 Runnable 或 Callable |
使用任务分解的阈值(THRESHOLD)。
在任务规模小于阈值时,直接进行计算,不再进一步分解。
合理设置阈值大小,以平衡分解开销和并行计算的收益。
compute() 是 Fork/Join 框架中定义任务行为的核心方法。
主要作用是判断任务是否足够小:
如果是,直接执行任务。
如果不是,将任务拆分成多个子任务,并递归调用 fork() 和 compute()。
使用 join() 方法等待子任务完成,并获取其结果。
将多个子任务的结果进行合并,形成最终结果。
long leftResult = leftTask.join(); // 获取左子任务结果
long rightResult = rightTask.compute(); // 直接计算右子任务
return leftResult + rightResult; // 合并结果
Fork/Join 框架的任务管理不是基于传统方法调用栈,而是基于工作队列。
任务的递归调用实际上是将子任务放入线程的双端队列中,避免了传统递归方法的栈深度限制。
提高线程利用率,减少空闲线程。
缓解负载不均的情况,提升任务执行效率。
缺点:
窃取任务有额外的开销(如任务调度和线程上下文切换)。
窃取任务可能导致线程竞争,降低性能。
今天的内容就分享到这儿,喜欢的朋友可以关注,点赞。有什么不足的地方欢迎留言指出,您的关注是我前进的动力!