Introduction to Project Reactor Bus – 项目反应器总线介绍

最后修改: 2017年 1月 6日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this quick article, we’ll introduce the reactor-bus by setting up a real-life scenario for a reactive, event-driven application.

在这篇快速文章中,我们将通过设置一个反应式、事件驱动应用程序的真实场景来介绍反应式总线

NOTE: The reactor-bus project has been removed in Reactor 3.x: Archived reactor-bus repository

注意:reactor-bus项目已在Reactor 3.x中被删除。Archived reactor-bus repository/a>

2. The Basics of Project Reactor

2.项目反应器的基础知识

2.1. Why Reactor?

2.1.为什么是反应堆?

Modern applications need to deal with a huge number of concurrent requests and process a significant amount of data. Standard, blocking code is no longer sufficient to fulfill these requirements.

现代应用程序需要处理大量的并发请求和处理大量的数据。标准的、阻塞的代码已经不足以满足这些要求。

The reactive design pattern is an event-based architectural approach for asynchronous handling of a large volume of concurrent service requests coming from single or multiple service handlers.

反应式设计模式是一种基于事件的架构方法,用于异步处理来自单个或多个服务处理程序的大量并发的服务请求

The Project Reactor is based on this pattern and has a clear and ambitious goal of building non-blocking, reactive applications on the JVM.

项目Reactor是基于这种模式的,它有一个明确而雄心勃勃的目标,即在JVM上构建非阻塞、反应式的应用程序.

2.2. Example Scenarios

2.2.示例场景

Before we get started, here are a few interesting scenarios where leveraging the reactive architectural style would make sense, just to get an idea of where we might apply it:

在我们开始之前,这里有几个有趣的场景,利用反应式架构风格是有意义的,只是为了了解我们可能应用它的地方。

  • Notification services for a large online shopping platform like Amazon
  • Huge transaction processing services for the banking sector
  • Stocks trading businesses where stocks’ prices change simultaneously

3. Maven Dependencies

3.Maven的依赖性

Let’s start to use Project Reactor Bus by adding the following dependency into our pom.xml:

让我们开始使用Project Reactor Bus,在我们的pom.xml中添加以下依赖性:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>

We can check the latest version of reactor-bus in Maven Central.

我们可以在Maven Central中查看reactor-bus的最新版本。

4. Building a Demo Application

4.建立一个演示应用程序

To better understand the benefits of the reactor-based approach, let’s look at a practical example.

为了更好地理解基于反应器的方法的好处,让我们看看一个实际的例子。

We’ll build a simple application responsible for sending notifications to the users of an online shopping platform. For example, if a user places a new order, then the app sends an order confirmation via email or SMS.

我们将建立一个简单的应用程序,负责向一个在线购物平台的用户发送通知。例如,如果一个用户下了一个新的订单,那么这个应用程序就会通过电子邮件或短信发送订单确认。

A typical synchronous implementation would naturally be limited by the email or SMS service’s throughput. Therefore, traffic spikes, such as holidays would generally be problematic.

一个典型的同步实现自然会受到电子邮件或SMS服务的吞吐量的限制。因此,流量高峰,如节假日,通常会有问题。

With a reactive approach, we can design our system to be more flexible and to adapt better to failures or timeouts that may occur in the external systems, such as gateway servers.

通过反应式方法,我们可以将我们的系统设计得更加灵活,更好地适应外部系统(如网关服务器)可能出现的故障或超时。

Let’s have a look at the application – starting with the more traditional aspects and moving on to the more reactive constructs.

让我们来看看这个应用–从更传统的方面开始,到更多的反应式结构。

4.1. Simple POJO

4.1.简单的POJO

First, let’s create a POJO class to represent the notification data:

首先,让我们创建一个POJO类来表示通知数据。

public class NotificationData {
	
    private long id;
    private String name;
    private String email;
    private String mobile;
    
    // getter and setter methods
}

4.2. The Service Layer

4.2.服务层

Let’s now define a simple service layer:

现在我们来定义一个简单的服务层。

public interface NotificationService {

    void initiateNotification(NotificationData notificationData) 
      throws InterruptedException;

}

And the implementation, simulating a long-running operation:

而实施,模拟了一个长期运行的操作。

@Service
public class NotificationServiceimpl implements NotificationService {
	
    @Override
    public void initiateNotification(NotificationData notificationData) 
      throws InterruptedException {

      System.out.println("Notification service started for "
        + "Notification ID: " + notificationData.getId());
		
      Thread.sleep(5000);
		
      System.out.println("Notification service ended for "
        + "Notification ID: " + notificationData.getId());
    }
}

Notice that to illustrate a real-life scenario of sending messages via an SMS or email gateway, we’re intentionally introducing a five seconds delay in the initiateNotification method with Thread.sleep(5000).

请注意,为了说明通过SMS或电子邮件网关发送消息的真实场景,我们故意在initiateNotification方法中用Thread.sleep(5000)引入5秒的延迟。

Consequently, when a thread hits the service, it’ll be blocked for five seconds.

因此,当一个线程击中服务时,它将被阻断五秒钟。

4.3. The Consumer

4.3.消费者

Let’s now jump into the more reactive aspects of our application and implement a consumer – which we’ll then map to the reactor event bus:

现在让我们跳到我们应用程序的更多反应性方面,并实现一个消费者–然后我们将其映射到反应器事件总线。

@Service
public class NotificationConsumer implements 
  Consumer<Event<NotificationData>> {

    @Autowired
    private NotificationService notificationService;
	
    @Override
    public void accept(Event<NotificationData> notificationDataEvent) {
        NotificationData notificationData = notificationDataEvent.getData();
        
        try {
            notificationService.initiateNotification(notificationData);
        } catch (InterruptedException e) {
            // ignore        
        }	
    }
}

 

As we can see, the consumer we created implements the Consumer<T> interface. The main logic resides in the accept method.

我们可以看到,我们创建的消费者实现了Consumer<T> 接口。主要的逻辑位于accept方法中。

This is a similar approach we can meet in a typical Spring listener implementation.

这是我们在典型的Spring监听器实现中可以见到的类似方法。

4.4. The Controller

4.4.控制器

Finally, now that we’re able to consume the events, let’s also generate them.

最后,现在我们能够消费事件,让我们也来生成它们。

We’re going to do that in a simple controller:

我们将在一个简单的控制器中做到这一点。

@Controller
public class NotificationController {

    @Autowired
    private EventBus eventBus;

    @GetMapping("/startNotification/{param}")
    public void startNotification(@PathVariable Integer param) {
        for (int i = 0; i < param; i++) {
            NotificationData data = new NotificationData();
            data.setId(i);

            eventBus.notify("notificationConsumer", Event.wrap(data));

            System.out.println(
              "Notification " + i + ": notification task submitted successfully");
        }
    }
}

This is quite self-explanatory – we’re emitting events through the EventBus here.

这是不言自明的–我们在这里通过EventBus来发射事件。

For example, if a client hits the URL with a param value of ten, then ten events will be sent through the event bus.

例如,如果一个客户点击参数值为10的URL,那么将通过事件总线发送10个事件。

4.5. The Java Config

4.5.Java配置

Let’s now put everything together and create a simple Spring Boot application.

现在让我们把所有东西放在一起,创建一个简单的Spring Boot应用程序。

First, we need to configure EventBus and Environment beans:

首先,我们需要配置EventBusEnvironment Bean。

@Configuration
public class Config {

    @Bean
    public Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

In our case, we’re instantiating the EventBus with a default thread pool available in the environment.

在我们的案例中,我们正在用环境中可用的默认线程池来实例化EventBus

Alternatively, we can use a customized Dispatcher instance:

另外,我们可以使用一个自定义的Dispatcher实例。

EventBus evBus = EventBus.create(
  env, 
  Environment.newDispatcher(
    REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,   
    DispatcherType.THREAD_POOL_EXECUTOR));

Now, we’re ready to create a main application code:

现在,我们准备创建一个主程序代码。

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {

    @Autowired
    private EventBus eventBus;

    @Autowired
    private NotificationConsumer notificationConsumer;

    @Override
    public void run(String... args) throws Exception {
        eventBus.on($("notificationConsumer"), notificationConsumer);
    }

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class, args);
    }
}

In our run method we’re registering the notificationConsumer to be triggered when the notification matches a given selector.

在我们的run方法中,我们正在注册notificationConsumer,以便在通知匹配给定的选择器时被触发

Notice how we’re using the static import of the $ attribute to create a Selector object.

注意我们如何使用$属性的静态导入来创建一个Selector对象。

5. Test the Application

5.测试应用程序

Let’s now create a test to see our NotificationApplication in action:

现在让我们创建一个测试,看看我们的NotificationApplication的运行情况。

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {

    @LocalServerPort
    private int port;

    @Test
    public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
    }
}

As we can see, as soon as the request is executed, all ten tasks get submitted instantly without creating any blocking. And once submitted, the notification events get processed in parallel.

我们可以看到,一旦请求被执行,所有10个任务都会立即被提交,而不会产生任何阻塞。而一旦提交,通知事件就会被并行处理。

Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9

It’s important to keep in mind that in our scenario there’s no need to process these events in any particular order.

重要的是要记住,在我们的方案中,不需要以任何特定的顺序处理这些事件。

6. Conclusion

6.结论

In this quick tutorial, we’ve created a simple event-driven application. We’ve also seen how to start writing a more reactive and non-blocking code.

在这个快速教程中,我们已经创建了一个简单的事件驱动的应用程序。我们还看到了如何开始编写更多的反应式和非阻塞式代码。

However, this scenario just scratches the surface of the subject and represents just a good base to start experimenting with the reactive paradigm.

然而,这种情况只是触及了主题的表面,只是代表了一个很好的基础,可以开始尝试反应式范式

As always, the source code is available over on GitHub.

像往常一样,源代码可在GitHub上获得