How to Access the First Element of a Flux – 如何访问通量的第一个元素

最后修改: 2022年 10月 27日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll explore various ways of accessing the first element of a Flux with Spring 5 WebFlux.

在本教程中,我们将探讨使用Spring 5 WebFlux访问Flux的第一个元素的各种方法。

Firstly, we’ll use non-blocking methods of the API, such as next() and take(). After that, we’ll see how to achieve the same thing with the help of elementAt() method, where we need to specify the index.

首先,我们将使用API的非阻塞方法,如next()take()。之后,我们将看到如何在elementAt()方法的帮助下实现同样的事情,我们需要指定索引。

Finally, we’ll learn about the blocking methods of the API, and we’ll use blockFirst() to access the first element of the flux.

最后,我们将学习API的阻塞方法,我们将使用blockFirst()来访问flux的第一个元素。

2. Test Setup

2.测试设置

For the code examples in this article, we’ll use the Payment class, which only has one field, the payment amount:

在本文的代码示例中,我们将使用Payment类,它只有一个字段,即付款amount

public class Payment {
    private final int amount;
    // constructor and getter
}

In the tests, we’ll construct a flux of payments using the test helper method called fluxOfThreePayments:

在测试中,我们将使用名为fluxOfThreePayments的测试辅助方法构建一个flux的支付。

private Flux<Payment> fluxOfThreePayments() {
    return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}

After that, we’ll use Spring Reactor’s StepVerifier to test the results.

之后,我们将使用Spring Reactor的StepVerifier来测试结果。

3. next()

3.next()

First, let’s try the next() method. This method will return the first element of the flux, wrapped into the reactive Mono type:

首先,让我们试试next()方法。这个方法将返回通量的第一个元素,被包装成反应式的Mono类型。

@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().next();

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

On the other hand, if we’ll call next() on an empty flux, the result will be an empty Mono. Consequently, blocking it will return null:

另一方面,如果我们在一个空的flux上调用next(),结果将是一个空的Mono。因此,阻止它将返回null

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.next();

    StepVerifier.create(firstPayment)
      .verifyComplete();
}

4. take()

4.take()

The take() method of a reactive flux is equivalent to limit() for Java 8 Streams. In other words, we can use take(1) to limit the flux to exactly one element and then use it in a blocking or non-blocking way:

反应式流的take()方法等同于Java 8 Streams的limit()换句话说,我们可以使用take(1)来将flux精确限制在一个元素上,然后以阻塞或非阻塞的方式使用它。

@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
    Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);

    StepVerifier.create(firstPaymentFlux)
      .expectNext(paymentOf100)
      .verifyComplete();
}

Similarly, for an empty flux, take(1) will return an empty flux:

同样地,对于一个空的通量,take(1)将返回一个空的通量。

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
    Flux<Payment> emptyFlux = Flux.empty();

    Flux<Payment> firstPaymentFlux = emptyFlux.take(1);

    StepVerifier.create(firstPaymentFlux)
      .verifyComplete();
}

5. elementAt()

5.elementAt()

The Flux API also offers the elementAt() method. We can use elementAt(0) to get the first element of a flux in a non-blocking way:

Flux API还提供了elementAt()方法。我们可以使用elementAt(0)来以非阻塞的方式获得一个flux的第一个元素。

@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

Though, if the index passed as a parameter is greater than the number of elements emitted by the flux, an error will be emitted:

不过,如果作为参数传递的索引大于通量所发出的元素数,就会发出错误。

@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.elementAt(0);

    StepVerifier.create(firstPayment)
      .expectError(IndexOutOfBoundsException.class);
}

6. blockFirst()

6.blockFirst()

Alternatively, we can also use blockFirst(). Though, as the name suggests, this is a blocking method. As a result, if we use blockFirst(), we’ll be leaving the reactive world, and we’ll lose all its benefits:

另外,我们也可以使用blockFirst()。尽管如其名所示,这是一个阻塞方法。因此,如果我们使用blockFirst(),我们就会离开反应式世界,我们会失去它的所有好处。

@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
    Payment firstPayment = fluxOfThreePayments().blockFirst();

    assertThat(firstPayment).isEqualTo(paymentOf100);
}

7. toStream()

7.toStream()

Finally, we can convert the flux to a Java 8 stream and then access the first element:

最后,我们可以将通量转换为一个Java 8流,然后访问第一个元素。

@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
    Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
      .findFirst();

    assertThat(firstPayment).contains(paymentOf100);
}

But, yet again, if we do this, we won’t be able to continue using the reactive pipelines.

但是,同样,如果我们这样做,我们将无法继续使用反应式管道。

8. Conclusion

8.结语

In this article, we discussed the API of Java’s reactive streams. We’ve seen various ways of accessing the first element of a Flux, and we learned that we should stick to the non-blocking solutions if we want to use the reactive pipelines.

在这篇文章中,我们讨论了Java的反应式流的API。我们看到了访问Flux的第一个元素的各种方法,我们了解到,如果我们想使用反应式管道,我们应该坚持使用非阻塞的解决方案。

As always, code from this article can be found over on GitHub.

一如既往,本文的代码可以在GitHub上找到over