Handling Exceptions in Project Reactor – 处理项目反应器中的异常情况

最后修改: 2021年 8月 29日


1. Overview


In this tutorial, we’ll look at several ways to handle exceptions in Project Reactor. Operators introduced in the code examples are defined in both the Mono and Flux classes. However, we’ll only focus on methods in the Flux class.

在本教程中,我们将研究在Project Reactor中处理异常的几种方法。代码示例中介绍的操作符在MonoFlux类中都有定义。然而,我们将只关注Flux类中的方法

2. Maven Dependencies


Let’s start by adding the Reactor core dependency:



3. Throwing Exceptions Directly in a Pipeline Operator


The simplest way to handle an Exception is by throwing it. If something abnormal happens during the processing of a stream element, we can throw an Exception with the throw keyword as if it were a normal method execution.


Let’s assume we need to parse a stream of Strings to Integers. If an element isn’t a numeric String, we’ll need to throw an Exception.


It’s a common practice to use the map operator for such a conversion:


Function<String, Integer> mapper = input -> {
    if (input.matches("\\D")) {
        throw new NumberFormatException();
    } else {
        return Integer.parseInt(input);

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);

As we can see, the operator throws an Exception if an input element is invalid. When we throw the Exception this way, Reactor catches it and signals an error downstream:



This solution works, but it’s not elegant. As specified in the Reactive Streams specification, rule 2.13, an operator must return normally. Reactor helped us by converting the Exception to an error signal. However, we could do better.

这个解决方案是可行的,但它并不优雅。正如 Reactive Streams 规范中规定的那样,规则 2.13,操作者必须正常返回。Reactor通过将Exception转换为一个错误信号来帮助我们。然而,我们可以做得更好。

Essentially, reactive streams rely on the onError method to indicate a failure condition. In most cases, this condition must be triggered by an invocation of the error method on the Publisher. Using an Exception for this use case brings us back to traditional programming.


4. Handling Exceptions in the handle Operator


Similar to the map operator, we can use the handle operator to process items in a stream one by one. The difference is that Reactor provides the handle operator with an output sink, allowing us to apply more complicated transformations.


Let’s update our example from the previous section to use the handle operator:


BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
    if (input.matches("\\D")) {
        sink.error(new NumberFormatException());
    } else {

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);

Unlike the map operator, the handle operator receives a functional consumer, called once for each element. This consumer has two parameters: an element coming from upstream and a SynchronousSink that builds an output to be sent downstream.


If the input element is a numeric String, we call the next method on the sink, providing it with the Integer converted from the input. If it isn’t a numeric String, we’ll indicate the situation by calling the error method with an Exception object.


Notice that an invocation of the error method will cancel the subscription to the upstream and invoke the onError method on the downstream. Such collaboration of error and onError is the standard way to handle Exceptions in reactive streams.


Let’s verify the output stream:



5. Handling Exceptions in the flatMap Operator


Another commonly used operator that supports error handling is flatMap. This operator transforms input elements into Publishers, then flattens the Publishers into a new stream. We can take advantage of these Publishers to signify an erroneous state.


Let’s try the same example using flatMap:


Function<String, Publisher<Integer>> mapper = input -> {
    if (input.matches("\\D")) {
        return Mono.error(new NumberFormatException());
    } else {
        return Mono.just(Integer.parseInt(input));

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);


Unsurprisingly, the result is the same as before.


Notice the only difference between handle and flatMap regarding error handling is that the handle operator calls the error method on a sink, while flatMap calls it on a Publisher.


If we’re dealing with a stream represented by a Flux object, we can also use concatMap to handle errors. This method behaves in much the same way as flatMap, but it doesn’t support asynchronous processing.


6. Avoiding NullPointerException


This section covers the handling of null references, which often cause NullPointerExceptions, a commonly encountered Exception in Java. To avoid this exception, we usually compare a variable with null and direct the execution to a different way if that variable is actually null. It’s tempting to do the same in reactive streams:


Function<String, Integer> mapper = input -> {
    if (input == null) {
        return 0;
    } else {
        return Integer.parseInt(input);

We may think that a NullPointerException won’t occur because we already handled the case when the input value is null. However, the reality tells a different story:


Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);


Apparently, a NullPointerException triggered an error downstream, meaning our null check didn’t work.


To understand why that happened, we need to go back to the Reactive Streams specification. Rule 2.13 of the specification says that “calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller”.

为了理解为什么会发生这种情况,我们需要回到反应式流的规范中。规则 2.该规范的第13条规定:”调用onSubscribeonNextonErroronComplete必须正常返回,除非提供的任何参数为null,在这种情况下,它必须向调用者抛出java.lang.NullPointerException“。

As required by the specification, Reactor throws a NullPointerException when a null value reaches the map function.


Therefore, there’s nothing we can do about a null value when it reaches a certain stream. We can’t handle it or convert it to a non-null value before passing it downstream. Therefore, the only way to avoid a NullPointerException is to make sure that null values won’t make it to the pipeline.


7. Conclusion


In this article, we’ve walked through Exception handling in Project Reactor. We discussed a couple of examples and clarified the process. We also covered a special case of exception that can happen when processing a reactive stream — NullPointerException.

在这篇文章中,我们已经走过了Project Reactor中的Exception处理。我们讨论了几个例子,并澄清了这个过程。我们还介绍了在处理反应式流时可能发生的一种特殊的异常情况–NullPointerException

As usual, the source code for our application is available over on GitHub.
