Debugging Reactive Streams in Java

Last modified: December 8, 2018

1. Overview

Debugging reactive streams is probably one of the main challenges we’ll have to face once we start using these data structures.

And since Reactive Streams have been gaining popularity over the last few years, it’s a good idea to know how to carry out this task efficiently.

Let’s start by setting up a project using a reactive stack to see why this is often troublesome.

2. Scenario with Bugs

We want to simulate a real-case scenario, where several asynchronous processes are running, and where we’ve introduced some defects in the code that will eventually trigger exceptions.

To understand the big picture, we’ll mention that our application will be consuming and processing streams of simple Foo objects which contain only an id, a formattedName, and a quantity field.
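
As a rough illustration, a Foo could look like the following minimal sketch. Only the three field names come from the text; the field types, the constructor, the accessors, and the toString format are assumptions made for this example.

```java
// Hypothetical sketch of the Foo model. Only the field names (id,
// formattedName, quantity) come from the article; types and methods
// are assumptions.
final class Foo {
    private final int id;
    private final String formattedName;
    private final int quantity;

    Foo(int id, String formattedName, int quantity) {
        this.id = id;
        this.formattedName = formattedName;
        this.quantity = quantity;
    }

    int getId() {
        return id;
    }

    String getFormattedName() {
        return formattedName;
    }

    int getQuantity() {
        return quantity;
    }

    @Override
    public String toString() {
        return "Foo(id=" + id
          + ", formattedName=" + formattedName
          + ", quantity=" + quantity + ")";
    }
}

final class FooSketch {
    public static void main(String[] args) {
        // Prints the object through the toString format defined above.
        System.out.println(new Foo(0, "theFo", 8));
    }
}
```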

2.1. Analyzing the Log Output

Now, let’s examine a snippet and the output it generates when an unhandled error shows up:

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .map(FooReporter::reportResult)
      .subscribe();
}

public void processFooInAnotherScenario(Flux<Foo> flux) {
    flux.map(FooNameHelper::substringFooName)
      .map(FooQuantityHelper::divideFooQuantity)
      .subscribe();
}

After running our application for a few seconds, we’ll realize that it’s logging exceptions from time to time.

Having a close look at one of the errors, we’ll find something similar to this:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at com.baeldung.debugging.consumer.service.FooNameHelper
      .lambda$1(FooNameHelper.java:38)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
    at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
    at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
    at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
    at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
    at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
    at j.u.c.FutureTask.run(FutureTask.java:266)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .access$201(ScheduledThreadPoolExecutor.java:180)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .run(ScheduledThreadPoolExecutor.java:293)
    at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at j.l.Thread.run(Thread.java:748)

Based on the root cause, and noticing the FooNameHelper class mentioned in the stack trace, we can imagine that on some occasions, our Foo objects are being processed with a formattedName value that is shorter than expected.
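
To make this failure mode concrete, here is a minimal sketch that uses only the JDK. The helper name substringName and the index range 5 to 15 are assumptions chosen to resemble the trace above; this is not the actual FooNameHelper code.

```java
// Hypothetical stand-in for the substring step; not the actual FooNameHelper.
final class SubstringFailureSketch {

    // Assumed range: keep the characters from index 5 (inclusive)
    // to index 15 (exclusive).
    static String substringName(String formattedName) {
        return formattedName.substring(5, 15);
    }

    public static void main(String[] args) {
        // 17 characters: long enough, so the call succeeds.
        System.out.println(substringName("theFooNameNumber3"));

        // 6 characters: shorter than the end index, so substring throws.
        try {
            substringName("theFoo");
        } catch (StringIndexOutOfBoundsException e) {
            // The exact message text varies between JDK versions.
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

Note that nothing in the exception itself says which pipeline supplied the short name, which is exactly the gap the following sections try to close.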

Of course, this is just a simplified case, and the solution seems rather obvious.

But let’s imagine this was a real-case scenario where the exception itself doesn’t help us solve the issue without some context information.

Was the exception triggered as a part of the processFoo, or of the processFooInAnotherScenario method?

Did other previous steps affect the formattedName field before arriving at this stage?

The log entry doesn’t help us answer these questions.

To make things worse, sometimes the exception isn’t even thrown from within our functionality.

For example, imagine we rely on a reactive repository to persist our Foo objects. If an error arises at that point, we might not even have a clue where to start debugging our code.

We need tools to debug reactive streams efficiently.

3. Using a Debug Session

One option to figure out what’s going on with our application is to start a debugging session using our favorite IDE.

We’ll have to set up a couple of conditional breakpoints and analyze the flow of data when each step in the stream gets executed.

Indeed, this might be a cumbersome task, especially when we’ve got a lot of reactive processes running and sharing resources.

Additionally, there are many circumstances where we can’t start a debugging session for security reasons.

4. Logging Information With the doOnError Method or Using the Subscribe Parameter

Sometimes, we can add useful context information by providing a Consumer as the second parameter of the subscribe method:

public void processFoo(Flux<Foo> flux) {

    // ...

    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
           error);
    });
}

Note: if we don’t need to carry out further processing in the subscribe method, we can chain the doOnError operator on our publisher instead:

flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();

Now we’ll have some guidance on where the error might be coming from, even though we still don’t have much information about the actual element that generated the exception.

5. Activating Reactor’s Global Debug Configuration

The Reactor library provides a Hooks class that lets us configure the behavior of Flux and Mono operators.

By just adding the following statement, our application will instrument the calls to the publishers’ methods, wrap the construction of the operator, and capture a stack trace:

Hooks.onOperatorDebug();

After the debug mode gets activated, our exception logs will include some helpful information:

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
  - The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at c.b.d.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
    ...
    at j.l.Thread.run(Thread.java:748)
    Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:5653)
    c.b.d.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
    c.b.d.c.s.FooService.processFoo(FooService.java:24)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable
      .run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.map ⇢ c.b.d.c.s.FooNameHelper
            .substringFooName(FooNameHelper.java:32)
    |_    Flux.map ⇢ c.b.d.c.s.FooReporter.reportResult(FooReporter.java:15)

As we can see, the first section remains largely the same, but the following sections provide information about:

  1. The assembly trace of the publisher — here we can confirm that the error was first generated in the processFoo method.
  2. The operators that observed the error after it was first triggered, with the user class where they were chained.

Note: in this example, we’ve placed the operations in different classes, mainly so that this is easier to see.

We can toggle the debug mode on or off at any time, but it won’t affect Flux and Mono objects that have already been instantiated.
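
The following sketch illustrates that behavior under some assumptions: the class, the helper methods, and the sample data are hypothetical, and it relies on Hooks.resetOnOperatorDebug() to switch the hook off again. It assembles one pipeline while debug mode is on and one after it has been reset, then checks which error carries an assembly trace as a suppressed exception.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

// Sketch only: class name, method names, and sample data are hypothetical.
final class DebugToggleSketch {

    // Assembles a pipeline whose second element is too short for substring(5, 15).
    static Flux<String> buildFailingPipeline() {
        return Flux.just("theFooNameNumber3", "theFoo")
          .map(name -> name.substring(5, 15));
    }

    // Subscribes and reports whether the resulting error carries an
    // assembly trace as a suppressed exception.
    static boolean failsWithAssemblyTrace(Flux<String> flux) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        flux.subscribe(value -> { }, failure::set);
        Throwable error = failure.get();
        if (error == null) {
            return false;
        }
        for (Throwable suppressed : error.getSuppressed()) {
            if (suppressed.getClass().getName().contains("OnAssemblyException")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assembled while the hook is active, so it is instrumented.
        Hooks.onOperatorDebug();
        Flux<String> assembledWithDebug = buildFailingPipeline();

        // Assembled after the reset, so it is not instrumented.
        Hooks.resetOnOperatorDebug();
        Flux<String> assembledWithoutDebug = buildFailingPipeline();

        System.out.println("with debug: " + failsWithAssemblyTrace(assembledWithDebug));
        System.out.println("without debug: " + failsWithAssemblyTrace(assembledWithoutDebug));
    }
}
```

We would expect only the first pipeline to report an assembly trace, even though both are subscribed to after the hook has been reset, because the instrumentation happens when the operators are assembled.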

5.1. Executing Operators on Different Threads

One other aspect to keep in mind is that the assembly trace is generated properly even if there are different threads operating on the stream.

Let’s have a look at the following example:

public void processFoo(Flux<Foo> flux) {
    flux.publishOn(Schedulers.newSingle("foo-thread"))
       // ...
      .publishOn(Schedulers.newSingle("bar-thread"))
      .map(FooReporter::reportResult)
      .subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}

Now if we check the logs, we’ll see that in this case the first section might change a little, but the last two remain largely the same.

The first part is the thread stack trace, so it shows only the operations carried out by a particular thread.

As we’ve seen, that’s not the most important section when we’re debugging the application, so this change is acceptable.

6. Activating the Debug Output on a Single Process

Instrumenting and generating a stack trace in every single reactive process is costly.

Thus, we should enable the global debug mode only in critical cases.

Fortunately, Reactor also lets us enable debug mode for individual critical processes, which consumes less memory.

We’re referring to the checkpoint operator:

public void processFoo(Flux<Foo> flux) {
    
    // ...

    flux.checkpoint("Observed error on processFoo", true)
      .subscribe();
}

Note that in this manner, the assembly trace will be logged at the checkpoint stage:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
	...
Assembly trace from producer [reactor.core.publisher.FluxMap],
  described as [Observed error on processFoo] :
    r.c.p.Flux.checkpoint(Flux.java:3096)
    c.b.d.c.s.FooService.processFoo(FooService.java:26)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

We should place the checkpoint operator towards the end of the reactive chain.

Otherwise, the operator won’t be able to observe errors occurring downstream.

Also, note that the library offers overloaded versions of this method. We can avoid:

  • specifying a description for the observed error if we use the no-args option
  • generating a filled stack trace (which is the most costly operation), by providing just the custom description
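
The overloads listed above can be compared side by side in a short sketch. The class, the helper methods, and the sample data are hypothetical; only the checkpoint calls themselves are Reactor API.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

// Sketch only: class name, helper names, and sample data are hypothetical.
final class CheckpointVariantsSketch {

    // A flux whose second element is too short for substring(5, 15).
    static Flux<String> failingFlux() {
        return Flux.just("theFooNameNumber3", "theFoo")
          .map(name -> name.substring(5, 15));
    }

    // Subscribes and returns the error signalled by the flux, or null if none.
    static Throwable errorOf(Flux<String> flux) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        flux.subscribe(value -> { }, failure::set);
        return failure.get();
    }

    public static void main(String[] args) {
        // No description; captures the assembly stack trace (the costly part).
        Flux<String> traceOnly = failingFlux().checkpoint();

        // Description only; skips capturing the stack trace.
        Flux<String> descriptionOnly =
          failingFlux().checkpoint("Observed error on processFoo");

        // Description plus a forced stack trace, as in the processFoo example.
        Flux<String> descriptionAndTrace =
          failingFlux().checkpoint("Observed error on processFoo", true);

        // Print each error so the three outputs can be compared.
        errorOf(traceOnly).printStackTrace();
        errorOf(descriptionOnly).printStackTrace();
        errorOf(descriptionAndTrace).printStackTrace();
    }
}
```

A reasonable default is the description-only form, since a unique description is usually enough to locate the checkpoint in the code without paying for a stack trace.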

7. Logging a Sequence of Elements

Finally, Reactor publishers offer one more method that could potentially come in handy in some cases.

By calling the log method in our reactive chain, the application will log each element in the flow with the state that it has at that stage.

Let’s try it out in our example:

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .log()
      .map(FooReporter::reportResult)
      .doOnError(error -> {
        logger.error("The following error happened on processFoo method!", error);
      })
      .subscribe();
}

And check the logs:

INFO  reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO  reactor.Flux.OnAssembly.1 - request(unbounded)
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService 
  - The following error happened on processFoo method!
...

We can easily see the state of each Foo object at this stage, and how the framework cancels the flow when an exception happens.

Of course, this approach is also costly, and we’ll have to use it in moderation.
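
One way to keep the cost down, sketched here under assumptions, is to use an overload of log that takes a category, a level, and the signal types to record. The class name, the category name, and the sample data below are hypothetical.

```java
import java.util.List;
import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Sketch only: class name, category name, and sample data are hypothetical.
final class SelectiveLogSketch {

    static Flux<String> shortenNames(Flux<String> names) {
        return names
          .map(name -> name.substring(0, 5))
          // Level.FINE corresponds to a debug-level entry, and only the
          // onNext and onError signals are recorded.
          .log("foo.afterSubstring", Level.FINE,
            SignalType.ON_NEXT, SignalType.ON_ERROR);
    }

    public static void main(String[] args) {
        List<String> result =
          shortenNames(Flux.just("theFooNameNumber1", "theFooNameNumber2"))
            .collectList()
            .block();
        System.out.println(result);
    }
}
```

With this setup the entries can be switched on or off through the logging configuration for the chosen category, without touching the code.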

8. Conclusion

We can spend a lot of time and effort troubleshooting problems if we don’t know the tools and mechanisms to debug our application properly.

This is especially true if we’re not used to handling reactive and asynchronous data structures, and we need extra help to figure out how things work.

As always, the full example is available over on the GitHub repo.
