Introduction to Project Reactor Bus – 项目反应器总线介绍

最后修改: 2017年 1月 6日


1. Overview


In this quick article, we’ll introduce the reactor-bus by setting up a real-life scenario for a reactive, event-driven application.


NOTE: The reactor-bus project has been removed in Reactor 3.x: Archived reactor-bus repository

注意:reactor-bus项目已在Reactor 3.x中被删除。Archived reactor-bus repository/a>

2. The Basics of Project Reactor


2.1. Why Reactor?


Modern applications need to deal with a huge number of concurrent requests and process a significant amount of data. Standard, blocking code is no longer sufficient to fulfill these requirements.


The reactive design pattern is an event-based architectural approach for asynchronous handling of a large volume of concurrent service requests coming from single or multiple service handlers.


The Project Reactor is based on this pattern and has a clear and ambitious goal of building non-blocking, reactive applications on the JVM.


2.2. Example Scenarios


Before we get started, here are a few interesting scenarios where leveraging the reactive architectural style would make sense, just to get an idea of where we might apply it:


  • Notification services for a large online shopping platform like Amazon
  • Huge transaction processing services for the banking sector
  • Stocks trading businesses where stocks’ prices change simultaneously

3. Maven Dependencies


Let’s start to use Project Reactor Bus by adding the following dependency into our pom.xml:

让我们开始使用Project Reactor Bus,在我们的pom.xml中添加以下依赖性:


We can check the latest version of reactor-bus in Maven Central.

我们可以在Maven Central中查看reactor-bus的最新版本。

4. Building a Demo Application


To better understand the benefits of the reactor-based approach, let’s look at a practical example.


We’ll build a simple application responsible for sending notifications to the users of an online shopping platform. For example, if a user places a new order, then the app sends an order confirmation via email or SMS.


A typical synchronous implementation would naturally be limited by the email or SMS service’s throughput. Therefore, traffic spikes, such as holidays would generally be problematic.


With a reactive approach, we can design our system to be more flexible and to adapt better to failures or timeouts that may occur in the external systems, such as gateway servers.


Let’s have a look at the application – starting with the more traditional aspects and moving on to the more reactive constructs.


4.1. Simple POJO


First, let’s create a POJO class to represent the notification data:


public class NotificationData {
    private long id;
    private String name;
    private String email;
    private String mobile;
    // getter and setter methods

4.2. The Service Layer


Let’s now define a simple service layer:


public interface NotificationService {

    void initiateNotification(NotificationData notificationData) 
      throws InterruptedException;


And the implementation, simulating a long-running operation:


public class NotificationServiceimpl implements NotificationService {
    public void initiateNotification(NotificationData notificationData) 
      throws InterruptedException {

      System.out.println("Notification service started for "
        + "Notification ID: " + notificationData.getId());
      System.out.println("Notification service ended for "
        + "Notification ID: " + notificationData.getId());

Notice that to illustrate a real-life scenario of sending messages via an SMS or email gateway, we’re intentionally introducing a five seconds delay in the initiateNotification method with Thread.sleep(5000).


Consequently, when a thread hits the service, it’ll be blocked for five seconds.


4.3. The Consumer


Let’s now jump into the more reactive aspects of our application and implement a consumer – which we’ll then map to the reactor event bus:


public class NotificationConsumer implements 
  Consumer<Event<NotificationData>> {

    private NotificationService notificationService;
    public void accept(Event<NotificationData> notificationDataEvent) {
        NotificationData notificationData = notificationDataEvent.getData();
        try {
        } catch (InterruptedException e) {
            // ignore        


As we can see, the consumer we created implements the Consumer<T> interface. The main logic resides in the accept method.

我们可以看到,我们创建的消费者实现了Consumer<T> 接口。主要的逻辑位于accept方法中。

This is a similar approach we can meet in a typical Spring listener implementation.


4.4. The Controller


Finally, now that we’re able to consume the events, let’s also generate them.


We’re going to do that in a simple controller:


public class NotificationController {

    private EventBus eventBus;

    public void startNotification(@PathVariable Integer param) {
        for (int i = 0; i < param; i++) {
            NotificationData data = new NotificationData();

            eventBus.notify("notificationConsumer", Event.wrap(data));

              "Notification " + i + ": notification task submitted successfully");

This is quite self-explanatory – we’re emitting events through the EventBus here.


For example, if a client hits the URL with a param value of ten, then ten events will be sent through the event bus.


4.5. The Java Config


Let’s now put everything together and create a simple Spring Boot application.

现在让我们把所有东西放在一起,创建一个简单的Spring Boot应用程序。

First, we need to configure EventBus and Environment beans:

首先,我们需要配置EventBusEnvironment Bean。

public class Config {

    public Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();

    public EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);

In our case, we’re instantiating the EventBus with a default thread pool available in the environment.


Alternatively, we can use a customized Dispatcher instance:


EventBus evBus = EventBus.create(

Now, we’re ready to create a main application code:


import static reactor.bus.selector.Selectors.$;

public class NotificationApplication implements CommandLineRunner {

    private EventBus eventBus;

    private NotificationConsumer notificationConsumer;

    public void run(String... args) throws Exception {
        eventBus.on($("notificationConsumer"), notificationConsumer);

    public static void main(String[] args) {, args);

In our run method we’re registering the notificationConsumer to be triggered when the notification matches a given selector.


Notice how we’re using the static import of the $ attribute to create a Selector object.


5. Test the Application


Let’s now create a test to see our NotificationApplication in action:


@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {

    private int port;

    public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);

As we can see, as soon as the request is executed, all ten tasks get submitted instantly without creating any blocking. And once submitted, the notification events get processed in parallel.


Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9

It’s important to keep in mind that in our scenario there’s no need to process these events in any particular order.


6. Conclusion


In this quick tutorial, we’ve created a simple event-driven application. We’ve also seen how to start writing a more reactive and non-blocking code.


However, this scenario just scratches the surface of the subject and represents just a good base to start experimenting with the reactive paradigm.


As always, the source code is available over on GitHub.