1. Introduction
1.绪论
Java Flow API was introduced in Java 9 as an implementation of Reactive Stream Specification.
Java Flow API是在Java 9中引入的,作为反应式流规范的一个实现。
In this tutorial, we’ll first investigate reactive streams. Then, we’ll learn about its relation to RxJava and Flow API.
在本教程中,我们将首先研究反应式流。然后,我们将了解它与RxJava和Flow API的关系。
2. What Are Reactive Streams?
2.什么是反应式流?
The Reactive Manifesto introduced Reactive Streams to specify a standard for asynchronous stream processing with non-blocking backpressure.
反应式宣言引入了反应式流,为具有非阻塞反压的异步流处理指定了一个标准。
The scope of the Reactive Stream Specification is to define a minimal set of interfaces to achieve those ends:
反应式流规范的范围是定义一组最小的接口以实现这些目的。
- org.reactivestreams.Publisher is a data provider that publishes data to the subscribers based on their demand
- org.reactivestreams.Subscriber is the consumer of data – it can receive data after subscribing to a publisher
- org.reactivestreams.Subscription is created when a publisher accepts a subscriber
- org.reactivestreams.Processor is both a subscriber and a publisher – it subscribes to a publisher, processes the data and then passes the processed data to the subscriber
Flow API originates from the specification. RxJava precedes it, but since 2.0, RxJava has supported the spec, too.
Flow API起源于该规范。RxJava先于它,但从2.0开始,RxJava也支持该规范。
We’ll go deep into both, but first, let’s see a practical use case.
我们将深入探讨这两点,但首先,让我们看看一个实际的使用案例。
3. Use Case
3.使用案例
For this tutorial, we’ll use a live stream video service as our use case.
在本教程中,我们将使用一个直播视频服务作为我们的用例。
A live stream video, contrary to on-demand video streaming, does not depend on the consumer. Therefore the server publishes the stream at its own pace, and it’s the consumer’s responsibility to adapt.
与按需视频流相反,实时流视频不依赖于消费者。因此,服务器以自己的速度发布流媒体,而消费者有责任去适应。
In the most simple form, our model consists of a video stream publisher and a video player as the subscriber.
在最简单的形式下,我们的模型由一个视频流发布者和一个作为用户的视频播放器组成。
Let’s implement VideoFrame as our data item:
让我们实现 VideoFrame作为我们的数据项。
public class VideoFrame {
private long number;
// additional data fields
// constructor, getters, setters
}
Then let’s go through our Flow API and RxJava implementations one-by-one.
然后让我们逐一查看Flow API和RxJava的实现。
4. Implementation With Flow API
4.用Flow API实现
The Flow APIs in JDK 9 correspond to the Reactive Streams Specification. With the Flow API, if the application initially requests N items, then the publisher pushes at most N items to the subscriber.
JDK 9中的Flow APIs对应于Reactive Streams规范。通过Flow API,如果应用程序最初请求了N个项目,那么发布者最多可以向订阅者推送N个项目。
The Flow API interfaces are all in the java.util.concurrent.Flow interface. They are semantically equivalent to their respective Reactive Streams counterparts.
Flow API接口都在java.util.concurrent.Flow接口中。它们在语义上等同于各自的Reactive Streams对应接口。
Let’s implement VideoStreamServer as the publisher of VideoFrame.
让我们实现VideoStreamServer作为VideoFrame的发布者。
public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
public VideoStreamServer() {
super(Executors.newSingleThreadExecutor(), 5);
}
}
We have extended our VideoStreamServer from SubmissionPublisher instead of directly implementing Flow::Publisher. SubmissionPublisher is JDK implementation of Flow::Publisher for asynchronous communication with subscribers, so it lets our VideoStreamServer to emit at its own pace.
我们从SubmissionPublisher扩展了我们的VideoStreamServer,而不是直接实现Flow::Publisher。SubmissionPublisher是JDK对Flow::Publisher的实现,用于与订阅者进行异步通信,所以它让我们的VideoStreamServer以自己的节奏发射信息。
Also, it’s helpful for backpressure and buffer handling, because when SubmissionPublisher::subscribe called, it creates an instance of BufferedSubscription, and then adds the new subscription to its chain of subscriptions. BufferedSubscription can buffer issued items up to SubmissionPublisher#maxBufferCapacity.
另外,它对背压和缓冲区处理很有帮助,因为当SubmissionPublisher::subscribe被调用时,它会创建一个BufferedSubscription的实例,然后将新的订阅添加到其订阅链中。BufferedSubscription可以缓冲发布的项目,最多到SubmissionPublisher#maxBufferCapacity。
Now let’s define VideoPlayer, which consumes a stream of VideoFrame. Hence it must implement Flow::Subscriber.
现在让我们定义VideoPlayer,它消费一个VideoFrame的流。因此,它必须实现Flow::Subscriber。
public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
Flow.Subscription subscription = null;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(VideoFrame item) {
log.info("play #{}" , item.getNumber());
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.error("There is an error in video streaming:{}" , throwable.getMessage());
}
@Override
public void onComplete() {
log.error("Video has ended");
}
}
VideoPlayer subscribes to VideoStreamServer, then after a successful subscription VideoPlayer::onSubscribe method is called, and it requests for one frame. VideoPlayer::onNext receive the frame and requests for a new one. The number of the requested frames depends on the use case and Subscriber implementations.
VideoPlayer订阅VideoStreamServer,然后在订阅成功后VideoPlayer::onSubscribe方法被调用,它请求获得一帧。VideoPlayer::onNext接收该帧,并要求获得一个新的帧。请求的帧的数量取决于用例和Subscriber的实现。
Finally, let’s put things together:
最后,让我们把事情放在一起。
VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());
// submit video frames
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber()
+ " droped because of backpressure"));
return true;
});
}, 0, 1, TimeUnit.MILLISECONDS);
sleep(1000);
5. Implementation With RxJava
5.用RxJava实现
RxJava is a Java implementation of ReactiveX. The ReactiveX (or Reactive Extensions) project aims to provide a reactive programming concept. It’s a combination of the Observer pattern, the Iterator pattern, and functional programming.
RxJava是ReactiveX的一个Java实现。ReactiveX(或称Reactive Extensions)项目旨在提供一个反应式编程概念。它是Observer模式、Iterator模式以及函数式编程的结合。
The latest major version for RxJava is 3.x. RxJava supports Reactive Streams since version 2.x with its Flowable base class, but it’s a more significant set than Reactive Streams with several base classes like Flowable, Observable, Single, Completable.
RxJava的最新主要版本是3.x。RxJava从2.x版本开始就通过Flowable基类支持Reactive Streams,但它是一个比Reactive Streams更重要的集合,有几个基类,如Flowable, Observable, Single, Completable。
Flowable as reactive stream compliance component is a flow of 0 to N items with backpressure handling. Flowable extends Publisher from Reactive Streams. Therefore many RxJava operators accept Publisher directly and allow direct interoperation with other Reactive Streams implementations.
Flowable作为反应式流的遵从组件,是一个有背压处理的0到N项的流。Flowable扩展了来自Reactive Streams的Publisher。因此,许多RxJava操作者直接接受Publisher,并允许与其他Reactive Streams实现直接互操作。
Now, Let’s make our video stream generator which is an infinite lazy stream:
现在,让我们制作我们的视频流发生器,这是一个无限的懒惰流。
Stream<VideoFrame> videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> {
// sleep for 1ms;
return new VideoFrame(videoFrame.getNumber() + 1);
});
Then we define a Flowable instance to generate frames on a separate thread:
然后我们定义一个Flowable实例,在一个单独的线程上生成框架。
Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
It is important to note that an infinite stream is enough for us, but if we need a more flexible way to generate our stream, then Flowable.create is a good choice.
需要注意的是,一个无限的流对我们来说已经足够了,但是如果我们需要一个更灵活的方式来生成我们的流,那么Flowable.create是一个不错的选择。
Flowable
.create(new FlowableOnSubscribe<VideoFrame>() {
AtomicLong frame = new AtomicLong();
@Override
public void subscribe(@NonNull FlowableEmitter<VideoFrame> emitter) {
while (true) {
emitter.onNext(new VideoFrame(frame.incrementAndGet()));
//sleep for 1 ms to simualte delay
}
}
}, /* Set Backpressure Strategy Here */)
Then, at the next step, VideoPlayer subscribes to this Flowable and observes items on a separate thread.
然后,在下一步,VideoPlayer订阅了这个Flowable,并在一个单独的线程上观察项目。
videoFlowable
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});
And finally, we’ll configure the strategy for backpressure. If we want to stop the video in case of frame loss, hence we have to use BackpressureOverflowStrategy::ERROR when the buffer is full.
最后,我们将配置背压的策略。如果我们想在丢帧的情况下停止视频,因此我们必须在缓冲区满时使用BackpressureOverflowStrategy::ERROR。
Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});
6. Comparison of RxJava and Flow API
6.RxJava和Flow API的比较
Even in these two simple implementations, we can see how RxJava’s API is rich, especially for buffer management, error handling, and backpressure strategy. It gives us more options and fewer lines of code with its fluent API. Now let’s consider more complicated cases.
即使在这两个简单的实现中,我们也可以看到RxJava的API是多么的丰富,特别是在缓冲区管理、错误处理和反压策略方面。它以其流畅的API为我们提供了更多的选择和更少的代码行。现在让我们来考虑更复杂的情况。
Assume that our player can’t display video frames without a codec. Hence with Flow API, we need to implement a Processor to simulate the codec and sit between server and player. With RxJava, we can do it with Flowable::flatMap or Flowable::map.
假设我们的播放器没有编解码器就不能显示视频帧。因此,通过Flow API,我们需要实现一个Processor来模拟编解码器,并坐在服务器和播放器之间。通过RxJava,我们可以用Flowable::flatMap或Flowable::map来实现。
Or let’s imagine that our player is also going to broadcast live translation audio, so we have to combine streams of video and audio from separate publishers. With RxJava, we can use Flowable::combineLatest, but with Flow API, it is not an easy task.
或者让我们想象一下,我们的播放器也要直播翻译音频,所以我们必须将来自不同发布者的视频和音频流结合起来。使用RxJava,我们可以使用Flowable::combinLatest,但使用Flow API,这并不是一件容易的事。
Although, it is possible to write a custom Processor that subscribes to both streams and sends the combined data to our VideoPlayer. The implementation is a headache, however.
虽然,我们可以编写一个自定义的Processor,订阅两个流,并将合并的数据发送到我们的VideoPlayer。
7. Why Flow API?
7.为什么是Flow API?
At this point, we may have a question, what is the philosophy behind the Flow API?
说到这里,我们可能有一个问题,Flow API背后的理念是什么?
If we search for Flow API usages in the JDK, we can find something in java.net.http and jdk.internal.net.http.
如果我们在JDK中搜索Flow API的用法,我们可以在java.net.http和jdk.internal.net.http中找到一些。
Furthermore, we can find adapters in the reactor project or reactive stream package. For example, org.reactivestreams.FlowAdapters has methods for converting Flow API interfaces to Reactive Stream ones and vice-versa. Therefore it helps the interoperability between Flow API and libraries with reactive stream support.
此外,我们可以在reactor项目或reactive stream包中找到适配器。例如,org.reactivestreams.FlowAdapters有将Flow API接口转换为反应流接口的方法,反之亦然。因此,它有助于Flow API与支持反应流的库之间的互操作性。
All of these facts help us to understand the purpose of Flow API: It was created to be a group of reactive specification interfaces in JDK without relay on third parties. Moreover, Java expects Flow API to be accepted as standard interfaces for reactive specification and to be used in JDK or other Java-based libraries that implement the reactive specification for middlewares and utilities.
所有这些事实都有助于我们理解Flow API的目的。此外,Java希望Flow API能够被接受为反应式规范的标准接口,并在JDK或其他基于Java的库中使用,以实现中间件和实用程序的反应式规范。
8. Conclusions
8.8. 结论
In this tutorial, we’ve got an introduction to Reactive Stream Specification, Flow API, and RxJava.
在本教程中,我们已经对反应式流规范、Flow API和RxJava进行了介绍。
Furthermore, we’ve seen a practical example of Flow API and RxJava implementations for a live video stream.
此外,我们还看到了一个Flow API和RxJava实现直播视频流的实际例子。
But all aspects of Flow API and RxJava like Flow::Processor, Flowable::map and Flowable::flatMap or backpressure strategies are not covered here.
但Flow API和RxJava的所有方面,如Flow::Processor、Flowable::map和Flowable::flatMap或背压策略,这里都没有涉及。
As always, you find the tutorial’s complete code over on GitHub.
一如既往,你可以在GitHub上找到该教程的完整代码。