1. Introduction
Java Flow API was introduced in Java 9 as an implementation of Reactive Stream Specification.
Java Flow API是在Java 9中引入的,作为反应式流规范的一个实现。
In this tutorial, we’ll first investigate reactive streams. Then, we’ll learn about its relation to RxJava and Flow API.
在本教程中,我们将首先研究反应式流。然后,我们将了解它与RxJava和Flow API的关系。
2. What Are Reactive Streams?
The Reactive Manifesto introduced Reactive Streams to specify a standard for asynchronous stream processing with non-blocking backpressure.
The scope of the Reactive Stream Specification is to define a minimal set of interfaces to achieve those ends:
- org.reactivestreams.Publisher is a data provider that publishes data to the subscribers based on their demand
- org.reactivestreams.Subscriber is the consumer of data – it can receive data after subscribing to a publisher
- org.reactivestreams.Subscription is created when a publisher accepts a subscriber
- org.reactivestreams.Processor is both a subscriber and a publisher – it subscribes to a publisher, processes the data and then passes the processed data to the subscriber
Flow API originates from the specification. RxJava precedes it, but since 2.0, RxJava has supported the spec, too.
Flow API起源于该规范。RxJava先于它,但从2.0开始,RxJava也支持该规范。
We’ll go deep into both, but first, let’s see a practical use case.
3. Use Case
For this tutorial, we’ll use a live stream video service as our use case.
A live stream video, contrary to on-demand video streaming, does not depend on the consumer. Therefore the server publishes the stream at its own pace, and it’s the consumer’s responsibility to adapt.
In the most simple form, our model consists of a video stream publisher and a video player as the subscriber.
Let’s implement VideoFrame as our data item:
让我们实现 VideoFrame作为我们的数据项。
public class VideoFrame {
private long number;
// additional data fields
// constructor, getters, setters
Then let’s go through our Flow API and RxJava implementations one-by-one.
然后让我们逐一查看Flow API和RxJava的实现。
4. Implementation With Flow API
4.用Flow API实现
The Flow APIs in JDK 9 correspond to the Reactive Streams Specification. With the Flow API, if the application initially requests N items, then the publisher pushes at most N items to the subscriber.
JDK 9中的Flow APIs对应于Reactive Streams规范。通过Flow API,如果应用程序最初请求了N个项目,那么发布者最多可以向订阅者推送N个项目。
The Flow API interfaces are all in the java.util.concurrent.Flow interface. They are semantically equivalent to their respective Reactive Streams counterparts.
Flow API接口都在java.util.concurrent.Flow接口中。它们在语义上等同于各自的Reactive Streams对应接口。
Let’s implement VideoStreamServer as the publisher of VideoFrame.
public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
public VideoStreamServer() {
super(Executors.newSingleThreadExecutor(), 5);
We have extended our VideoStreamServer from SubmissionPublisher instead of directly implementing Flow::Publisher. SubmissionPublisher is JDK implementation of Flow::Publisher for asynchronous communication with subscribers, so it lets our VideoStreamServer to emit at its own pace.
Also, it’s helpful for backpressure and buffer handling, because when SubmissionPublisher::subscribe called, it creates an instance of BufferedSubscription, and then adds the new subscription to its chain of subscriptions. BufferedSubscription can buffer issued items up to SubmissionPublisher#maxBufferCapacity.
Now let’s define VideoPlayer, which consumes a stream of VideoFrame. Hence it must implement Flow::Subscriber.
public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
Flow.Subscription subscription = null;
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
public void onNext(VideoFrame item) {
log.info("play #{}" , item.getNumber());
public void onError(Throwable throwable) {
log.error("There is an error in video streaming:{}" , throwable.getMessage());
public void onComplete() {
log.error("Video has ended");
VideoPlayer subscribes to VideoStreamServer, then after a successful subscription VideoPlayer::onSubscribe method is called, and it requests for one frame. VideoPlayer::onNext receive the frame and requests for a new one. The number of the requested frames depends on the use case and Subscriber implementations.
Finally, let’s put things together:
VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());
// submit video frames
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber()
+ " droped because of backpressure"));
return true;
}, 0, 1, TimeUnit.MILLISECONDS);
5. Implementation With RxJava
RxJava is a Java implementation of ReactiveX. The ReactiveX (or Reactive Extensions) project aims to provide a reactive programming concept. It’s a combination of the Observer pattern, the Iterator pattern, and functional programming.
RxJava是ReactiveX的一个Java实现。ReactiveX(或称Reactive Extensions)项目旨在提供一个反应式编程概念。它是Observer模式、Iterator模式以及函数式编程的结合。
The latest major version for RxJava is 3.x. RxJava supports Reactive Streams since version 2.x with its Flowable base class, but it’s a more significant set than Reactive Streams with several base classes like Flowable, Observable, Single, Completable.
RxJava的最新主要版本是3.x。RxJava从2.x版本开始就通过Flowable基类支持Reactive Streams,但它是一个比Reactive Streams更重要的集合,有几个基类,如Flowable, Observable, Single, Completable。
Flowable as reactive stream compliance component is a flow of 0 to N items with backpressure handling. Flowable extends Publisher from Reactive Streams. Therefore many RxJava operators accept Publisher directly and allow direct interoperation with other Reactive Streams implementations.
Flowable作为反应式流的遵从组件,是一个有背压处理的0到N项的流。Flowable扩展了来自Reactive Streams的Publisher。因此,许多RxJava操作者直接接受Publisher,并允许与其他Reactive Streams实现直接互操作。
Now, Let’s make our video stream generator which is an infinite lazy stream:
Stream<VideoFrame> videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> {
// sleep for 1ms;
return new VideoFrame(videoFrame.getNumber() + 1);
Then we define a Flowable instance to generate frames on a separate thread:
It is important to note that an infinite stream is enough for us, but if we need a more flexible way to generate our stream, then Flowable.create is a good choice.
.create(new FlowableOnSubscribe<VideoFrame>() {
AtomicLong frame = new AtomicLong();
public void subscribe(@NonNull FlowableEmitter<VideoFrame> emitter) {
while (true) {
emitter.onNext(new VideoFrame(frame.incrementAndGet()));
//sleep for 1 ms to simualte delay
}, /* Set Backpressure Strategy Here */)
Then, at the next step, VideoPlayer subscribes to this Flowable and observes items on a separate thread.
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
And finally, we’ll configure the strategy for backpressure. If we want to stop the video in case of frame loss, hence we have to use BackpressureOverflowStrategy::ERROR when the buffer is full.
.onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR)
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
6. Comparison of RxJava and Flow API
6.RxJava和Flow API的比较
Even in these two simple implementations, we can see how RxJava’s API is rich, especially for buffer management, error handling, and backpressure strategy. It gives us more options and fewer lines of code with its fluent API. Now let’s consider more complicated cases.
Assume that our player can’t display video frames without a codec. Hence with Flow API, we need to implement a Processor to simulate the codec and sit between server and player. With RxJava, we can do it with Flowable::flatMap or Flowable::map.
假设我们的播放器没有编解码器就不能显示视频帧。因此,通过Flow API,我们需要实现一个Processor来模拟编解码器,并坐在服务器和播放器之间。通过RxJava,我们可以用Flowable::flatMap或Flowable::map来实现。
Or let’s imagine that our player is also going to broadcast live translation audio, so we have to combine streams of video and audio from separate publishers. With RxJava, we can use Flowable::combineLatest, but with Flow API, it is not an easy task.
或者让我们想象一下,我们的播放器也要直播翻译音频,所以我们必须将来自不同发布者的视频和音频流结合起来。使用RxJava,我们可以使用Flowable::combinLatest,但使用Flow API,这并不是一件容易的事。
Although, it is possible to write a custom Processor that subscribes to both streams and sends the combined data to our VideoPlayer. The implementation is a headache, however.
7. Why Flow API?
7.为什么是Flow API?
At this point, we may have a question, what is the philosophy behind the Flow API?
说到这里,我们可能有一个问题,Flow API背后的理念是什么?
If we search for Flow API usages in the JDK, we can find something in java.net.http and jdk.internal.net.http.
如果我们在JDK中搜索Flow API的用法,我们可以在java.net.http和jdk.internal.net.http中找到一些。
Furthermore, we can find adapters in the reactor project or reactive stream package. For example, org.reactivestreams.FlowAdapters has methods for converting Flow API interfaces to Reactive Stream ones and vice-versa. Therefore it helps the interoperability between Flow API and libraries with reactive stream support.
此外,我们可以在reactor项目或reactive stream包中找到适配器。例如,org.reactivestreams.FlowAdapters有将Flow API接口转换为反应流接口的方法,反之亦然。因此,它有助于Flow API与支持反应流的库之间的互操作性。
All of these facts help us to understand the purpose of Flow API: It was created to be a group of reactive specification interfaces in JDK without relay on third parties. Moreover, Java expects Flow API to be accepted as standard interfaces for reactive specification and to be used in JDK or other Java-based libraries that implement the reactive specification for middlewares and utilities.
所有这些事实都有助于我们理解Flow API的目的。此外,Java希望Flow API能够被接受为反应式规范的标准接口,并在JDK或其他基于Java的库中使用,以实现中间件和实用程序的反应式规范。
8. Conclusions
8.8. 结论
In this tutorial, we’ve got an introduction to Reactive Stream Specification, Flow API, and RxJava.
在本教程中,我们已经对反应式流规范、Flow API和RxJava进行了介绍。
Furthermore, we’ve seen a practical example of Flow API and RxJava implementations for a live video stream.
此外,我们还看到了一个Flow API和RxJava实现直播视频流的实际例子。
But all aspects of Flow API and RxJava like Flow::Processor, Flowable::map and Flowable::flatMap or backpressure strategies are not covered here.
但Flow API和RxJava的所有方面,如Flow::Processor、Flowable::map和Flowable::flatMap或背压策略,这里都没有涉及。
As always, you find the tutorial’s complete code over on GitHub.