Java 9 Reactive Streams – Java 9 反应式流

最后修改: 2017年 6月 3日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’ll be looking at the Java 9 Reactive Streams. Simply put, we’ll be able to use the Flow class, which encloses the primary building blocks for building reactive stream processing logic.

在这篇文章中,我们将研究Java 9的反应式流。简单地说,我们将能够使用Flow类,它包含了构建反应式流处理逻辑的主要构建块。

Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure. This specification is defined in the Reactive Manifesto, and there are various implementations of it, for example, RxJava or Akka-Streams.

Reactive Streams是一个用于非阻塞背压的异步流处理的标准。该规范在Reactive Manifesto中定义,并且有各种实现,例如,RxJavaAkka-Streams。

2. Reactive API Overview

2.反应式API概述

To build a Flow, we can use three main abstractions and compose them into asynchronous processing logic.

为了构建一个Flow,我们可以使用三个主要的抽象,并将它们组成异步处理逻辑。

Every Flow needs to process events that are published to it by a Publisher instance; the Publisher has one method – subscribe().

每个流程都需要处理由Publisher实例发布的事件Publisher有一个方法 – subscribe()。

If any of the subscribers want to receive events published by it, they need to subscribe to the given Publisher.

如果任何一个订阅者想要接收由它发布的事件,他们需要订阅给定的发布者

The receiver of messages needs to implement the Subscriber interface. Typically this is the end for every Flow processing because the instance of it does not send messages further.

消息的接收者需要实现Subscriber接口。通常这是每个Flow处理的终点,因为它的实例不会进一步发送消息。

We can think about Subscriber as a Sink. This has four methods that need to be overridden – onSubscribe(), onNext(), onError(), and onComplete(). We’ll be looking at those in the next section.

我们可以将Subscriber视为Sink.,它有四个需要被重载的方法–onSubscribe()、onNext()、onError()、onComplete()。我们将在下一节中讨论这些方法。

If we want to transform incoming message and pass it further to the next Subscriber, we need to implement the Processor interface. This acts both as a Subscriber because it receives messages, and as the Publisher because it processes those messages and sends them for further processing.

如果我们想要转换传入的消息并将其进一步传递给下一个订阅者,我们需要实现处理器接口。它既作为订阅者,因为它接收消息,又作为发布者,因为它处理这些消息并将其发送给进一步处理。

3. Publishing and Consuming Messages

3.发布和消费信息

Let’s say we want to create a simple Flow, in which we have a Publisher publishing messages, and a simple Subscriber consuming messages as they arrive – one at the time.

假设我们想创建一个简单的流程,其中我们有一个发布者发布消息,和一个简单的订阅者在消息到达时消费它们 – 一次一个。

Let’s create an EndSubscriber class. We need to implement the Subscriber interface. Next, we’ll override the required methods.

让我们创建一个EndSubscriber类。我们需要实现Subscriber接口。接下来,我们将覆盖所需的方法。

The onSubscribe() method is called before processing starts. The instance of the Subscription is passed as the argument. It is a class that is used to control the flow of messages between Subscriber and the Publisher:

onSubscribe() 方法在处理开始之前被调用。Subscription的实例被作为参数传递。它是一个用来控制SubscriberPublisher:之间的消息流的类。

public class EndSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
}

We also initialized an empty List of consumedElements that’ll be utilized in the tests.

我们还初始化了一个空的ListconsumedElements,它将在测试中被利用。

Now, we need to implement the remaining methods from the Subscriber interface. The main method here is onNext() – this is called whenever the Publisher publishes a new message:

现在,我们需要实现Subscriber接口的其余方法。这里的主要方法是onNext() – 每当Publisher发布一条新消息时,这个方法就会被调用。

@Override
public void onNext(T item) {
    System.out.println("Got : " + item);
    consumedElements.add(item);
    subscription.request(1);
}

Note that when we started the subscription in the onSubscribe() method and when we processed a message we need to call the request() method on the Subscription to signal that the current Subscriber is ready to consume more messages.

请注意,当我们在onSubscribe() 方法中启动订阅时,以及当我们处理一条消息时,我们需要在Subscription 上调用request() 方法,以示当前Subscriber 已准备好消费更多消息。

Lastly, we need to implement onError() – which is called whenever some exception will be thrown in the processing, as well as onComplete() – called when the Publisher is closed:

最后,我们需要实现onError()–只要在处理过程中抛出一些异常就会调用,以及onComplete()–当Publisher被关闭时调用。

@Override
public void onError(Throwable t) {
    t.printStackTrace();
}

@Override
public void onComplete() {
    System.out.println("Done");
}

Let’s write a test for the Processing Flow. We’ll be using the SubmissionPublisher class – a construct from the java.util.concurrent – which implements the Publisher interface.

让我们为处理流程写一个测试。我们将使用SubmissionPublisher类–来自java.util.concurrent的一个构造–它实现了Publisher接口。

We’re going to be submitting N elements to the Publisher – which our EndSubscriber will be receiving:

我们将向Publisher提交N元素–我们的EndSubscriber将接收这些元素。

@Test
public void whenSubscribeToIt_thenShouldConsumeAll() 
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>();
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(
         () -> assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(items)
     );
}

Note, that we’re calling the close() method on the instance of the EndSubscriber. It will invoke onComplete() callback underneath on every Subscriber of the given Publisher.

注意,我们在EndSubscriber的实例上调用close()方法。它将在给定Publisher的每个Subscriber上调用onComplete()回调。

Running that program will produce the following output:

运行该程序将产生以下输出。

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done

4. Transformation of Messages

4.信息的转化

Let’s say that we want to build similar logic between a Publisher and a Subscriber, but also apply some transformation.

假设我们想在出版者订阅者之间建立类似的逻辑,但也要应用一些转换。

We’ll create the TransformProcessor class that implements Processor and extends SubmissionPublisher – as this will be both Publisher and Subscriber.

我们将创建TransformProcessor类,它实现了Processor并扩展了SubmissionPublisher-,因为这将同时是Publisher和Subscriber。

We’ll pass in a Function that will transform inputs into outputs:

我们将传入一个Function,它将把输入转化为输出。

public class TransformProcessor<T, R> 
  extends SubmissionPublisher<R> 
  implements Flow.Processor<T, R> {

    private Function<T, R> function;
    private Flow.Subscription subscription;

    public TransformProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(function.apply(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        close();
    }
}

Let’s now write a quick test with a processing flow in which the Publisher is publishing String elements.

现在让我们写一个快速测试,在这个处理流程中,Publisher正在发布Stringelements。

Our TransformProcessor will be parsing the String as Integer – which means a conversion needs to happen here:

我们的TransformProcessor将把String解析为Integer – 这意味着这里需要进行转换。

@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> transformProcessor 
      = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> subscriber = new EndSubscriber<>();
    List<String> items = List.of("1", "2", "3");
    List<Integer> expectedResult = List.of(1, 2, 3);

    // when
    publisher.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);
    items.forEach(publisher::submit);
    publisher.close();

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(() -> 
         assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(expectedResult)
     );
}

Note, that calling the close() method on the base Publisher will cause the onComplete() method on the TransformProcessor to be invoked.

注意,在基础Publisher上调用close()方法将导致TransformProcessor上的onComplete()方法被调用。

Keep in mind that all publishers in the processing chain need to be closed this way.

请记住,处理链中的所有出版商都需要以这种方式关闭。

5. Controlling Demand for Messages Using the Subscription

5.使用订阅控制对信息的需求

Let’s say that we want to consume only the first element from the Subscription, apply some logic and finish processing. We can use the request() method to achieve this.

假设我们只想从订阅号中消费第一个元素,应用一些逻辑并完成处理。我们可以使用request() 方法来实现这一点。

Let’s modify our EndSubscriber to consume only N number of messages. We’ll be passing that number as the howMuchMessagesConsume constructor argument:

让我们修改我们的EndSubscriber,使其只消耗N个消息。我们将把这个数字作为howMuchMessagesConsume构造器的参数来传递。

public class EndSubscriber<T> implements Subscriber<T> {
 
    private AtomicInteger howMuchMessagesConsume;
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    public EndSubscriber(Integer howMuchMessagesConsume) {
        this.howMuchMessagesConsume 
          = new AtomicInteger(howMuchMessagesConsume);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        howMuchMessagesConsume.decrementAndGet();
        System.out.println("Got : " + item);
        consumedElements.add(item);
        if (howMuchMessagesConsume.get() > 0) {
            subscription.request(1);
        }
    }
    //...
    
}

We can request elements as long we want to.

我们可以要求元素,只要我们想。

Let’s write a test in which we only want to consume one element from the given Subscription:

让我们写一个测试,其中我们只想从给定的Subscription:中消费一个元素。

@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>(1);
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    List<String> expected = List.of("1");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

    // then
    await().atMost(1000, TimeUnit.MILLISECONDS)
      .until(() -> 
        assertThat(subscriber.consumedElements)
       .containsExactlyElementsOf(expected)
    );
}

Although the publisher is publishing six elements, our EndSubscriber will consume only one element because it signals demand for processing only that single one.

尽管发布者正在发布六个元素,我们的结束订阅者将只消费一个元素,因为它只发出了处理这一个元素的信号。

By using the request() method on the Subscription, we can implement a more sophisticated back-pressure mechanism to control the speed of the message consumption.

通过在Subscription上使用request() 方法,我们可以实现一个更复杂的反压机制来控制消息消耗的速度。

6. Conclusion

6.结论

In this article, we had a look at the Java 9 Reactive Streams.

在这篇文章中,我们看了一下Java 9的Reactive Streams。

We saw how to create a processing Flow consisting of a Publisher and a Subscriber. We created a more complex processing flow with the transformation of elements using Processors.

我们看到了如何创建一个由发布者订阅者组成的处理流程。我们创建了一个更复杂的处理流程,使用处理器对元素进行转换。

Finally, we used the Subscription to control the demand for elements by the Subscriber.

最后,我们用Subscription来控制Subscriber对元素的需求。

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.

所有这些例子和代码片段的实现都可以在GitHub项目中找到–这是一个Maven项目,所以应该很容易导入并按原样运行。