Guide to Stream.reduce()

Last modified: March 7, 2019

1. Overview

The Stream API provides a rich repertoire of intermediate, reduction and terminal functions, which also support parallelization.

More specifically, reduction stream operations allow us to produce a single result from a sequence of elements by repeatedly applying a combining operation to the elements in the sequence.

In this tutorial, we’ll look at the general-purpose Stream.reduce() operation and see it in some concrete use cases.

2. The Key Concepts: Identity, Accumulator and Combiner

Before we look deeper into using the Stream.reduce() operation, let’s break down the operation’s participant elements into separate blocks. That way, we’ll more easily understand the role each one plays.

  • Identity – an element that is the initial value of the reduction operation and the default result if the stream is empty
  • Accumulator – a function that takes two parameters, a partial result of the reduction operation and the next element of the stream, and returns a new partial result
  • Combiner – a function used to combine partial results of the reduction operation when the reduction is parallelized, or when the type of the accumulated result differs from the type of the stream elements
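
These roles map onto the three reduce() overloads declared by java.util.stream.Stream: reduce(accumulator), reduce(identity, accumulator) and reduce(identity, accumulator, combiner). The sketch below shows two things. Only the identity-less version returns an Optional, which is empty for an empty stream. The three-argument version lets the result type differ from the element type. The class and method names are our own, and List.of assumes Java 9 or later.

```java
import java.util.List;
import java.util.Optional;

// Illustrative sketch of the three Stream.reduce() overloads
class ReduceOverloads {

    // reduce(accumulator): no identity, so an empty stream yields Optional.empty()
    static Optional<Integer> sumWithoutIdentity(List<Integer> numbers) {
        return numbers.stream().reduce((subtotal, element) -> subtotal + element);
    }

    // reduce(identity, accumulator): the identity is returned for an empty stream
    static int sumWithIdentity(List<Integer> numbers) {
        return numbers.stream().reduce(0, (subtotal, element) -> subtotal + element);
    }

    // reduce(identity, accumulator, combiner): the result type (Integer)
    // differs from the element type (String), so a combiner is supplied
    static int totalLength(List<String> words) {
        return words.stream()
          .reduce(0, (length, word) -> length + word.length(), Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sumWithoutIdentity(List.of()));          // Optional.empty
        System.out.println(sumWithoutIdentity(List.of(1, 2, 3)));   // Optional[6]
        System.out.println(sumWithIdentity(List.of()));             // 0
        System.out.println(totalLength(List.of("a", "bb", "ccc"))); // 6
    }
}
```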

3. Using Stream.reduce()

To better understand the functionality of the identity, accumulator and combiner elements, let’s look at some basic examples:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int result = numbers
  .stream()
  .reduce(0, (subtotal, element) -> subtotal + element);
assertThat(result).isEqualTo(21);

In this case, the Integer value 0 is the identity. It’s the initial value of the reduction operation and also the default result when the stream of Integer values is empty.

Likewise, the lambda expression:

(subtotal, element) -> subtotal + element

is the accumulator, since it takes the partial sum of Integer values and the next element in the stream and returns the new partial sum.
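
To see what the accumulator receives at each step, the following sketch records every partial sum it produces on a sequential stream. The helper name partialSums is hypothetical. Mutating an outside list from inside the accumulator is done here purely for illustration and would be unsafe on a parallel stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that records each partial result returned by the accumulator
class AccumulatorTrace {

    static List<Integer> partialSums(List<Integer> numbers) {
        List<Integer> trace = new ArrayList<>();
        numbers.stream().reduce(0, (subtotal, element) -> {
            int next = subtotal + element;
            trace.add(next); // side effect for illustration only; sequential stream
            return next;
        });
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(partialSums(Arrays.asList(1, 2, 3, 4, 5, 6)));
        // [1, 3, 6, 10, 15, 21]
    }
}
```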

To make the code even more concise, we can use a method reference instead of a lambda expression:

int result = numbers.stream().reduce(0, Integer::sum);
assertThat(result).isEqualTo(21);

Of course, we can use a reduce() operation on streams holding other types of elements.

For instance, we can use reduce() on a List of String elements and join them into a single result:

List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
String result = letters
  .stream()
  .reduce("", (partialString, element) -> partialString + element);
assertThat(result).isEqualTo("abcde");

Similarly, we can switch to the version that uses a method reference:

String result = letters.stream().reduce("", String::concat);
assertThat(result).isEqualTo("abcde");
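
Because String is immutable, every step of this reduction creates a new String. When joining many elements, a mutable reduction through Collectors.joining() is the more common choice. The sketch below compares the two approaches; the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JoinLetters {

    // Immutable reduction: each accumulator call allocates a new String
    static String joinWithReduce(List<String> letters) {
        return letters.stream().reduce("", String::concat);
    }

    // Mutable reduction: Collectors.joining() accumulates into a mutable
    // container instead of creating a new String at every step
    static String joinWithCollector(List<String> letters) {
        return letters.stream().collect(Collectors.joining());
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(joinWithReduce(letters));    // abcde
        System.out.println(joinWithCollector(letters)); // abcde
    }
}
```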

Let’s use the reduce() operation for joining the uppercase elements of the letters list:

String result = letters
  .stream()
  .reduce(
    "", (partialString, element) -> partialString + element.toUpperCase());
assertThat(result).isEqualTo("ABCDE");

In addition, we can use reduce() in a parallelized stream (more on this later):

List<Integer> ages = Arrays.asList(25, 30, 45, 28, 32);
int computedAges = ages.parallelStream().reduce(0, (a, b) -> a + b, Integer::sum);

When a stream executes in parallel, the Java runtime splits the stream into multiple substreams. In such cases, we need a function that combines the results of the substreams into a single one. This is the role of the combiner; in the above snippet, it’s the Integer::sum method reference.
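
A parallel reduction can be pictured as several sequential reductions whose results are merged by the combiner. The single-threaded sketch below models this with exactly two halves. The real runtime decides how many substreams to create, so this is only an approximation, and reduceInTwoHalves is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Simplified model of a parallel reduction: reduce each half from the
// identity with the accumulator, then merge both partial results with the combiner
class ManualParallelReduce {

    static <T, U> U reduceInTwoHalves(List<T> elements, U identity,
                                      BiFunction<U, ? super T, U> accumulator,
                                      BinaryOperator<U> combiner) {
        int middle = elements.size() / 2;

        U left = identity;
        for (T element : elements.subList(0, middle)) {
            left = accumulator.apply(left, element);
        }

        U right = identity;
        for (T element : elements.subList(middle, elements.size())) {
            right = accumulator.apply(right, element);
        }

        return combiner.apply(left, right);
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(25, 30, 45, 28, 32);
        int simulated = reduceInTwoHalves(ages, 0, (a, b) -> a + b, Integer::sum);
        int parallel = ages.parallelStream().reduce(0, (a, b) -> a + b, Integer::sum);
        System.out.println(simulated + " " + parallel); // 160 160
    }
}
```

Each half starts from the identity. This is why the identity must not change the result when it is combined with a partial result.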

Funnily enough, this code won’t compile:

List<User> users = Arrays.asList(new User("John", 30), new User("Julie", 35));
int computedAges = 
  users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());

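In this case, we have a stream of User objects, and the types of the accumulator arguments are Integer and User. However, the two-argument reduce() expects a BinaryOperator<User>, so its identity, both of its arguments and its result must all be User instances. An Integer identity and an accumulator that returns an int don’t match that signature, so the code doesn’t compile.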

We can fix this issue by using the three-argument version of reduce(), which also takes a combiner:

int result = users.stream()
  .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
assertThat(result).isEqualTo(65);

To put it simply, when the accumulated result has the same type as the stream elements, we don’t need a separate combiner, because the two-argument reduce() reuses the accumulator to merge partial results.
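
Another way to avoid the combiner is to map each User to its age before reducing. The two-argument reduce() then works on a Stream<Integer>. IntStream.sum() performs the same reduction on primitive ints. The User class here is a minimal, assumed stand-in, since its definition isn’t shown.

```java
import java.util.Arrays;
import java.util.List;

class UserAgeSum {

    // Hypothetical minimal User class, assumed for this sketch
    static class User {
        private final String name;
        private final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        int getAge() {
            return age;
        }
    }

    // Mapping to Integer first means reduce() sees a Stream<Integer>,
    // so the accumulator's types match and no combiner is needed
    static int sumAgesWithMap(List<User> users) {
        return users.stream().map(User::getAge).reduce(0, Integer::sum);
    }

    // IntStream.sum() is a built-in reduction over primitive ints
    static int sumAgesWithMapToInt(List<User> users) {
        return users.stream().mapToInt(User::getAge).sum();
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(new User("John", 30), new User("Julie", 35));
        System.out.println(sumAgesWithMap(users));      // 65
        System.out.println(sumAgesWithMapToInt(users)); // 65
    }
}
```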

4. Reducing in Parallel

As we learned before, we can use reduce() on parallelized streams.

When we use parallelized streams, we should make sure that reduce() or any other aggregate operations executed on the streams are:

  • associative: the result is not affected by how the operands are grouped, so (a op b) op c gives the same result as a op (b op c)
  • non-interfering: the operation doesn’t affect the data source
  • stateless and deterministic: the operation doesn’t have state and produces the same output for a given input
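
Comparing addition with subtraction shows why associativity matters. The helper isAssociativeFor in the sketch below is hypothetical, and checking a single triple of operands can only disprove associativity, not prove it. The sketch also runs subtraction on a parallel stream, where the outcome depends on how the runtime groups the elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class AssociativityCheck {

    // Compares (a op b) op c with a op (b op c) for one triple of operands
    static boolean isAssociativeFor(BinaryOperator<Integer> op, int a, int b, int c) {
        return op.apply(op.apply(a, b), c).equals(op.apply(a, op.apply(b, c)));
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> addition = Integer::sum;
        BinaryOperator<Integer> subtraction = (x, y) -> x - y;

        System.out.println(isAssociativeFor(addition, 1, 2, 3));    // true
        System.out.println(isAssociativeFor(subtraction, 1, 2, 3)); // false: (1-2)-3 = -4, 1-(2-3) = 2

        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Sequential: always 0-1-2-3-4-5-6 = -21
        System.out.println(numbers.stream().reduce(0, subtraction));
        // Parallel: the grouping depends on how the stream is split,
        // so this result is not guaranteed to be -21
        System.out.println(numbers.parallelStream().reduce(0, subtraction));
    }
}
```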

We should fulfill all these conditions to prevent unpredictable results.

As expected, operations performed on parallelized streams, including reduce(), are executed in parallel, hence taking advantage of multi-core hardware architectures.

Parallelized streams can be considerably faster than their sequential counterparts, but only when the work saved by spreading it across cores outweighs the cost of splitting the stream and combining the partial results. They can be overkill if the operations applied to the stream aren’t expensive, or the number of elements in the stream is small.

Parallelized streams are a good fit when we need to work with large streams and perform expensive aggregate operations.

Let’s create a simple JMH (the Java Microbenchmark Harness) benchmark test and compare the respective execution times when using the reduce() operation on a sequential and a parallelized stream:

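import java.util.List;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.SECONDS)
public class JMHStreamReduceBenchMark {

    // createUsers() (not shown) builds a List containing a large number of User objects
    private final List<User> userList = createUsers();

    @Benchmark
    public Integer executeReduceOnParallelizedStream() {
        return this.userList
          .parallelStream()
          .reduce(
            0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
    }

    @Benchmark
    public Integer executeReduceOnSequentialStream() {
        return this.userList
          .stream()
          .reduce(
            0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum);
    }
}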

In the above JMH benchmark, we compare average execution times. We create a List containing a large number of User objects. Next, we call reduce() on a sequential and on a parallelized stream and check whether the latter performs faster than the former (measured in seconds per operation).

These are our benchmark results:

Benchmark                                                   Mode  Cnt  Score    Error  Units
JMHStreamReduceBenchMark.executeReduceOnParallelizedStream  avgt    5  0.007 ±  0.001   s/op
JMHStreamReduceBenchMark.executeReduceOnSequentialStream    avgt    5  0.010 ±  0.001   s/op

5. Throwing and Handling Exceptions While Reducing

In the above examples, the reduce() operation doesn’t throw any exceptions. But it might, of course.

For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
int divider = 2;
// divide each element (not the running subtotal) and add the quotient to the subtotal
int result = numbers.stream().reduce(0, (a, b) -> a + b / divider);

This works as long as the divider variable isn’t zero. If it is zero, reduce() throws an ArithmeticException with the message "/ by zero".
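
The sketch below divides each element by the divider using integer division and sums the quotients. It also shows that an exception thrown inside the accumulator propagates out of reduce() to the caller. The names DivideByZeroDemo and sumOfQuotients are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class DivideByZeroDemo {

    // Integer-divides each element by divider and sums the quotients
    static int sumOfQuotients(List<Integer> numbers, int divider) {
        return numbers.stream().reduce(0, (a, b) -> a + b / divider);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(sumOfQuotients(numbers, 2)); // 0+1+1+2+2+3 = 9

        try {
            sumOfQuotients(numbers, 0);
        } catch (ArithmeticException e) {
            // the exception thrown inside the accumulator escapes reduce()
            System.out.println("Caught: " + e.getMessage()); // Caught: / by zero
        }
    }
}
```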

Depending on the use case, we can catch the exception with a try/catch block and do something useful with it, such as logging it or recovering from it:

public static int divideListElements(List<Integer> values, int divider) {
    return values.stream()
      .reduce(0, (a, b) -> {
          try {
              return a + b / divider;
          } catch (ArithmeticException e) {
              LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
          }
          return 0;
      });
}

While this approach works, we’ve polluted the lambda expression with the try/catch block, and we no longer have the clean one-liner we had before.

To fix this issue, we can use the extract function refactoring technique and extract the try/catch block into a separate method:

private static int divide(int value, int factor) {
    int result = 0;
    try {
        result = value / factor;
    } catch (ArithmeticException e) {
        LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
    }
    return result;
}

Now the implementation of the divideListElements() method is again clean and streamlined:

public static int divideListElements(List<Integer> values, int divider) {
    // divide only the incoming element and add the quotient to the running subtotal
    return values.stream().reduce(0, (a, b) -> a + divide(b, divider));
}

Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);

Let’s also test the divideListElements() method when the supplied List of Integer values contains a 0:

List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);

Finally, let’s test the method implementation when the divider is 0:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);
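
Putting these pieces together gives the self-contained sketch of NumberUtils below. It assumes LOGGER is a java.util.logging.Logger, since the log(Level, String) call in the snippets above matches that API. Its accumulator divides only the incoming element b and leaves the running subtotal a untouched.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Self-contained sketch; LOGGER is assumed to be a java.util.logging.Logger
abstract class NumberUtils {

    private static final Logger LOGGER = Logger.getLogger(NumberUtils.class.getName());

    // Divides every element by divider and sums the quotients;
    // with a zero divider, each quotient is logged and counted as 0
    public static int divideListElements(List<Integer> values, int divider) {
        return values.stream().reduce(0, (a, b) -> a + divide(b, divider));
    }

    private static int divide(int value, int factor) {
        int result = 0;
        try {
            result = value / factor;
        } catch (ArithmeticException e) {
            LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero");
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(divideListElements(numbers, 1)); // 21
        System.out.println(divideListElements(numbers, 0)); // 0
    }
}
```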

6. Complex Custom Objects

We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator and combiner for the data type.

Suppose our User is part of a review website. Each User has one Rating, which is averaged over many Reviews.

First, let’s start with our Review object.

Each Review should contain a simple comment and score:

public class Review {

    private int points;
    private String review;

    // constructor, getters and setters
}

Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:

public class Rating {

    double points;
    List<Review> reviews = new ArrayList<>();

    public void add(Review review) {
        reviews.add(review);
        computeRating();
    }

    private double computeRating() {
        double totalPoints = 
          reviews.stream().map(Review::getPoints).reduce(0, Integer::sum);
        this.points = totalPoints / reviews.size();
        return this.points;
    }

    public static Rating average(Rating r1, Rating r2) {
        Rating combined = new Rating();
        combined.reviews = new ArrayList<>(r1.reviews);
        combined.reviews.addAll(r2.reviews);
        combined.computeRating();
        return combined;
    }

    public double getPoints() {
        return points;
    }

}

We’ve also added a static average() method that merges the reviews of two input Ratings into a new Rating and recomputes its points. This works nicely for both our accumulator and our combiner.

Next, let’s define a list of Users, each with their own sets of reviews:

User john = new User("John", 30);
john.getRating().add(new Review(5, ""));
john.getRating().add(new Review(3, "not bad"));
User julie = new User("Julie", 35);
julie.getRating().add(new Review(4, "great!"));
julie.getRating().add(new Review(2, "terrible experience"));
julie.getRating().add(new Review(4, ""));
List<User> users = Arrays.asList(john, julie);

Now that John and Julie are accounted for, let’s use Stream.reduce() to compute an average rating across both users.

As an identity, let’s return a new Rating if our input list is empty:

Rating averageRating = users.stream()
  .reduce(new Rating(), 
    (rating, user) -> Rating.average(rating, user.getRating()), 
    Rating::average);

If we do the math, we should find that the average score is 3.6: the five reviews add up to 5 + 3 + 4 + 2 + 4 = 18 points, and 18 / 5 = 3.6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);
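
Rating.average() merges the review lists before recomputing the points. The reduction therefore yields the average over all reviews, not the mean of each user’s own average. The sketch below makes the difference visible with plain lists of points. It assumes John’s reviews score 5 and 3 and Julie’s score 4, 2 and 4; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class comparing two ways of averaging review points
class PooledAverageDemo {

    // Pools all review points first, as merging the review lists in
    // Rating.average() does, then divides by the total number of reviews
    static double pooledAverage(List<List<Integer>> pointsPerUser) {
        int total = pointsPerUser.stream()
          .flatMap(List::stream)
          .reduce(0, Integer::sum);
        int count = pointsPerUser.stream().mapToInt(List::size).sum();
        return (double) total / count;
    }

    // Averages each user's points, then averages those per-user averages
    static double averageOfAverages(List<List<Integer>> pointsPerUser) {
        double sumOfAverages = pointsPerUser.stream()
          .map(points -> points.stream().reduce(0, Integer::sum) / (double) points.size())
          .reduce(0.0, Double::sum);
        return sumOfAverages / pointsPerUser.size();
    }

    public static void main(String[] args) {
        List<List<Integer>> pointsPerUser = Arrays.asList(
          Arrays.asList(5, 3),     // John
          Arrays.asList(4, 2, 4)); // Julie
        System.out.println(pooledAverage(pointsPerUser));     // 3.6
        System.out.println(averageOfAverages(pointsPerUser)); // roughly 3.667
    }
}
```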

7. Conclusion

In this article, we learned how to use the Stream.reduce() operation.

In addition, we learned how to perform reductions on sequential and parallelized streams and how to handle exceptions while reducing.

As usual, all the code samples shown in this tutorial are available over on GitHub.
