你好,我是吴计可师,一个工作十多年的后端开发,曾就职京东、阿里等多家互联网头部企业。
文章可能会比较长,主要解析的非常详解,或涉及一些底层知识,供面试高阶难度用。可以根据自己实际理解情况合理取舍阅读
parallelStream 是 Java 8 引入的 Stream API 中的一种操作方式,允许我们使用多核处理器并行处理数据,从而提高性能。它使得我们可以以并行的方式执行集合中的操作,而不需要手动编写线程池或同步代码。用起来代码简洁,但是你是否遇到过一些问题呢?
当我们调用 parallelStream() 时,底层会使用 Fork-Join 线程池 来分配任务,Fork-Join 是 Java 提供的一种用于并行处理的框架。它允许将任务分解成多个子任务并行执行,然后合并结果。
ForkJoinPool 是一个可重用的线程池,能够自动管理并行任务。每个线程处理流的一个子部分。默认情况下,ForkJoinPool 使用 CPU 核心数来决定并行执行的任务数。
拆分数据:
当我们调用 parallelStream() 时,Stream 会被分成多个部分,每个部分由不同的线程处理。Java 使用 数据分割(splitting)机制来把输入流分割成多个子流,这些子流是可以并行处理的。
这些子流会被划分到不同的线程,通常,分割策略是递归的,也就是说,每个子流在处理前会继续分割直到大小合适。
并行处理:
每个子流在不同的线程中执行相应的操作。例如,如果是一个 map 操作,那么每个子流会在其对应的线程中并行地执行 map 操作。
并行处理的好处是可以充分利用多核 CPU 的性能,减少总的执行时间。
合并结果:
在并行处理完成后,需要将各个线程的结果合并。在 Java 中,这个过程是通过 合并(reduction)操作完成的。比如,在 collect 操作时,合并步骤会把每个子流的结果收集到一个最终的结果中。
性能提升:使用多个 CPU 核心并行处理数据时,可以大大提高程序的处理效率,尤其是处理大数据集时。
代码简洁:与手动实现多线程相比,parallelStream 提供了一种更简单、更直观的并行处理方式,代码更简洁。
上下文切换开销:对于小数据集或简单操作,线程上下文切换的开销可能会比并行处理的性能提升还要高。因此,在小数据量下,使用 parallelStream 可能反而会导致性能下降。
线程安全问题:在并行流操作中,如果涉及到共享可变数据,可能会引发并发问题。需要确保操作是线程安全的,或者操作不修改共享的状态。
不适合所有任务:不是所有任务都适合并行处理。比如,带有 I/O 操作的任务、任务本身计算复杂度较低等情况,使用并行流反而会因为上下文切换等开销导致性能下降。
调度器的限制:parallelStream 默认使用的 ForkJoinPool 是一个全局共享的线程池,如果有多个并行流同时运行,可能会导致线程池竞争资源,从而影响性能。
import java.util.*;
import java.util.stream.*;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 使用 parallelStream 执行并行操作
int sum = list.parallelStream()
.mapToInt(Integer::intValue)
.sum();
System.out.println("Sum: " + sum);
}
}
parallelStream 默认使用的是 ForkJoinPool.commonPool 作为底层线程池。这个线程池是 全局共享的,因此如果系统中有多个并行流同时运行,可能会导致 线程池资源争夺,影响其他任务的执行。
如果 parallelStream 与其他并发操作(例如其他并行流、ExecutorService 等)一起运行,它们可能会共享同一个线程池资源,导致并行流的性能下降。
解决方案:通过设置系统属性或传入自定义的 ForkJoinPool 来避免共享全局线程池。
其实最主要的问题是同一个项目中的parallelStream用的是同一个公用的ForkJoinPool,而你的同一个项目中,任务类型可能差别很大。因此不能随手一个parallelStream。
当数据量非常大,且操作是 CPU 密集型时,使用 parallelStream 可以显著提高性能。
对于 I/O 密集型任务,如数据库访问、文件操作等,parallelStream 可能不适用,因为线程可能会频繁阻塞,导致性能下降。
如果操作是无状态且独立的(即不会修改共享数据),使用并行流会更有效。
6.1 parallelStream 如何提高性能?
parallelStream 通过将数据分成多个块并在多个线程中并行处理,能够利用多核 CPU 的优势,提升处理大数据集时的性能。它使用 ForkJoinPool 将任务分配到不同的线程上执行,每个线程处理数据的一部分,最后合并结果。
线程创建开销:对于小规模数据集,创建多个线程的开销可能大于并行化带来的性能提升。
上下文切换开销:频繁的线程上下文切换会影响性能。
线程池资源争夺:parallelStream 默认使用全局共享的线程池(ForkJoinPool.commonPool),多个并行流共享同一个线程池资源,可能导致性能下降。
I/O 密集型任务:parallelStream 适合 CPU 密集型任务,对于 I/O 密集型任务(如文件读写、数据库访问等),并行化可能导致性能下降。
数据量小:对于小规模数据,使用 parallelStream 可能反而增加性能损耗,因为并行化的开销(线程创建、任务分配等)可能超过串行处理的效率。
I/O 密集型任务:如果流操作是 I/O 密集型的(如网络请求、文件读写),并行化可能导致性能下降。
共享可变状态:当流的操作涉及到共享的可变状态时,使用并行流可能导致线程安全问题。
顺序要求严格的操作:如果操作顺序是非常重要的,使用 parallelStream 可能会打乱顺序,除非使用 forEachOrdered()。
parallelStream 使用 ForkJoinPool 来并行执行任务,底层是通过将数据拆分为多个子任务来实现并行化。具体的分割方式由流的实现决定:
对于大部分常见的数据结构,parallelStream 会尝试均匀地分割数据,以最大化并行度。
如果数据量不均匀,可能会导致一些线程的负载过重,影响性能。
parallelStream 本身并不保证操作是线程安全的。如果操作过程中有共享的可变状态,可能会导致竞态条件。为了解决线程安全问题,可以使用线程安全的集合(如 ConcurrentHashMap)或同步机制。
parallelStream 默认情况下不会保证结果的顺序。如果你需要保证处理结果的顺序,可以使用 forEachOrdered() 方法。例如:
list.parallelStream().forEachOrdered(item -> System.out.println(item));
但是,使用 forEachOrdered() 会牺牲并行的性能,因为它会强制结果顺序执行。
在 parallelStream 中,reduce 操作是通过将数据拆分成多个子任务,并对每个子任务独立执行 reduce 操作,最后合并结果的方式来保证并行性。为了保证线程安全和正确性,reduce 操作需要是 无副作用 的,并且必须是 可结合的(即对于多个元素执行 reduce 操作的顺序不影响最终结果)。
今天的内容就分享到这儿,喜欢的朋友可以关注,点赞。有什么不足的地方欢迎留言指出,您的关注是我前进的动力!