Logging a Reactive Sequence – 记录一个反应性的序列

最后修改: 2018年 11月 29日


1. Overview


With the introduction of Spring WebFlux, we got another powerful tool to write reactive, non-blocking applications. While using this technology is now way easier than before, debugging reactive sequences in Spring WebFlux can be quite cumbersome.

随着Spring WebFlux的推出,我们得到了另一个强大的工具来编写反应式、非阻塞式应用程序。虽然现在使用这项技术比以前容易得多,但在Spring WebFlux中调试反应式序列可能相当麻烦

In this quick tutorial, we’ll see how to easily log events in asynchronous sequences and how to avoid some simple mistakes.


2. Maven Dependency


Let’s add the Spring WebFlux dependency to our project so we can create reactive streams:

让我们把Spring WebFlux的依赖性添加到我们的项目中,这样我们就可以创建反应式流。


We can get the latest spring-boot-starter-webflux dependency from Maven Central.


3. Creating a Reactive Stream


To begin let’s create a reactive stream using Flux and use the log() method to enable logging:

首先,让我们使用Flux创建一个反应式流,并使用log() 方法来启用记录。

Flux<Integer> reactiveStream = Flux.range(1, 5).log();

Next, we will subscribe to it to consume generated values:



4. Logging Reactive Stream


After running the above application we see our logger in action:


2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()

We see every event that occurred on our stream. Five values were emitted and then stream closed with an onComplete() event.


5. Advanced Logging Scenario


We can modify our application to see a more interesting scenario. Let’s add take() to Flux which will instruct the stream to provide only a specific number of events:


Flux<Integer> reactiveStream = Flux.range(1, 5).log().take(3);

After executing the code, we’ll see the following output:


2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()

As we can see, take() caused the stream to cancel after emitting three events.


The placement of log() in your stream is crucial. Let’s see how placing log() after take() will produce different output:


Flux<Integer> reactiveStream = Flux.range(1, 5).take(3).log();

And the output:


2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()

As we can see changing the point of observation changed the output. Now the stream produced three events, but instead of cancel(), we see onComplete(). This is because we observe the output of using take() instead of what was requested by this method.


6. Conclusion


In this quick article, we saw how to log reactive streams using built-in log() method.


And as always, the source code for the above example can be found over on GitHub.
