Handling Exceptions in Project Reactor – 处理项目反应器中的异常情况

最后修改: 2021年 8月 29日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll look at several ways to handle exceptions in Project Reactor. Operators introduced in the code examples are defined in both the Mono and Flux classes. However, we’ll only focus on methods in the Flux class.

在本教程中,我们将研究在Project Reactor中处理异常的几种方法。代码示例中介绍的操作符在MonoFlux类中都有定义。然而,我们将只关注Flux类中的方法

2. Maven Dependencies

2.Maven的依赖性

Let’s start by adding the Reactor core dependency:

让我们从添加Reactor核心依赖性开始。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId
    <version>3.4.9</version>
</dependency>

3. Throwing Exceptions Directly in a Pipeline Operator

3.在管道运营商中直接抛出异常情况

The simplest way to handle an Exception is by throwing it. If something abnormal happens during the processing of a stream element, we can throw an Exception with the throw keyword as if it were a normal method execution.

处理Exception的最简单方法是抛出它。如果在处理一个流元素的过程中发生了异常,我们可以用throw关键字抛出一个Exception,就像正常的方法执行一样

Let’s assume we need to parse a stream of Strings to Integers. If an element isn’t a numeric String, we’ll need to throw an Exception.

让我们假设我们需要将一个Strings流解析为Integers。如果一个元素不是一个数字String,我们就需要抛出一个Exception

It’s a common practice to use the map operator for such a conversion:

使用map操作符进行这样的转换是一种常见的做法。

Function<String, Integer> mapper = input -> {
    if (input.matches("\\D")) {
        throw new NumberFormatException();
    } else {
        return Integer.parseInt(input);
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);

As we can see, the operator throws an Exception if an input element is invalid. When we throw the Exception this way, Reactor catches it and signals an error downstream:

我们可以看到,如果一个输入元素无效,操作者会抛出一个Exception。当我们以这种方式抛出Exception时,Reactor会抓住它并向下游发出错误信号

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

This solution works, but it’s not elegant. As specified in the Reactive Streams specification, rule 2.13, an operator must return normally. Reactor helped us by converting the Exception to an error signal. However, we could do better.

这个解决方案是可行的,但它并不优雅。正如 Reactive Streams 规范中规定的那样,规则 2.13,操作者必须正常返回。Reactor通过将Exception转换为一个错误信号来帮助我们。然而,我们可以做得更好。

Essentially, reactive streams rely on the onError method to indicate a failure condition. In most cases, this condition must be triggered by an invocation of the error method on the Publisher. Using an Exception for this use case brings us back to traditional programming.

从本质上讲,反应式流依赖于onError方法来指示失败条件。在大多数情况下,这个条件必须由Publisher上的error方法的调用触发。在这个用例中使用Exception使我们回到了传统的编程。

4. Handling Exceptions in the handle Operator

4.处理handle操作符中的异常情况

Similar to the map operator, we can use the handle operator to process items in a stream one by one. The difference is that Reactor provides the handle operator with an output sink, allowing us to apply more complicated transformations.

map操作符类似,我们可以使用handle操作符来逐一处理一个流中的项目。不同的是,Reactor为handle操作符提供了一个输出汇,允许我们应用更复杂的转换。

Let’s update our example from the previous section to use the handle operator:

让我们更新上一节的例子,使用handle操作符。

BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
    if (input.matches("\\D")) {
        sink.error(new NumberFormatException());
    } else {
        sink.next(Integer.parseInt(input));
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);

Unlike the map operator, the handle operator receives a functional consumer, called once for each element. This consumer has two parameters: an element coming from upstream and a SynchronousSink that builds an output to be sent downstream.

map操作符不同,handle操作符接收一个功能消费者,为每个元素调用一次。这个消费者有两个参数:一个来自上游的元素和一个SynchronousSink,它建立了一个要发送到下游的输出。

If the input element is a numeric String, we call the next method on the sink, providing it with the Integer converted from the input. If it isn’t a numeric String, we’ll indicate the situation by calling the error method with an Exception object.

如果输入元素是一个数字String,我们会调用汇流排上的next方法,为它提供从输入转换而来的Integer。如果它不是一个数字String,我们将通过调用error方法和一个Exception对象来表明这种情况。

Notice that an invocation of the error method will cancel the subscription to the upstream and invoke the onError method on the downstream. Such collaboration of error and onError is the standard way to handle Exceptions in reactive streams.

注意,调用error方法将取消对上游的订阅,并在下游调用onError方法。这种erroronError的协作是处理反应式流中Exception的标准方式。

Let’s verify the output stream:

让我们验证一下输出流。

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

5. Handling Exceptions in the flatMap Operator

5.处理flatMap操作符中的异常情况

Another commonly used operator that supports error handling is flatMap. This operator transforms input elements into Publishers, then flattens the Publishers into a new stream. We can take advantage of these Publishers to signify an erroneous state.

另一个支持错误处理的常用操作符是flatMap。这个操作符将输入元素转换为Publishers,然后将Publishers平铺到一个新的流中。我们可以利用这些Publishers来表示一个错误的状态。

Let’s try the same example using flatMap:

让我们用flatMap来试试同样的例子。

Function<String, Publisher<Integer>> mapper = input -> {
    if (input.matches("\\D")) {
        return Mono.error(new NumberFormatException());
    } else {
        return Mono.just(Integer.parseInt(input));
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

Unsurprisingly, the result is the same as before.

不出所料,结果和以前一样。

Notice the only difference between handle and flatMap regarding error handling is that the handle operator calls the error method on a sink, while flatMap calls it on a Publisher.

注意handleflatMap之间关于错误处理的唯一区别是,handle操作符在sink上调用error方法,而flatMapPublisher上调用。

If we’re dealing with a stream represented by a Flux object, we can also use concatMap to handle errors. This method behaves in much the same way as flatMap, but it doesn’t support asynchronous processing.

如果我们处理的是由Flux对象代表的流,我们也可以使用concatMap来处理错误。这个方法的行为方式与flatMap基本相同,但它不支持异步处理。

6. Avoiding NullPointerException

6.避免NullPointerException

This section covers the handling of null references, which often cause NullPointerExceptions, a commonly encountered Exception in Java. To avoid this exception, we usually compare a variable with null and direct the execution to a different way if that variable is actually null. It’s tempting to do the same in reactive streams:

本节介绍了对null引用的处理,它经常导致NullPointerExceptions,这是Java中经常遇到的Exception。为了避免这种异常,我们通常会将一个变量与null进行比较,如果该变量实际上是null,则将执行引向另一个途径。在反应式流中做同样的事情是很诱人的。

Function<String, Integer> mapper = input -> {
    if (input == null) {
        return 0;
    } else {
        return Integer.parseInt(input);
    }
};

We may think that a NullPointerException won’t occur because we already handled the case when the input value is null. However, the reality tells a different story:

我们可能认为NullPointerException不会发生,因为我们已经处理了输入值为null时的情况。然而,现实告诉我们一个不同的故事。

Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NullPointerException.class)
    .verify();

Apparently, a NullPointerException triggered an error downstream, meaning our null check didn’t work.

显然,一个NullPointerException引发了下游的错误,意味着我们的null检查没有成功

To understand why that happened, we need to go back to the Reactive Streams specification. Rule 2.13 of the specification says that “calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller”.

为了理解为什么会发生这种情况,我们需要回到反应式流的规范中。规则 2.该规范的第13条规定:”调用onSubscribeonNextonErroronComplete必须正常返回,除非提供的任何参数为null,在这种情况下,它必须向调用者抛出java.lang.NullPointerException“。

As required by the specification, Reactor throws a NullPointerException when a null value reaches the map function.

按照规范的要求,当null值到达map函数时,Reactor抛出一个NullPointerException

Therefore, there’s nothing we can do about a null value when it reaches a certain stream. We can’t handle it or convert it to a non-null value before passing it downstream. Therefore, the only way to avoid a NullPointerException is to make sure that null values won’t make it to the pipeline.

因此,当一个null值到达某个流时,我们对它无能为力。我们无法处理它,也无法在向下游传递之前将其转换为非null值。因此,避免NullPointerException的唯一方法是确保null值不会进入流水线

7. Conclusion

7.结语

In this article, we’ve walked through Exception handling in Project Reactor. We discussed a couple of examples and clarified the process. We also covered a special case of exception that can happen when processing a reactive stream — NullPointerException.

在这篇文章中,我们已经走过了Project Reactor中的Exception处理。我们讨论了几个例子,并澄清了这个过程。我们还介绍了在处理反应式流时可能发生的一种特殊的异常情况–NullPointerException

As usual, the source code for our application is available over on GitHub.

像往常一样,我们的应用程序的源代码可在GitHub上获得