Reactive Systems in Java – Java中的反应式系统

最后修改: 2020年 7月 14日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

In this tutorial, we’ll understand the basics of creating reactive systems in Java using Spring and other tools and frameworks.

在本教程中,我们将了解使用Spring及其他工具和框架在Java中创建反应式系统的基础知识。

In the process, we’ll discuss how reactive programming is just a driver towards creating a reactive system. This will help us understand the rationale for creating reactive systems and different specifications, libraries, and standards it has inspired along the way.

在这个过程中,我们将讨论反应式编程如何只是创建反应式系统的一个驱动力。这将有助于我们理解创建反应式系统的理由,以及它一路走来所激发的不同规范、库和标准。

2. What Are Reactive Systems?

2.什么是反应型系统?

Over the last few decades, the technology landscape has seen several disruptions that have led to a complete transformation in the way we see value in technology. The computing world before the Internet could never have imagined the ways and means in which it will change our current day.

在过去的几十年里,技术领域发生了几次颠覆性的变化,导致我们看待技术价值的方式发生了彻底的转变。互联网之前的计算世界永远无法想象它将以何种方式和途径改变我们今天的生活。

With the reach of the Internet to the masses and the ever-evolving experience it promises, application architects need to be on their toes to meet their demand.

随着互联网对大众的影响以及它所承诺的不断发展的体验,应用架构师需要保持警惕以满足他们的需求。

Fundamentally, this means that we can never design an application in the way we used to earlier. A highly responsive application is no longer a luxury but a necessity.

从根本上说,这意味着我们再也不能像以前那样设计一个应用程序。一个高度响应的应用程序不再是一种奢侈品,而是一种必需品

That, too, is in the face of random failures and unpredictable load. The need of the hour is not just to get the correct result but get to get it fast! It’s quite important to drive the amazing user experiences we promise to deliver.

这也是在面对随机故障和不可预测的负载时。当务之急是不仅要得到正确的结果,而且要得到快速的结果!这对推动我们承诺提供的惊人的用户体验相当重要。

This is what creates the need for an architectural style that can give us Reactive Systems.

这就是为什么需要一种能够给我们提供反应式系统的架构风格。

2.1. Reactive Manifesto

2.1.反应式宣言

Back in the year 2013, a team of developers, lead by Jonas Boner came together to define a set of core principles in a document known as the Reactive Manifesto. This is what laid the foundation for an architecture style to create Reactive Systems. Since then, this manifesto has gathered a lot of interest from the developer community.

早在2013年,由Jonas Boner领导的开发人员团队在一份被称为反应式宣言的文件中共同定义了一套核心原则。这就为创建反应式系统的架构风格奠定了基础。从那时起,这个宣言已经在开发者社区中聚集了大量的兴趣。

Basically, this document prescribes the recipe for a reactive system to be flexible, loosely-coupled, and scalable. This makes such systems easy to develop, tolerant of failures, and most importantly highly responsive, the underpinning for incredible user experiences.

基本上,这份文件规定了反应式系统的配方,即灵活、松散耦合和可扩展。这使得此类系统易于开发,能够容忍失败,而且最重要的是具有高度的响应性,这是令人难以置信的用户体验的基础。

So what is this secret recipe? Well, it’s hardly any secret! The manifesto defines the fundamental characteristics or principles of a reactive system:

那么这个秘方是什么呢?嗯,这几乎不是什么秘密宣言定义了一个反应式系统的基本特征或原则。

  • Responsive: A reactive system should provide a rapid and consistent response time and hence a consistent quality of service
  • Resilient: A reactive system should remain responsive in case of random failures through replication and isolation
  • Elastic: Such a system should remain responsive under unpredictable workloads through cost-effective scalability
  • Message-Driven: It should rely on asynchronous message passing between system components

These principles sound simple and sensible but aren’t always easier to implement in complex enterprise architecture. In this tutorial, we’ll develop a sample system in Java with these principles in mind!

这些原则听起来简单合理,但在复杂的企业架构中并不总是容易实现。在本教程中,我们将结合这些原则在Java中开发一个示例系统

3. What Is Reactive Programming?

3.什么是反应式编程?

Before we proceed, it’s important to understand the difference between reactive programming and reactive systems. We use both these terms quite often and easily misunderstand one for the other. As we’ve seen earlier, reactive systems are a result of a specific architectural style.

在我们继续之前,了解反应式编程和反应式系统之间的区别很重要。我们经常使用这两个术语,而且很容易把一个误解为另一个。正如我们前面看到的,反应式系统是一种特定的架构风格的结果。

In contrast, reactive programming is a programming paradigm where the focus is on developing asynchronous and non-blocking components. The core of reactive programming is a data stream that we can observe and react to, even apply back pressure as well. This leads to non-blocking execution and hence to better scalability with fewer threads of execution.

相比之下,反应式编程是一种编程范式,重点是开发异步和非阻塞组件。反应式编程的核心是一个数据流,我们可以对其进行观察和反应,甚至还可以施加反压力。这导致了非阻塞式的执行,从而以较少的执行线程实现了更好的可扩展性。

Now, this doesn’t mean that reactive systems and reactive programming are mutually exclusive. In fact, reactive programming is an important step towards realizing a reactive system, but it’s not everything!

现在,这并不意味着反应式系统和反应式编程是相互排斥的。事实上,反应式编程是实现反应式系统的一个重要步骤,但它不是全部!

3.1. Reactive Streams

3.1.反应式流

Reactive Streams is a community initiative that started back in the year 2013 to provide a standard for asynchronous stream processing with non-blocking backpressure. The objective here was to define a set of interfaces, methods, and protocols that can describe the necessary operations and entities.

Reactive Streams是一项社区倡议,该倡议早在2013年就开始了,旨在为具有非阻塞反压的异步流处理提供一个标准。这里的目标是定义一套能够描述必要操作和实体的接口、方法和协议。

Since then, several implementations in multiple programming languages have emerged which conforms to the reactive streams specification. These include Akka Streams, Ratpack, and Vert.x to name a few.

从那时起,已经出现了一些符合反应式流规范的多种编程语言的实现。其中包括Akka Streams、Ratpack和Vert.x,仅举几例。

3.2. Reactive Libraries for Java

3.2.Java的反应式库

One of the initial objectives behind the reactive streams was to be eventually included as an official Java standard library. As a result, the reactive streams specification is semantically equivalent to the Java Flow library, introduced in Java 9.

反应式流背后的最初目标之一是最终被列为官方的Java标准库。因此,反应式流规范在语义上等同于Java 9中引入的Java Flow库。

Apart from that, there are a few popular choices to implement reactive programming in Java:

除此以外,在Java中实现反应式编程有几个流行的选择。

  • Reactive Extensions: Popularly known as ReactiveX, they provide API for asynchronous programming with observable streams. These are available for multiple programming languages and platforms, including Java where it’s known as RxJava
  • Project Reactor: This is another reactive library, grounds-up based on the reactive streams specification, targetted towards building non-applications on the JVM. It also happens to be the foundation of the reactive stack in the Spring ecosystem

4. A Simple Application

4.一个简单的应用

For the purpose of this tutorial, we’ll develop a simple application based on microservices architecture with a minimal frontend. The application architecture should have enough elements to create a reactive system.

为了本教程的目的,我们将开发一个基于微服务架构的简单应用,并配备一个最小的前端。该应用架构应该有足够的元素来创建一个反应式系统。

For our application, we’ll adopt end-to-end reactive programming and other patterns and tools to accomplish the fundamental characteristics of a reactive system.

对于我们的应用,我们将采用端到端的反应式编程和其他模式和工具来完成反应式系统的基本特征。

4.1. Architecture

4.1.建筑

We’ll begin by defining a simple application architecture which doesn’t necessarily exhibit the characteristics of reactive systems. From there on, we’ll make the necessary changes to achieve these characteristics one by one.

我们将首先定义一个简单的应用程序架构,它不一定表现出反应式系统的特征。从那以后,我们将逐一进行必要的改变以实现这些特征。

So, first, let’s begin by defining a simple architecture:

因此,首先,让我们从定义一个简单的架构开始。

This is quite a simple architecture that has a bunch of microservices to facilitate a commerce use-case where we can place an order. It also has a frontend for user experience, and all communication happens as REST over HTTP. Moreover, every microservice manages their data in individual databases, a practice known as database-per-service.

这是一个相当简单的架构,它有一堆微服务来促进商务用例,我们可以下订单。它也有一个用于用户体验的前端,所有的通信都是通过HTTP的REST方式进行。此外,每个微服务都在单独的数据库中管理他们的数据,这种做法被称为 “每服务数据库”。

We’ll go ahead and create this simple application in the following sub-sections. This will be our base to understand the fallacies of this architecture and ways and means to adopt principles and practices so that we can transform this into a reactive system.

我们将在下面的小节中继续创建这个简单的应用程序。这将是我们了解这种架构的谬误的基础,以及采用原则和实践的方法和手段,以便我们能够将其转化为一个反应式系统。

4.3. Inventory Microservice

4.3.库存微服务

Inventory microservice will be responsible for managing a list of products and their current stock. It will also allow altering the stock as orders are processed. We’ll use Spring Boot with MongoDB to develop this service.

库存微服务将负责管理产品列表及其当前库存。它还将允许在处理订单时改变库存。我们将使用Spring Boot和MongoDB来开发这个服务。

Let’s begin by defining a controller to expose some endpoints:

让我们首先定义一个控制器来暴露一些端点。

@GetMapping
public List<Product> getAllProducts() {
    return productService.getProducts();
}
 
@PostMapping
public Order processOrder(@RequestBody Order order) {
    return productService.handleOrder(order);
}
 
@DeleteMapping
public Order revertOrder(@RequestBody Order order) {
    return productService.revertOrder(order);
}

and a service to encapsulate our business logic:

和一个服务来封装我们的业务逻辑。

@Transactional
public Order handleOrder(Order order) {       
    order.getLineItems()
      .forEach(l -> {
          Product> p = productRepository.findById(l.getProductId())
            .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
          if (p.getStock() >= l.getQuantity()) {
              p.setStock(p.getStock() - l.getQuantity());
              productRepository.save(p);
          } else {
              throw new RuntimeException("Product is out of stock: " + l.getProductId());
          }
      });
    return order.setOrderStatus(OrderStatus.SUCCESS);
}

@Transactional
public Order revertOrder(Order order) {
    order.getLineItems()
      .forEach(l -> {
          Product p = productRepository.findById(l.getProductId())
            .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
          p.setStock(p.getStock() + l.getQuantity());
          productRepository.save(p);
      });
    return order.setOrderStatus(OrderStatus.SUCCESS);
}

Note that we’re persisting the entities within a transaction, which ensures that no inconsistent state results in case of exceptions.

请注意,我们在一个事务中坚持实体,这确保在出现异常时不会产生不一致的状态。

Apart from these, we’ll also have to define the domain entities, the repository interface, and a bunch of configuration classes necessary for everything to work properly.

除了这些,我们还必须定义领域实体、资源库接口和一堆配置类,这些都是一切正常工作所必需的。

But since these are mostly boilerplate, we’ll avoid going through them, and they can be referred to in the GitHub repository provided in the last section of this article.

但由于这些大多是模板,我们将避免去看它们,它们可以在本文最后一节提供的GitHub仓库中参考。

4.4. Shipping Microservice

4.4.运送微服务

The shipping microservice will not be very different either. This will be responsible for checking if a shipment can be generated for the order and create one if possible.

航运微服务也不会有很大的不同。它将负责检查是否可以为订单生成货物并在可能的情况下创建一个货物。

As before we’ll define a controller to expose our endpoints, in fact just a single endpoint:

像以前一样,我们将定义一个控制器来暴露我们的端点,事实上只有一个端点。

@PostMapping
public Order process(@RequestBody Order order) {
    return shippingService.handleOrder(order);
}

and a service to encapsulate the business logic related to order shipment:

和一个服务来封装与订单发货有关的业务逻辑。

public Order handleOrder(Order order) {
    LocalDate shippingDate = null;
    if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
      && LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
        shippingDate = LocalDate.now().plusDays(1);
    } else {
        throw new RuntimeException("The current time is off the limits to place order.");
    }
    shipmentRepository.save(new Shipment()
      .setAddress(order.getShippingAddress())
      .setShippingDate(shippingDate));
    return order.setShippingDate(shippingDate)
      .setOrderStatus(OrderStatus.SUCCESS);
}

Our simple shipping service is just checking the valid time window to place orders. We’ll avoid discussing the rest of the boilerplate code as before.

我们简单的运输服务只是检查有效的时间窗口来下订单。我们将像以前一样避免讨论其余的模板代码。

4.5. Order Microservice

4.5.订单微服务

Finally, we’ll define an order microservice which will be responsible for creating a new order apart from other things. Interestingly, it’ll also play as an orchestrator service where it will communicate with the inventory service and the shipping service for the order.

最后,我们将定义一个订单微服务,它将负责创建一个新的订单,而不是其他事情。有趣的是,它还将作为一个协调器服务,与库存服务和订单的运输服务进行通信。

Let’s define our controller with the required endpoints:

让我们用所需的端点来定义我们的控制器。

@PostMapping
public Order create(@RequestBody Order order) {
    Order processedOrder = orderService.createOrder(order);
    if (OrderStatus.FAILURE.equals(processedOrder.getOrderStatus())) {
        throw new RuntimeException("Order processing failed, please try again later.");
    }
    return processedOrder;
}
@GetMapping
public List<Order> getAll() {
    return orderService.getOrders();
}

And, a service to encapsulate the business logic related to orders:

还有,一个服务来封装与订单有关的业务逻辑。

public Order createOrder(Order order) {
    boolean success = true;
    Order savedOrder = orderRepository.save(order);
    Order inventoryResponse = null;
    try {
        inventoryResponse = restTemplate.postForObject(
          inventoryServiceUrl, order, Order.class);
    } catch (Exception ex) {
        success = false;
    }
    Order shippingResponse = null;
    try {
        shippingResponse = restTemplate.postForObject(
          shippingServiceUrl, order, Order.class);
    } catch (Exception ex) {
        success = false;
        HttpEntity<Order> deleteRequest = new HttpEntity<>(order);
        ResponseEntity<Order> deleteResponse = restTemplate.exchange(
          inventoryServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class);
    }
    if (success) {
        savedOrder.setOrderStatus(OrderStatus.SUCCESS);
        savedOrder.setShippingDate(shippingResponse.getShippingDate());
    } else {
        savedOrder.setOrderStatus(OrderStatus.FAILURE);
    }
    return orderRepository.save(savedOrder);
}

public List<Order> getOrders() {
    return orderRepository.findAll();
}

The handling of orders where we’re orchestrating calls to inventory and shipping services is far from ideal. Distributed transactions with multiple microservices is a complex topic in itself and beyond the scope of this tutorial.

在我们协调调用库存和运输服务的情况下,对订单的处理远非理想。带有多个微服务的分布式交易本身就是一个复杂的话题,超出了本教程的范围

However, we’ll see later in this tutorial how a reactive system can avoid the need for distributed transactions to a certain extent.

然而,我们将在本教程的后面看到反应式系统如何在一定程度上避免对分布式事务的需求。

As before, we’ll not go through the rest of the boilerplate code. However, this can be referenced in the GitHub repo.

和以前一样,我们不会去看其余的模板代码。然而,这可以在GitHub repo中得到参考。

4.6. Front-end

4.6.前端

Let’s also add a user interface to make the discussion complete. The user interface will be based on Angular and will be a simple single-page application.

让我们也添加一个用户界面,使讨论完整。该用户界面将基于Angular,是一个简单的单页应用程序。

We’ll need to create a simple component in Angular to handle create and fetch orders. Of specific importance is the part where we call our API to create the order:

我们需要在Angular中创建一个简单的组件来处理创建和获取订单。特别重要的是我们调用我们的API来创建订单的部分。

createOrder() {
    let headers = new HttpHeaders({'Content-Type': 'application/json'});
    let options = {headers: headers}
    this.http.post('http://localhost:8080/api/orders', this.form.value, options)
      .subscribe(
        (response) => {
          this.response = response
        },
        (error) => {
          this.error = error
        }
      )
}

The above code snippet expects order data to be captured in a form and available within the scope of the component. Angular offers fantastic support for creating simple to complex forms using reactive and template-driven forms.

上述代码片段期望订单数据被捕获到表单中,并在组件的范围内可用。Angular 为使用反应式和模板驱动的表单创建简单到复杂的表单提供了出色的支持。

Also important is the part where we get previously created orders:

同样重要的是,我们得到以前创建的订单的部分。

getOrders() {
  this.previousOrders = this.http.get(''http://localhost:8080/api/orders'')
}

Please note that the Angular HTTP module is asynchronous in nature and hence returns RxJS Observables. We can handle the response in our view by passing them through an async pipe:

请注意,Angular HTTP 模块异步性质的,因此返回RxJS Observables。我们可以在我们的视图中通过异步管道来处理这些响应。

<div class="container" *ngIf="previousOrders !== null">
  <h2>Your orders placed so far:</h2>
  <ul>
    <li *ngFor="let order of previousOrders | async">
      <p>Order ID: {{ order.id }}, Order Status: {{order.orderStatus}}, Order Message: {{order.responseMessage}}</p>
    </li>
  </ul>
</div>

Of course, Angular will require templates, styles, and configurations to work, but these can be referred to in the GitHub repository. Please note that we have bundled everything in a single component here, which is ideally not something we should do.

当然,Angular需要模板、样式和配置才能工作,但这些可以在GitHub仓库中参考。请注意,我们在这里把所有东西都捆绑在一个组件中,这在理想情况下是不应该这样做的。

But, for this tutorial, those concerns are not in scope.

但是,在本教程中,这些问题不在范围之内。

4.7. Deploying the Application

4.7.部署应用程序

Now that we’ve created all individual parts of the application, how should we go about deploying them? Well, we can always do this manually. But we should be careful that it can soon become tedious.

现在我们已经创建了应用程序的所有个别部分,我们应该如何去部署它们?好吧,我们总是可以手动完成这个工作。但我们应该注意,这很快就会变得乏味。

For this tutorial, we’ll use Docker Compose to build and deploy our application on a Docker Machine. This will require us to add a standard Dockerfile in each service and create a Docker Compose file for the entire application.

在本教程中,我们将使用Docker Compose在Docker机器上构建和部署我们的应用程序。这将需要我们在每个服务中添加一个标准的Docker文件,并为整个应用程序创建一个Docker Compose文件。

Let’s see how this docker-compose.yml file looks:

让我们看看这个docker-compose.yml文件看起来如何。

version: '3'
services:
  frontend:
    build: ./frontend
    ports:
      - "80:80"
  order-service:
    build: ./order-service
    ports:
      - "8080:8080"
  inventory-service:
    build: ./inventory-service
    ports:
      - "8081:8081"
  shipping-service:
    build: ./shipping-service
    ports:
      - "8082:8082"

This is a fairly standard definition of services in Docker Compose and does not require any special attention.

这是Docker Compose中对服务的一个相当标准的定义,不需要任何特别注意。

4.8. Problems With This Architecture

4.8.这种结构的问题

Now that we have a simple application in place with multiple services interacting with each other, we can discuss the problems in this architecture. There are what we’ll try to address in the following sections and eventually get to the state where we would have transformed our application into a reactive system!

现在我们已经有了一个简单的应用,有多个服务在相互作用,我们可以讨论这个架构中的问题。在下面的章节中,我们将尝试解决这些问题,并最终达到将我们的应用程序转变为一个反应式系统的状态

While this application is far from a production-grade software and there are several issues, we’ll focus on the issues that pertain to the motivations for reactive systems:

虽然这个应用远不是一个生产级的软件,而且有几个问题,但我们将关注与反应式系统的动机有关的问题

  • Failure in either inventory service or shipping service can have a cascading effect
  • The calls to external systems and database are all blocking in nature
  • The deployment cannot handle failures and fluctuating loads automatically

5. Reactive Programming

5.反应式编程

Blocking calls in any program often result in critical resources just waiting for things to happen. These include database calls, calls to web services, and file system calls. If we can free up threads of execution from this waiting and provide a mechanism to circle back once results are available, it will yield much better resource utilization.

任何程序中的阻塞调用往往导致关键资源只是在等待事情发生。这包括数据库调用、对网络服务的调用和文件系统调用。如果我们能将执行线程从这种等待中解放出来,并提供一种机制,一旦有了结果,就能回圈,这将产生更好的资源利用率。

This is what adopting the reactive programming paradigm does for us. While it’s possible to switch over to a reactive library for many of these calls, it may not be possible for everything. For us, fortunately, Spring makes it much easier to use reactive programming with MongoDB and REST APIs:

这就是采用反应式编程范式为我们做的事情。虽然有可能在许多这样的调用中改用反应式库,但未必能做到万无一失。对我们来说,幸运的是,Spring使我们更容易使用反应式编程与MongoDB和REST API。

Spring Data Mongo has support for reactive access through the MongoDB Reactive Streams Java Driver. It provides ReactiveMongoTemplate and ReactiveMongoRepository, both of which have extensive mapping functionality.

Spring Data Mongo通过 MongoDB Reactive Streams Java Driver 支持反应式访问。它提供了ReactiveMongoTemplateReactiveMongoRepository,两者都具有广泛的映射功能。

Spring WebFlux provides the reactive-stack web framework for Spring, enabling non-blocking code and Reactive Streams backpressure. It leverages the Reactor as its reactive library. Further, it provides WebClient for performing HTTP requests with Reactive Streams backpressure. It uses Reactor Netty as the HTTP client library.

Spring WebFlux为Spring提供了反应堆网络框架,实现了非阻塞代码和反应流的反压。它利用Reactor作为其反应性库。此外,它还提供了WebClient,用于执行具有反应流背压的HTTP请求。它使用Reactor Netty作为HTTP客户端库。

5.1. Inventory Service

5.1.库存服务

We’ll begin by changing our endpoints to emit reactive publishers:

我们将首先改变我们的端点,使其发出反应式发布器。

@GetMapping
public Flux<Product> getAllProducts() {
    return productService.getProducts();
}
@PostMapping
public Mono<Order> processOrder(@RequestBody Order order) {
    return productService.handleOrder(order);
}

@DeleteMapping
public Mono<Order> revertOrder(@RequestBody Order order) {
    return productService.revertOrder(order);
}

Obviously, we’ll have to make necessary changes to the service as well:

很明显,我们也必须对服务进行必要的修改。

@Transactional
public Mono<Order> handleOrder(Order order) {
    return Flux.fromIterable(order.getLineItems())
      .flatMap(l -> productRepository.findById(l.getProductId()))
      .flatMap(p -> {
          int q = order.getLineItems().stream()
            .filter(l -> l.getProductId().equals(p.getId()))
            .findAny().get()
            .getQuantity();
          if (p.getStock() >= q) {
              p.setStock(p.getStock() - q);
              return productRepository.save(p);
          } else {
              return Mono.error(new RuntimeException("Product is out of stock: " + p.getId()));
          }
      })
      .then(Mono.just(order.setOrderStatus("SUCCESS")));
}

@Transactional
public Mono<Order> revertOrder(Order order) {
    return Flux.fromIterable(order.getLineItems())
      .flatMap(l -> productRepository.findById(l.getProductId()))
      .flatMap(p -> {
          int q = order.getLineItems().stream()
            .filter(l -> l.getProductId().equals(p.getId()))
            .findAny().get()
            .getQuantity();
          p.setStock(p.getStock() + q);
          return productRepository.save(p);
      })
      .then(Mono.just(order.setOrderStatus("SUCCESS")));
}

5.2. Shipping Service

5.2.运输服务

Similarly, we’ll change the endpoint of our shipping service:

同样地,我们将改变我们的运输服务的端点。

@PostMapping
public Mono<Order> process(@RequestBody Order order) {
    return shippingService.handleOrder(order);
}

And, corresponding changes in the service to leverage reactive programming:

而且,在服务中进行相应的改变,以利用反应式编程。

public Mono<Order> handleOrder(Order order) {
    return Mono.just(order)
      .flatMap(o -> {
          LocalDate shippingDate = null;
          if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
            && LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
              shippingDate = LocalDate.now().plusDays(1);
          } else {
              return Mono.error(new RuntimeException("The current time is off the limits to place order."));
          }
          return shipmentRepository.save(new Shipment()
            .setAddress(order.getShippingAddress())
            .setShippingDate(shippingDate));
      })
      .map(s -> order.setShippingDate(s.getShippingDate())
        .setOrderStatus(OrderStatus.SUCCESS));
    }

5.3. Order Service

5.3 订购服务

We’ll have to make similar changes in the endpoints of the order service:

我们将不得不在订单服务的端点上做类似的改变。

@PostMapping
public Mono<Order> create(@RequestBody Order order) {
    return orderService.createOrder(order)
      .flatMap(o -> {
          if (OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return Mono.error(new RuntimeException("Order processing failed, please try again later. " + o.getResponseMessage()));
          } else {
              return Mono.just(o);
          }
      });
}

@GetMapping
public Flux<Order> getAll() {
    return orderService.getOrders();
}

The changes to service will be more involved as we’ll have to make use of Spring WebClient to invoke the inventory and shipping reactive endpoints:

对服务的改变将涉及更多,因为我们必须利用Spring WebClient来调用库存和运输的反应式端点。

public Mono<Order> createOrder(Order order) {
    return Mono.just(order)
      .flatMap(orderRepository::save)
      .flatMap(o -> {
          return webClient.method(HttpMethod.POST)
            .uri(inventoryServiceUrl)
            .body(BodyInserters.fromValue(o))
            .exchange();
      })
      .onErrorResume(err -> {
          return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
            .setResponseMessage(err.getMessage()));
      })
      .flatMap(o -> {
          if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return webClient.method(HttpMethod.POST)
                .uri(shippingServiceUrl)
                .body(BodyInserters.fromValue(o))
                .exchange();
          } else {
              return Mono.just(o);
          }
      })
      .onErrorResume(err -> {
          return webClient.method(HttpMethod.POST)
            .uri(inventoryServiceUrl)
            .body(BodyInserters.fromValue(order))
            .retrieve()
            .bodyToMono(Order.class)
            .map(o -> o.setOrderStatus(OrderStatus.FAILURE)
              .setResponseMessage(err.getMessage()));
      })
      .map(o -> {
          if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return order.setShippingDate(o.getShippingDate())
                .setOrderStatus(OrderStatus.SUCCESS);
          } else {
              return order.setOrderStatus(OrderStatus.FAILURE)
                .setResponseMessage(o.getResponseMessage());
          }
      })
      .flatMap(orderRepository::save);
}

public Flux<Order> getOrders() {
    return orderRepository.findAll();
}

This kind of orchestration with reactive APIs is no easy exercise and often error-prone as well as hard to debug. We’ll see how this can be simplified in the next section.

这种带有反应式API的协调工作并不容易,往往容易出错,也很难调试。我们将在下一节中看到如何简化这一过程。

5.4. Front-end

5.4.前端

Now, that our APIs are capable of streaming events as they occur, it’s quite natural that we should be able to leverage that in our front-end as well. Fortunately, Angular supports EventSource, the interface for Server-Sent Events.

既然我们的 API 能够在事件发生时进行流式处理,那么很自然地,我们也应该能够在我们的前端利用这一功能。幸运的是,Angular 支持EventSource服务器发送事件的接口

Let’s see how can we pull and process all our previous orders as a stream of events:

让我们看看如何能把我们以前的所有订单作为一个事件流来拉动和处理。

getOrderStream() {
    return Observable.create((observer) => {
        let eventSource = new EventSource('http://localhost:8080/api/orders')
        eventSource.onmessage = (event) => {
            let json = JSON.parse(event.data)
            this.orders.push(json)
            this._zone.run(() => {
                observer.next(this.orders)
            })
        }
        eventSource.onerror = (error) => {
            if(eventSource.readyState === 0) {
                eventSource.close()
                this._zone.run(() => {
                    observer.complete()
                })
            } else {
                this._zone.run(() => {
                    observer.error('EventSource error: ' + error)
                })
            }
        }
    })
}

6. Message-Driven Architecture

6.消息驱动的架构

The first problem we’re going to address is related to service-to-service communication. Right now, these communications are synchronous, which presents several problems. These include cascading failures, complex orchestration, and distributed transactions to name a few.

我们要解决的第一个问题与服务间的通信有关。现在,这些通信是同步的,这带来了几个问题。这些问题包括级联故障、复杂的协调和分布式事务,仅举几例。

An obvious way to solve this problem is to make these communications asynchronous. A message broker for facilitating all service-to-service communication can do the trick for us. We’ll use Kafka as our message broker and Spring for Kafka to produce and consume messages:

解决这个问题的一个明显方法是使这些通信成为异步的。一个用于促进所有服务间通信的消息代理可以为我们带来好处。我们将使用Kafka作为我们的消息代理和Spring for Kafka来产生和消费消息。

We’ll use a single topic to produce and consume order messages with different order statuses for services to react.

我们将使用一个主题来产生和消费具有不同订单状态的订单消息,以便服务做出反应。

Let’s see how each service needs to change.

让我们看看每项服务需要如何改变。

6.1. Inventory Service

6.1.库存服务

Let’s begin by defining the message producer for our inventory service:

让我们首先为我们的库存服务定义消息生产者。

@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;

public void sendMessage(Order order) {
    this.kafkaTemplate.send("orders", order);
}

Next, we’ll have to define a message consumer for inventory service to react to different messages on the topic:

接下来,我们要为库存服务定义一个消息消费者,以便对主题上的不同消息做出反应。

@KafkaListener(topics = "orders", groupId = "inventory")
public void consume(Order order) throws IOException {
    if (OrderStatus.RESERVE_INVENTORY.equals(order.getOrderStatus())) {
        productService.handleOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_SUCCESS));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    } else if (OrderStatus.REVERT_INVENTORY.equals(order.getOrderStatus())) {
        productService.revertOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_SUCCESS));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    }
}

This also means that we can safely drop some of the redundant endpoints from our controller now. These changes are sufficient to achieve asynchronous communication in our application.

这也意味着我们现在可以安全地从我们的控制器中删除一些冗余的端点。这些变化足以在我们的应用程序中实现异步通信。

6.2. Shipping Service

6.2.运输服务

The changes in shipping service are relatively similar to what we did earlier with the inventory service. The message producer is the same, and the message consumer is specific to shipping logic:

运输服务的变化与我们之前对库存服务所做的相对类似。消息生产者是相同的,而消息消费者是针对运输逻辑的。

@KafkaListener(topics = "orders", groupId = "shipping")
public void consume(Order order) throws IOException {
    if (OrderStatus.PREPARE_SHIPPING.equals(order.getOrderStatus())) {
        shippingService.handleOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_SUCCESS)
                .setShippingDate(o.getShippingDate()));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    }
}

We can safely drop all the endpoints in our controller now as we no longer need them.

我们现在可以安全地放弃控制器中的所有端点,因为我们不再需要它们了。

6.3. Order Service

6.3 订购服务

The changes in order service will be a little more involved as this is where we were doing all the orchestration earlier.

订单服务中的变化将涉及更多,因为这是我们之前做所有协调工作的地方。

Nevertheless, the message producer remains unchanged, and message consumer takes on order service-specific logic:

然而,消息生产者保持不变,而消息消费者则承担了特定于订单服务的逻辑。

@KafkaListener(topics = "orders", groupId = "orders")
public void consume(Order order) throws IOException {
    if (OrderStatus.INITIATION_SUCCESS.equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.RESERVE_INVENTORY));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else if ("INVENTORY-SUCCESS".equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.PREPARE_SHIPPING));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else if ("SHIPPING-FAILURE".equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.REVERT_INVENTORY));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else {
        orderRepository.findById(order.getId())
          .map(o -> {
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    }
}

The consumer here is merely reacting to order messages with different order statuses. This is what gives us the choreography between different services.

这里的消费者只是对具有不同订单状态的订单信息做出反应。这就是我们在不同服务之间进行编排的原因。

Lastly, our order service will also have to change to support this choreography:

最后,我们的订单服务也将不得不改变,以支持这种编排。

public Mono<Order> createOrder(Order order) {
    return Mono.just(order)
      .flatMap(orderRepository::save)
      .map(o -> {
          orderProducer.sendMessage(o.setOrderStatus(OrderStatus.INITIATION_SUCCESS));
          return o;
      })
      .onErrorResume(err -> {
          return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
            .setResponseMessage(err.getMessage()));
      })
      .flatMap(orderRepository::save);
}

Note that this is far simpler than the service we had to write with reactive endpoints in the last section. Asynchronous choreography often results in far simpler code, although it does come at the cost of eventual consistency and complex debugging and monitoring. As we may guess, our front-end will no longer get the final status of the order immediately.

请注意,这比我们在上一节中不得不用反应式端点编写的服务要简单得多。异步的核心技术往往能带来简单得多的代码,尽管它确实以最终的一致性和复杂的调试和监控为代价。正如我们可能猜到的,我们的前端将不再立即获得订单的最终状态。

7. Container Orchestration Service

7.容器协调服务

The last piece of the puzzle that we want to solve is related to deployment.

我们要解决的最后一块难题与部署有关。

What we want in the application is ample redundancy and a tendency to scale up or down depending upon the need automatically.

我们在应用中想要的是充足的冗余和根据需要自动扩大或缩小的趋势。

We’ve already achieved containerization of services through Docker and are managing dependencies between them through Docker Compose. While these are fantastic tools in their own right, they do not help us to achieve what we want.

我们已经通过Docker实现了服务的容器化,并且正在通过Docker Compose管理它们之间的依赖关系。虽然这些工具本身是非常棒的,但它们并不能帮助我们实现我们想要的东西。

Hence, we need a container orchestration service that can take care of redundancy and scalability in our application. While there are several options, one of the popular ones includes Kubernetes. Kubernetes provides us with a cloud vendor-agnostic way to achieve highly scalable deployments of containerized workloads.

因此,我们需要一个容器编排服务,可以照顾到我们应用程序的冗余和可扩展性。虽然有几个选择,但其中一个流行的选择包括Kubernetes。Kubernetes为我们提供了一种与云厂商无关的方式,以实现容器化工作负载的高度可扩展部署。

Kubernetes wraps containers like Docker into Pods, which are the smallest unit of deployment. Further, we can use Deployment to describe the desired state declaratively.

Kubernetes将Docker等容器包装成Pod,它是最小的部署单元。此外,我们可以使用部署来声明性地描述所需状态。

Deployment creates ReplicaSets, which internally is responsible for bringing up the pods. We can describe a minimum number of identical pods that should be running at any point in time. This provides redundancy and hence high availability.

部署会创建ReplicaSets,它在内部负责把pod带起来。我们可以描述在任何时间点都应该运行的最低数量的相同的pod。这提供了冗余,从而提供了高可用性。

Let’s see how can we define a Kubernetes deployment for our applications:

让我们看看如何为我们的应用程序定义一个Kubernetes部署。

apiVersion: apps/v1
kind: Deployment
metadata: 
  name: inventory-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: inventory-deployment
  template: 
    metadata: 
      labels: 
        name: inventory-deployment
    spec: 
      containers:
      - name: inventory
        image: inventory-service-async:latest
        ports: 
        - containerPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata: 
  name: shipping-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: shipping-deployment
  template: 
    metadata: 
      labels: 
        name: shipping-deployment
    spec: 
      containers:
      - name: shipping
        image: shipping-service-async:latest
        ports: 
        - containerPort: 8082
---
apiVersion: apps/v1
kind: Deployment
metadata: 
  name: order-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: order-deployment
  template: 
    metadata: 
      labels: 
        name: order-deployment
    spec: 
      containers:
      - name: order
        image: order-service-async:latest
        ports: 
        - containerPort: 8080

Here we’re declaring our deployment to maintain three identical replicas of pods at any time. While this is a good way to add redundancy, it may not be sufficient for varying loads. Kubernetes provides another resource known as the Horizontal Pod Autoscaler which can scale the number of pods in a deployment based on observed metrics like CPU utilization.

在这里,我们要声明我们的部署在任何时候都要保持三个相同的pod副本。虽然这是一个增加冗余的好方法,但对于不同的负载来说,这可能是不够的。Kubernetes提供了另一种资源,即Horizontal Pod Autoscaler,它可以根据观察到的指标(如CPU利用率)来缩放部署中的pod数量。

Please note that we have just covered the scalability aspects of the application hosted on a Kubernetes cluster. This does not necessarily imply that the underlying cluster itself is scalable. Creating a high availability Kubernetes cluster is a non-trivial task and beyond the scope of this tutorial.

请注意,我们只是涵盖了托管在Kubernetes集群上的应用程序的可扩展性方面。这并不一定意味着底层集群本身是可扩展的。创建一个高可用性的Kubernetes集群是一项非同寻常的任务,超出了本教程的范围。

8. Resulting Reactive System

8.形成的反应性系统

Now that we’ve made several improvements in our architecture, it’s perhaps time to evaluate this against the definition of a Reactive System. We’ll keep the evaluation against the four characteristics of a Reactive Systems we discussed earlier in the tutorial:

现在我们已经对我们的架构做了一些改进,也许是时候对照反应式系统的定义来评估一下了。我们将继续根据我们在本教程前面讨论的反应式系统的四个特征进行评估。

  • Responsive: The adoption of the reactive programming paradigm should help us achieve end-to-end non-blocking and hence a responsive application
  • Resilient: Kubernetes deployment with ReplicaSet of the desired number of pods should provide resilience against random failures
  • Elastic: Kubernetes cluster and resources should provide us the necessary support to be elastic in the face of unpredictable loads
  • Message-Driven: Having all service-to-service communication handled asynchronously through a Kafka broker should help us here

While this looks quite promising, it’s far from over. To be honest, the quest for a truly reactive system should be a continuous exercise of improvements. We can never preempt all that can fail in a highly complex infrastructure, where our application is just a small part.

虽然这看起来很有希望,但还远远没有结束。说实话,对一个真正的反应式系统的要求应该是一个持续的改进工作。我们永远无法预知高度复杂的基础设施中可能出现的所有故障,而我们的应用程序只是其中的一小部分。

A reactive system thus will demand reliability from every part that makes the whole. Right from the physical network to infrastructure services like DNS, they all should fall in line to help us achieve the end goal.

因此,一个反应式系统将要求构成整体的每一个部分的可靠性。从物理网络到像DNS这样的基础设施服务,它们都应该落到实处,帮助我们实现最终目标。

Often, it may not be possible for us to manage and provide the necessary guarantees for all these parts. And this is where a managed cloud infrastructure helps alleviate our pain. We can choose from a host of services like IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service), and PaaS (Platform-as-a-Service) to delegate the responsibilities to external parties. This leaves us with the responsibility of our application as far as possible.

通常,我们可能无法对所有这些部分进行管理和提供必要的保障。而这正是管理型云计算基础设施帮助我们减轻痛苦的地方。我们可以从IaaS(Infeastrure-as-a-Service)、BaaS(Backend-as-a-Service)和PaaS(Platform-as-a-Service)等一系列服务中选择,将责任委托给外部各方。这让我们尽可能地承担起我们的应用程序的责任。

9. Conclusion

9.结语

In this tutorial, we went through the basics of reactive systems and how does it compare with reactive programming. We created a simple application with multiple microservices and highlighted the problems we intend to solve with a reactive system.

在本教程中,我们了解了反应式系统的基础知识,以及它与反应式编程的比较。我们创建了一个有多个微服务的简单应用,并强调了我们打算用反应式系统解决的问题。

Further, we went ahead, introducing reactive programming, message-based architecture, and container orchestration service in the architecture to realize a reactive system.

此外,我们继续前进,在架构中引入了反应式编程、基于消息的架构和容器协调服务,以实现一个反应式系统。

Lastly, we discussed the resulting architecture and how it remains a journey towards the reactive system! This tutorial does not introduce us to all the tools, frameworks, or patterns which can help us create a reactive system, but it introduces us to the journey.

最后,我们讨论了由此产生的架构,以及它如何仍然是一个走向反应式系统的旅程!本教程并没有向我们介绍所有可以帮助我们创建反应式系统的工具、框架或模式,但它向我们介绍了这个旅程。

As usual, the source code for this article can be found over on GitHub.

像往常一样,本文的源代码可以在GitHub上找到超过