Aggregate Runtime Exceptions in Java Streams – Java 流中的聚合运行时异常

最后修改: 2023年 9月 4日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll learn about aggregating exceptions in a stream pipeline.

在本教程中,我们将学习在流管道中聚合异常。

The Stream API in itself does not provide any declarative way to process exceptions. It has a single channel in the pipeline that processes the data, and there is no separate channel for processing exceptions. This means that it does not provide a way to invoke a function when it encounters an exception. Hence we have to fall back to catching exceptions with a try-catch block.

流 API 本身并没有提供任何声明性的异常处理方法。它在流水线中只有一个处理数据的通道,而没有处理异常的单独通道。这意味着它没有提供在遇到异常时调用函数的方法。因此,我们不得不使用 try-catch 块来捕获异常。

As a result, aggregating exceptions in a stream pipeline and handling them can be challenging.

因此,在流水线中汇聚异常并处理它们是一项挑战。

2. Aggregating Exceptions With a Try Catch Block Within the Stream Pipeline

2.在流水线中使用 Try Catch 块聚合异常

There are often cases where a method just has to be called for its effect, for example, a simple database update that might throw an exception due to connection failure. With this in mind, let’s consider a simple example of calling processThrowsExAndNoOutput() in the pipeline:

通常在某些情况下,只需调用方法即可实现其效果,例如,简单的数据库更新可能会因连接失败而抛出异常。考虑到这一点,让我们来看一个在管道中调用 processThrowsExAndNoOutput() 的简单示例:

@Test
public void givenTryCatchInPipeline_whenFoundEx_thenSuppressExIntoRuntimeEx() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    RuntimeException runEx = Arrays.stream(strings)
      .map(str -> {
          try {
              processThrowsExAndNoOutput(str);
              return null;
          } catch (RuntimeException e) {
              return e;
          }
      })
      .filter(Objects::nonNull)
      .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
          RuntimeException runtimeException = new RuntimeException("Errors Occurred");
          list.forEach(runtimeException::addSuppressed);
          return runtimeException;
      }));
    processExceptions(runEx);
    assertEquals("Errors Occurred", runEx.getMessage());
    assertEquals(3, runEx.getSuppressed().length);
}

In the above program, we treat caught exceptions as data in the stream. The map() method returns either null or the exception. With filter(), only exceptions are allowed downstream. Finally, we reduce it into a RuntimeException using addSuppressed(). We can then call processExceptions() to handle the aggregate exception.

在上述程序中,我们将捕获的异常视为流中的数据map() 方法要么返回空值,要么返回异常。有了 filter() 方法,下游只允许异常。最后,我们使用 addSuppressed() 将其还原为 RuntimeException 异常。然后,我们可以调用 processExceptions() 来处理聚合异常。

That works! But could it be more declarative? Let’s work towards it in the upcoming sections.

这也行得通!但是,它还能更具有声明性吗?让我们在接下来的章节中努力实现它。

3. Aggregating Exceptions by Extracting the Try Catch Block Into a Method

3.通过在方法中提取 Try Catch 块来聚合异常

Let’s make the implementation a little more readable and concise. To do that, we move the try-catch block into a method:

让我们使实现过程更易读、更简洁。为此,我们将 try-catch 块移到一个方法中:

static Throwable callProcessThrowsExAndNoOutput(String input) {
    try {
        processThrowsExAndNoOutput(input);
        return null;
    } catch (RuntimeException e) {
        return e;
    }
}

Now, we can call the above method from inside the pipeline:

现在,我们可以在管道内部调用上述方法:

@Test
public void givenExtractedMethod_whenFoundEx_thenSuppressExIntoRuntimeEx() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    RuntimeException runEx = Arrays.stream(strings)
      .map(str -> callProcessThrowsExAndNoOutput(str))
      .filter(Objects::nonNull)
      .reduce(new RuntimeException("Errors Occurred"), (o1, o2) -> {
          o1.addSuppressed(o2);
          return o1;
      });
    // handle the aggregate exception as before
}

The above approach looks cleaner. However, there is still room for improvement and more use cases to discuss.

上述方法看起来更简洁。不过,仍有改进的余地,也有更多的使用案例需要讨论。

4. Aggregating Exceptions and Output in the Stream Pipeline Using Reflection

4.使用反射聚合流水线中的异常和输出

Most programs have to handle both exceptions and expected output. Let’s take an example of a method which can return either an exception or some output:

大多数程序必须同时处理异常和预期输出。让我们以一个可以返回异常或输出的方法为例:

static Object processReturnsExAndOutput(String input) {
    try {
        return Integer.parseInt(input);
    } catch (Exception e) {
        return new RuntimeException("Exception in processReturnsExAndOutput for " + input, e);
    }
}

Now, let’s look at the pipeline processing:

现在,让我们来看看管道处理过程:

@Test
public void givenProcessMethod_whenStreamResultHasExAndOutput_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    Map map = strings.stream()
      .map(s -> processReturnsExAndOutput(s))
      .collect(Collectors.partitioningBy(o -> o instanceof RuntimeException, Collectors.toList()));
    
    List<RuntimeException> exceptions = (List<RuntimeException>) map.getOrDefault(Boolean.TRUE, List.of());
    List<Integer> results = (List<Integer>) map.getOrDefault(Boolean.FALSE, List.of());
    handleExceptionsAndOutputs(exceptions, results);
}

The above stream pipeline uses partitioningBy() in the terminal collect(). It makes use of reflection to partition the results into a list of exceptions and integers. Further down, the program calls handleExceptionsAndOutputs() to take care of exceptions and the output for further processing.

上述流流水线在终端中使用partitioningBy() collect()它利用反射将结果划分为异常和整数列表。接下来,程序会调用 handleExceptionsAndOutputs() 来处理异常和输出,以便进一步处理。

This time, we didn’t reduce the exceptions into an aggregate RuntimeException. Instead, we passed on the list of exceptions for further processing. This is another way of aggregating the exceptions.

这一次,我们没有将异常减少为一个集合 RuntimeException。相反,我们传递了异常列表,以便进一步处理。这是另一种聚合异常的方法。

As we can see, it is definitely not the cleanest of the approaches, with raw types and casting required. Hence, the upcoming sections will explore more generalized solutions to address the issue at hand.

我们可以看到,这绝对不是最简洁的方法,因为需要原始类型和铸造。因此,接下来的章节将探讨更通用的解决方案来解决手头的问题。

4. Aggregating Exceptions and Output Using a Custom Mapper

4.使用自定义映射器聚合异常和输出

Going forward, we are going to focus more on functional programming.

今后,我们将更多地关注函数式编程

We’ll create a custom mapper function that wraps another map() stream function. It returns a Result object which encapsulates both the result and the exception.

我们将创建一个封装另一个 map() 流函数的自定义映射器函数。它将返回一个封装了结果和异常的 Result 对象。

First, let’s look at the Result class:

首先,让我们来看看 Result 类:

public class Result<R, E extends Throwable> {
    private Optional<R> result;
    private Optional<E> exception;

    public Result(R result) {
        this.result = Optional.of(result);
        this.exception = Optional.empty();
    }

    public Result(E exception) {
        this.exception = Optional.of(exception);
        this.result = Optional.empty();
    }

    public Optional<R> getResult() {
        return result;
    }

    public Optional<E> getException() {
        return exception;
    }
}

The Result class uses Generics and Optional. Since result and exception can hold either a null or a non-null value, we have used Optional. Its usage will become clearer as we further move on.

Result 类使用 GenericsOptional 。由于 resultexception 可以包含空值或非空值,因此我们使用了 Optional。其用法将随着我们的进一步讨论而变得更加清晰。

We discussed the custom mapper at the beginning of this section, Let’s now look at its implementation:

我们在本节开头讨论了自定义映射器,现在来看看它的实现:

public class CustomMapper {
    public static <T, R> Function<T, Result<R, Throwable>> mapper(Function<T, R> func) {
        return arg -> {
            try {
                return new Result(func.apply(arg));
            } catch (Exception e) {
                return new Result(e);
            }
        };
    }
}

Now it is time to see the mapper() in action:

现在我们来看看 mapper() 的运行情况:

@Test
public void givenCustomMapper_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    strings.stream()
      .map(CustomMapper.mapper(Integer::parseInt))
      .collect(Collectors.collectingAndThen(Collectors.toList(),
        list -> handleErrorsAndOutputForResult(list)));
}

This time we used Collectors.CollectingAndThen() to invoke handleErrorsAndOutputForResult() at the end of the pipeline with the list of Result<Integer, Throwable> objects. Let’s take a look at handleErrorsAndOutputForResult():

这一次,我们使用 Collectors.CollectingAndThen() 在流水线末尾调用 handleErrorsAndOutputForResult() Result<Integer, Throwable> 对象列表。让我们来看看 handleErrorsAndOutputForResult() 的情况:</em

static String handleErrorsAndOutputForResult(List<Result<Integer, Throwable>> successAndErrors) {
    logger.info("handle errors and output");
    successAndErrors.forEach(result -> {
        if (result.getException().isPresent()) {
            logger.error("Process Exception " + result.getException().get());
        } else {
            logger.info("Process Result" + result.getResult().get());
        }
    });
    return "Errors and Output Handled";
}

As shown above, we simply iterate over the Result list and fork into a success or failure flow with the help of the method Optional.isPresent(). This can be a useful approach when the success and error cases have to be dealt with distinctly, e.g. sending notifications to separate users.

如上图所示,我们只需遍历 Result 列表,并在方法 Optional.isPresent() 的帮助下叉入成功或失败流程。当需要分别处理成功和错误情形时,例如向不同用户发送通知时,这种方法非常有用。

When the function to be used inside Stream.map() cannot be modified, for example, because it is from an external library, we can use our custom mapper() function to wrap it and handle the outcome in a more generalized manner.

Stream.map()内部使用的函数无法修改时(例如,因为它来自外部库),我们可以使用自定义mapper()函数对其进行封装,并以更通用的方式处理结果。

4. Aggregate Exceptions and Output Using a Custom Collector

4.使用自定义收集器汇总异常和输出

Aggregating the exceptions and output of a pipeline is a kind of collection activity. Hence it makes sense to implement a collector, which is designed for this purpose.

聚合管道的异常和输出是一种收集活动。因此,实现为此目的而设计的收集器是有意义的。

Let’s see how to do that:

让我们看看如何做到这一点:

public class CustomCollector<T, R> {
    private final List<R> results = new ArrayList<>();
    private final List<Throwable> exceptions = new ArrayList<>();

    public static <T, R> Collector<T, ?, CustomCollector<T, R>> of(Function<T, R> mapper) {
        return Collector.of(
          CustomCollector::new,
          (collector, item) -> {
              try {
                  R result = mapper.apply(item);
                  collector.results.add(result);
              } catch (Exception e) {
                  collector.exceptions.add(e);
              }
          },
          (left, right) -> {
              left.results.addAll(right.results);
              left.exceptions.addAll(right.exceptions);
              return left;
          }
        );
    }
    // standard getters...
}

Finally, let’s take a look at how the collector exactly works:

最后,让我们来看看收集器究竟是如何工作的:

@Test
public void givenCustomCollector_whenStreamResultHasExAndSuccess_thenHandleAggrExceptionAndResults() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    Arrays.stream(strings)
      .collect(Collectors.collectingAndThen(CustomCollector.of(Integer::parseInt),
        col -> handleExAndResults(col.getExceptions(), col.getResults())));
}

5. Aggregating Exceptions and Output Using Try and Either From Vavr Library

5.使用 Vavr 库中的 Try 和 Either 聚合异常和输出

Try is a container that holds either an uncaught exception or the actual output in case of success. Just like the custom mapper discussed earlier, Try can also wrap functions.

Try 是一个容器,用于容纳未捕获的异常或成功时的实际输出。 就像前面讨论的自定义映射器一样,Try 也可以封装函数。

Whereas, Either is a more general container that holds either an error type or the expected output type.

Either 是一个更通用的容器,可容纳错误类型或预期输出类型

Let’s see how we can exploit these features together:

让我们一起看看如何利用这些功能:

@Test
public void givenVavrEitherAndTry_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    strings.stream()
      .map(str -> Try.of(() -> Integer.parseInt(str)).toEither())
      .collect(Collectors.collectingAndThen(Collectors.partitioningBy(Either::isLeft, Collectors.toList()),
        map -> handleErrorsAndOutputForEither(map)));
}

As we can see, the program converts the Try object into an Either and then collects it into a map to invoke handleErrorsAndOutputForEither():

我们可以看到,程序将 Try 对象转换为 Either 对象,然后将其收集到一个映射中,以调用 handleErrorsAndOutputForEither() 函数:</em

static void handleErrorsAndOutputForEither(Map<Boolean, List<Either<Throwable, Integer>>> map) {
    logger.info("handle errors and output");
    map.getOrDefault(Boolean.TRUE, List.of())
      .forEach(either -> logger.error("Process Exception " + either.getLeft()));
    map.getOrDefault(Boolean.FALSE, List.of())
      .forEach(either -> logger.info("Process Result " + either.get()));
}

Further, as shown above, the exceptions and output can be processed by swiping left or right on the Either object. As we can see, the Try and Either approach provides us with the most concise solution that we have seen today.

此外,如上图所示,可以通过左右滑动 Either 对象来处理异常和输出。正如我们所见,TryEither 方法为我们提供了目前所见的最简洁的解决方案。

6. Conclusion

6.结论

In this tutorial, we explored a few ways to aggregate runtime exceptions while processing a stream. While many approaches are possible, it is important to maintain the essence of stream processing, including conciseness, immutability, and declarative syntax.

在本教程中,我们探讨了在处理流时聚合运行时异常的几种方法。虽然可以采用多种方法,但重要的是要保持流处理的本质,包括简洁性、不变性和声明式语法。

As usual, the code is available over on GitHub.

与往常一样,代码可在 GitHub 上获取