1. Introduction
1.绪论
Parallel-collectors is a small library that provides a set of Java Stream API collectors that enable parallel processing – while at the same time circumventing main deficiencies of standard Parallel Streams.
并行收集器是一个小型库,它提供了一组Java流API收集器,可以实现并行处理 – 同时规避了标准并行流的主要缺陷。
2. Maven Dependencies
2.Maven的依赖性
If we want to start using the library, we need to add a single entry in Maven’s pom.xml file:
如果我们想开始使用该库,我们需要在Maven的pom.xml文件中添加一个条目。
<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>1.1.0</version>
</dependency>
Or a single line in Gradle’s build file:
或者在Gradle的构建文件中的一个单行。
compile 'com.pivovarit:parallel-collectors:1.1.0'
The newest version can be found on Maven Central.
最新的版本可以在Maven中心找到。
3. Parallel Streams Caveats
3.平行流的注意事项
Parallel Streams were one of Java 8’s highlights, but they turned out to be applicable to heavy CPU processing exclusively.
并行流是Java 8的亮点之一,但它们原来只适用于重型CPU处理。
The reason for this was the fact that Parallel Streams were internally backed by a JVM-wide shared ForkJoinPool, which provided limited parallelism and was used by all Parallel Streams running on a single JVM instance.
其原因是,并行流在内部由JVM范围内的共享ForkJoinPool支持,它提供了有限的并行性,并被运行在单个JVM实例上的所有并行流使用。
For example, imagine we have a list of ids and we want to use them to fetch a list of users and that this operation is expensive.
例如,设想我们有一个id列表,我们想用它们来获取一个用户列表,而且这个操作很昂贵。
We could use Parallel Streams for that:
我们可以使用平行流来实现这一点。
List<Integer> ids = Arrays.asList(1, 2, 3);
List<String> results = ids.parallelStream()
.map(i -> fetchById(i)) // each operation takes one second
.collect(Collectors.toList());
System.out.println(results); // [user-1, user-2, user-3]
And indeed, we can see that there’s a noticeable speedup. But it becomes problematic if we start running multiple parallel blocking operations… in parallel. This might quickly saturate the pool and result in potentially huge latencies. That’s why it’s important to build bulkheads by creating separate thread pools – to prevent unrelated tasks from influencing each other’s execution.
事实上,我们可以看到有一个明显的速度提升。但是,如果我们开始运行多个并行的阻塞操作……就会出现问题。这可能会使池子迅速饱和并导致潜在的巨大延迟。这就是为什么通过创建独立的线程池来建立隔板很重要–以防止不相关的任务影响彼此的执行。
In order to provide a custom ForkJoinPool instance, we could leverage the trick described here, but this approach relied on an undocumented hack and was faulty until JDK10. We can read more in the issue itself – [JDK8190974].
为了提供一个自定义的ForkJoinPool实例,我们可以利用这里描述的技巧,但是这种方法依赖于一个没有记录的黑客,并且在JDK10之前是有缺陷的。我们可以在该问题本身中阅读更多内容 – [JDK8190974]。
4. Parallel Collectors in Action
4.平行收集器在行动
Parallel Collectors, as the name suggests, are just standard Stream API Collectors that allow performing additional operations in parallel at collect() phase.
并行采集器,顾名思义,只是标准的流API采集器,允许在collect()阶段并行执行额外的操作。
ParallelCollectors (which mirrors Collectors class) class is a facade providing access to the whole functionality of the library.
ParallelCollectors(它反映了Collectors类)类是一个门面,提供对库的整个功能的访问。
If we wanted to redo the above example, we could simply write:
如果我们想重做上述例子,我们可以简单地写。
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Integer> ids = Arrays.asList(1, 2, 3);
CompletableFuture<List<String>> results = ids.stream()
.collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4));
System.out.println(results.join()); // [user-1, user-2, user-3]
The result is the same, however, we were able to provide our custom thread pool, specify our custom parallelism level, and the result arrived wrapped in a CompletableFuture instance without blocking the current thread.
然而,结果是一样的,我们能够提供我们的自定义线程池,指定我们的自定义并行级别,并且结果到达时被包裹在一个CompletableFuture实例中,而不会阻塞当前线程。
Standard Parallel Streams, on the other hand, couldn’t achieve any of those.
而标准并行流则无法实现这些。
4.1. ParallelCollectors.parallelToList/ToSet()
4.1.ParallelCollectors.parallelToList/ToSet()
As intuitive as it gets, if we want to process a Stream in parallel and collect results into a List or Set, we can simply use ParallelCollectors.parallelToList or parallelToSet:
就像它一样直观,如果我们想并行处理一个Stream并将结果收集到List或Set中,我们可以简单地使用ParallelCollectors.parallelToList或parallelToSet。
List<Integer> ids = Arrays.asList(1, 2, 3);
List<String> results = ids.stream()
.collect(parallelToList(i -> fetchById(i), executor, 4))
.join();
4.2. ParallelCollectors.parallelToMap()
4.2.ParallelCollectors.parallelToMap()
If we want to collect Stream elements into a Map instance, just like with Stream API, we need to provide two mappers:
如果我们想把Stream元素收集到一个Map实例中,就像使用Stream API一样,我们需要提供两个映射器。
List<Integer> ids = Arrays.asList(1, 2, 3);
Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
.join(); // {1=user-1, 2=user-2, 3=user-3}
We can also provide a custom Map instance Supplier:
我们还可以提供一个自定义的Map实例Supplier。
Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
.join();
And a custom conflict resolution strategy:
还有一个定制的冲突解决策略。
List<Integer> ids = Arrays.asList(1, 2, 3);
Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
.join();
4.3. ParallelCollectors.parallelToCollection()
4.3.ParallelCollectors.parallelToCollection()
Similarly to the above, we can pass our custom Collection Supplier if we want to obtain results packaged in our custom container:
与上述类似,如果我们想获得打包在我们自定义容器中的结果,我们可以传递我们的自定义Collection Supplier。
List<String> results = ids.stream()
.collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
.join();
4.4. ParallelCollectors.parallelToStream()
4.4.ParallelCollectors.parallelToStream()
If the above isn’t enough, we can actually obtain a Stream instance and continue custom processing there:
如果上述内容还不够,我们实际上可以获得一个Stream实例,并在那里继续进行自定义处理。
Map<Integer, List<String>> results = ids.stream()
.collect(parallelToStream(i -> fetchById(i), executor, 4))
.thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
.join();
4.5. ParallelCollectors.parallel()
4.5.ParallelCollectors.parallel()
This one allows us to stream results in completion order:
这个允许我们按完成度顺序流转结果。
ids.stream()
.collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
.forEach(System.out::println);
// user-1
// user-3
// user-2
In this case, we can expect the collector to return different results each time since we introduced a random processing delay.
在这种情况下,我们可以预期采集器每次都会返回不同的结果,因为我们引入了一个随机的处理延迟。
4.6. ParallelCollectors.parallelOrdered()
4.6.ParallelCollectors.parallelOrdered()
This facility allows streaming results just like the above, but maintains original order:
该设施允许像上述那样流式传输结果,但保持原始顺序。
ids.stream()
.collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
.forEach(System.out::println);
// user-1
// user-2
// user-3
In this case, the collector will always maintain the order but might be slower than the above.
在这种情况下,收集器将始终保持秩序,但可能比上述速度慢。
5. Limitations
5.限制条件
At the point of writing, parallel-collectors don’t work with infinite streams even if short-circuiting operations are used – it’s a design limitation imposed by Stream API internals. Simply put, Streams treat collectors as non-short-circuiting operations so the stream needs to process all upstream elements before getting terminated.
在撰写本文时,并行收集器不能与无限的流一起工作,即使使用了短路操作–这是由Stream API内部施加的设计限制。简单地说,Streams将收集器视为非短路操作,因此流在被终止之前需要处理所有上游元素。
The other limitation is that short-circuiting operations don’t interrupt the remaining tasks after short-circuiting.
另一个限制是,短路操作不会在短路后中断其余任务。
6. Conclusion
6.结语
We saw how the parallel-collectors library allows us to perform parallel processing by using custom Java Stream API Collectors and CompletableFutures to utilize custom thread pools, parallelism, and non-blocking style of CompletableFutures.
我们看到了并行收集器库如何让我们通过使用自定义的Java Stream API Collectors和CompletableFutures来执行并行处理,以利用自定义线程池、并行性和CompletableFutures的非阻塞风格。
As always, code snippets are available over on GitHub.
一如既往,代码片段可在GitHub上获得。
For further reading, see the parallel-collectors library on GitHub, the author’s blog, and the author’s Twitter account.