A Guide to the Axon Framework – Axon框架指南

最后修改: 2017年 3月 10日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’ll be looking at Axon and how it helps us implement applications with CQRS (Command Query Responsibility Segregation) and Event Sourcing in mind.

在这篇文章中,我们将关注Axon,以及它如何帮助我们实现具有CQRS(命令查询责任隔离)和Event Sourcing的应用。

During this guide, both Axon Framework and Axon Server will be utilized. The former will contain our implementation and the latter will be our dedicated Event Store and Message Routing solution.

在本指南中,Axon Framework和Axon Server都将被使用。前者将包含我们的实现,后者将是我们专用的事件存储和消息路由解决方案。

The sample application we’ll be building focuses on an Order domain. For this, we’ll be leveraging the CQRS and Event Sourcing building blocks Axon provides us.

我们将构建的示例应用程序侧重于Order域。为此,我们将利用Axon提供的CQRS和Event Sourcing构建模块

Note that a lot of the shared concepts come right out of DDD, which is beyond the scope of this current article.

请注意,很多共享的概念直接来自DDD,,这超出了目前这篇文章的范围。

2. Maven Dependencies

2.Maven的依赖性

We’ll create an Axon / Spring Boot application. Hence, we need to add the latest axon-spring-boot-starter dependency to our pom.xml, as well as the axon-test dependency for testing.
To use matching versions, we will use the axon-bom within our dependency management section:

我们将创建一个Axon / Spring Boot应用程序。因此,我们需要将最新的axon-spring-boot-starter依赖性添加到我们的pom.xml,以及axon-test依赖性,用于测试。
为了使用匹配的版本,我们将在我们的依赖性管理部分使用axon-bom

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-bom</artifactId>
            <version>4.5.13</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3. Axon Server

3.Axon服务器

We will use Axon Server to be our Event Store and our dedicated command, event and query routing solution.

我们将使用Axon Server作为我们的Event Store和我们专用的命令、事件和查询路由解决方案。

As an Event Store, it gives us the ideal characteristics required when storing events. This article provides background on why this is desirable.

作为一个事件存储,它为我们提供了存储事件时所需要的理想特性。这篇文章提供了关于为什么这是理想的背景。

As a Message Routing solution, it gives us the option to connect several instances together without focusing on configuring things like a RabbitMQ or a Kafka topic to share and dispatch messages.

作为一个消息路由解决方案,它为我们提供了将几个实例连接在一起的选择,而不必专注于配置RabbitMQ或Kafka主题等东西来共享和分配消息。

Axon Server can be downloaded here. As it is a simple JAR file, the following operation suffices to start it up:

Axon服务器可以在这里下载。由于它是一个简单的JAR文件,以下操作足以启动它。

java -jar axonserver.jar

This will start a single Axon Server instance which is accessible through localhost:8024. The endpoint provides an overview of the connected applications and the messages they can handle, as well as a querying mechanism towards the Event Store contained within Axon Server.

这将启动一个单一的Axon服务器实例,可通过localhost:8024访问。该端点提供了一个连接的应用程序和它们可以处理的消息的概述,以及对Axon服务器内包含的事件存储的查询机制。

The default configuration of Axon Server together with the axon-spring-boot-starter dependency will ensure our Order service will automatically connect to it.

Axon服务器的默认配置和axon-spring-boot-starter依赖关系将确保我们的订单服务会自动连接到它。

4. Order Service API – Commands

4.订单服务API–命令

We’ll set up our Order service with CQRS in mind. Therefore we’ll emphasize the messages that flow through our application.

我们将以CQRS的方式来设置我们的订单服务。因此,我们将强调流经我们应用程序的消息。

First, we’ll define the Commands, meaning the expressions of intent. The Order service is capable of handling three different types of actions:

首先,我们将定义命令,即意图的表达。订单服务能够处理三种不同类型的行动。

  1. Creating a new order
  2. Confirming an order
  3. Shipping an order

Naturally, there will be three command messages that our domain can deal with — CreateOrderCommand, ConfirmOrderCommand, and ShipOrderCommand:

当然,我们的域可以处理的命令信息有三个–CreateOrderCommand,ConfirmOrderCommand, 和ShipOrderCommand

public class CreateOrderCommand {
 
    @TargetAggregateIdentifier
    private final String orderId;
    private final String productId;
 
    // constructor, getters, equals/hashCode and toString 
}
public class ConfirmOrderCommand {
 
    @TargetAggregateIdentifier
    private final String orderId;
    
    // constructor, getters, equals/hashCode and toString
}
public class ShipOrderCommand {
 
    @TargetAggregateIdentifier
    private final String orderId;
 
    // constructor, getters, equals/hashCode and toString
}

The TargetAggregateIdentifier annotation tells Axon that the annotated field is an id of a given aggregate to which the command should be targeted. We’ll briefly touch on aggregates later in this article.

TargetAggregateIdentifier注释告诉Axon,被注释的字段是一个给定的集合的id,该命令应以该集合为目标。我们将在本文的后面简要介绍集合的情况。

Also, note that we marked the fields in the commands as final. This is intentional, as it’s a best practice for any message implementation to be immutable.

另外,请注意,我们将命令中的字段标记为final.,这是故意的,因为任何消息的实现都是不可改变的,这是一个最佳实践。

5. Order Service API – Events

5.订单服务API – 事件

Our aggregate will handle the commands, as it’s in charge of deciding if an Order can be created, confirmed, or shipped.

我们的聚合体将处理这些命令,因为它负责决定一个订单是否可以被创建、确认或运送。

It will notify the rest of the application of its decision by publishing an event. We’ll have three types of events — OrderCreatedEvent, OrderConfirmedEvent, and OrderShippedEvent:

它将通过发布一个事件来通知应用程序的其他部分其决定。我们将有三种类型的事件 – OrderCreatedEvent, OrderConfirmedEvent, 和OrderShippedEvent

public class OrderCreatedEvent {
 
    private final String orderId;
    private final String productId;
 
    // default constructor, getters, equals/hashCode and toString
}
public class OrderConfirmedEvent {
 
    private final String orderId;
 
    // default constructor, getters, equals/hashCode and toString
}
public class OrderShippedEvent { 

    private final String orderId; 

    // default constructor, getters, equals/hashCode and toString 
}

6. The Command Model – Order Aggregate

6.命令模式–订单总和

Now that we’ve modeled our core API with respect to the commands and events, we can start creating the Command Model.

现在我们已经在命令和事件方面为我们的核心API建模,我们可以开始创建命令模型。

The Aggregate is a regular component within the Command Model and stems from DDD. Other frameworks use the concept too, as is for example seen in this article about persisting DDD aggregates with Spring.

聚合是命令模型中的一个常规组件,源于DDD。其他框架也使用这一概念,例如在这篇关于用Spring持久化DDD聚合的文章中可以看到。

As our domain focuses on dealing with Orders, we’ll create an OrderAggregate as the centre of our Command Model.

由于我们的领域专注于处理订单,我们将创建一个OrderAggregate作为我们命令模型的中心。

6.1. Aggregate Class

6.1.汇总类

Thus, let’s create our basic aggregate class:

因此,让我们来创建我们的基本聚合类。

@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

    @CommandHandler
    public OrderAggregate(CreateOrderCommand command) {
        AggregateLifecycle.apply(new OrderCreatedEvent(command.getOrderId(), command.getProductId()));
    }

    @EventSourcingHandler
    public void on(OrderCreatedEvent event) {
        this.orderId = event.getOrderId();
        orderConfirmed = false;
    }

    protected OrderAggregate() { }
}

The Aggregate annotation is an Axon Spring specific annotation marking this class as an aggregate. It will notify the framework that the required CQRS and Event Sourcing specific building blocks need to be instantiated for this OrderAggregate.

Aggregate注解是Axon Spring的特定注解,它将通知框架需要为这个OrderAggregate实例化所需的CQRS和Event Sourcing特定构建块。

As an aggregate will handle commands that are targeted to a specific aggregate instance, we need to specify the identifier with the AggregateIdentifier annotation.

由于聚合将处理针对特定聚合实例的命令,我们需要用AggregateIdentifier注释来指定标识符。

Our aggregate will commence its life cycle upon handling the CreateOrderCommand in the OrderAggregate ‘command handling constructor’. To tell the framework that the given function is able to handle commands, we’ll add the CommandHandler annotation.

我们的聚合体将在处理CreateOrderCommand中的OrderAggregate“命令处理构造函数 “时开始其生命周期。为了告诉框架该函数能够处理命令,我们将添加CommandHandler注解。

When handling the CreateOrderCommand, it will notify the rest of the application that an order was created by publishing the OrderCreatedEvent. To publish an event from within an aggregate, we’ll use AggregateLifecycle#apply(Object…).

当处理CreateOrderCommand时,它将通过发布OrderCreatedEvent来通知应用程序的其他部分,即订单已经创建。为了从聚合中发布一个事件,我们将使用AggregateLifecycle#apply(Object…)

From this point, we can actually start to incorporate Event Sourcing as the driving force to recreate an aggregate instance from its stream of events.

从这一点来看,我们实际上可以开始将事件源作为驱动力,从其事件流中重新创建一个聚合实例。

We start this off with the ‘aggregate creation event’, the OrderCreatedEvent, which is handled in an EventSourcingHandler annotated function to set the orderId and orderConfirmed state of the Order aggregate.

我们从 “聚合创建事件 “开始,OrderCreatedEvent,它在一个EventSourcingHandler注释的函数中被处理,以设置订单聚合的orderIdorderConfirmed状态。

Also note that to be able to source an aggregate based on its events, Axon requires a default constructor.

还需要注意的是,为了能够根据事件来确定一个聚合体的来源,Axon需要一个默认的构造函数。

6.2. Aggregate Command Handlers

6.2.聚合命令处理程序

Now that we have our basic aggregate, we can start implementing the remaining command handlers:

现在我们有了基本的聚合,我们可以开始实现其余的命令处理程序。

@CommandHandler 
public void handle(ConfirmOrderCommand command) { 
    if (orderConfirmed) {
        return;
    }
    apply(new OrderConfirmedEvent(orderId)); 
} 

@CommandHandler 
public void handle(ShipOrderCommand command) { 
    if (!orderConfirmed) { 
        throw new UnconfirmedOrderException(); 
    } 
    apply(new OrderShippedEvent(orderId)); 
} 

@EventSourcingHandler 
public void on(OrderConfirmedEvent event) { 
    orderConfirmed = true; 
}

The signature of our command and event sourcing handlers simply states handle({the-command}) and on({the-event}) to maintain a concise format.

我们的命令和事件源处理程序的签名只是简单地说明handle({the-command}) on({the-event}) 以保持简洁的格式。

Additionally, we’ve defined that an Order can only be confirmed once and shipped if it has been confirmed. Thus, we’ll ignore the command in the former, and throw an UnconfirmedOrderException if the latter isn’t the case.

此外,我们已经定义了一个订单只能被确认一次,并且如果它已经被确认就可以发货。因此,我们将忽略前者的命令,如果不是后者,则抛出一个UnconfirmedOrderException

This exemplifies the need for the OrderConfirmedEvent sourcing handler to update the orderConfirmed state to true for the Order aggregate.

这说明OrderConfirmedEvent源处理程序需要将orderConfirmed状态更新为true的订单集合。

7. Testing the Command Model

7.测试命令模型

First, we need to set up our test by creating a FixtureConfiguration for the OrderAggregate:

首先,我们需要通过为OrderAggregate创建FixtureConfiguration来设置我们的测试。

private FixtureConfiguration<OrderAggregate> fixture;

@Before
public void setUp() {
    fixture = new AggregateTestFixture<>(OrderAggregate.class);
}

The first test case should cover the simplest situation. When the aggregate handles the CreateOrderCommand, it should produce an OrderCreatedEvent:

第一个测试案例应该涵盖最简单的情况。当集合处理CreateOrderCommand时,它应该产生一个OrderCreatedEvent

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.givenNoPriorActivity()
  .when(new CreateOrderCommand(orderId, productId))
  .expectEvents(new OrderCreatedEvent(orderId, productId));

Next, we can test the decision-making logic of only being able to ship an Order if it’s been confirmed. Due to this, we have two scenarios — one where we expect an exception, and one where we expect an OrderShippedEvent.

接下来,我们可以测试只有当订单被确认后才能发货的决策逻辑。由于这个原因,我们有两种情况–一种是我们期待一个异常,另一种是我们期待一个OrderShippedEvent

Let’s take a look at the first scenario, where we expect an exception:

让我们看一下第一种情况,我们期待一个例外。

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId))
  .when(new ShipOrderCommand(orderId))
  .expectException(UnconfirmedOrderException.class);

And now the second scenario, where we expect an OrderShippedEvent:

现在是第二种情况,我们期待一个OrderShippedEvent

String orderId = UUID.randomUUID().toString();
String productId = "Deluxe Chair";
fixture.given(new OrderCreatedEvent(orderId, productId), new OrderConfirmedEvent(orderId))
  .when(new ShipOrderCommand(orderId))
  .expectEvents(new OrderShippedEvent(orderId));

8. The Query Model – Event Handlers

8.查询模型–事件处理程序

So far, we’ve established our core API with the commands and events, and we have the command model of our CQRS Order service, the OrderAggregate, in place.

到目前为止,我们已经用命令和事件建立了我们的核心API,而且我们已经有了CQRS订单服务的命令模型,即OrderAggregate,已经到位。

Next, we can start thinking of one of the Query Models our application should service.

接下来,我们可以开始考虑我们的应用程序应该服务的查询模型之一

One of these models is the Order:

这些模式之一是订单

public class Order {

    private final String orderId;
    private final String productId;
    private OrderStatus orderStatus;

    public Order(String orderId, String productId) {
        this.orderId = orderId;
        this.productId = productId;
        orderStatus = OrderStatus.CREATED;
    }

    public void setOrderConfirmed() {
        this.orderStatus = OrderStatus.CONFIRMED;
    }

    public void setOrderShipped() {
        this.orderStatus = OrderStatus.SHIPPED;
    }

    // getters, equals/hashCode and toString functions
}
public enum OrderStatus {
    CREATED, CONFIRMED, SHIPPED
}

We’ll update this model based on the events propagating through our system. A Spring Service bean to update our model will do the trick:

我们将根据通过我们系统传播的事件来更新这个模型。一个Spring Service Bean来更新我们的模型将起到作用。

@Service
public class OrdersEventHandler {

    private final Map<String, Order> orders = new HashMap<>();

    @EventHandler
    public void on(OrderCreatedEvent event) {
        String orderId = event.getOrderId();
        orders.put(orderId, new Order(orderId, event.getProductId()));
    }

    // Event Handlers for OrderConfirmedEvent and OrderShippedEvent...
}

As we’ve used the axon-spring-boot-starter dependency to initiate our Axon application, the framework will automatically scan all the beans for existing message-handling functions.

由于我们使用了axon-spring-boot-starter依赖项来启动我们的Axon应用程序,该框架将自动扫描所有的Bean以获得现有的消息处理功能。

As the OrdersEventHandler has EventHandler annotated functions to store an Order and update it, this bean will be registered by the framework as a class that should receive events without requiring any configuration on our part.

由于OrdersEventHandlerEventHandler注释的函数来存储Order和更新它,这个bean将被框架注册为一个应该接收事件的类,而不需要我们进行任何配置。

9. The Query Model – Query Handlers

9.查询模型–查询处理程序

Next, to query this model to for example retrieve all the orders, we should first introduce a Query message to our core API:

接下来,为了查询这个模型,例如检索所有的订单,我们应该首先给我们的核心API引入一个查询消息。

public class FindAllOrderedProductsQuery { }

Second, we’ll have to update the OrdersEventHandler to be able to handle the FindAllOrderedProductsQuery:

第二,我们必须更新OrdersEventHandler,以便能够处理FindAllOrderedProductsQuery

@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
    return new ArrayList<>(orders.values());
}

The QueryHandler annotated function will handle the FindAllOrderedProductsQuery and is set to return a List<Order> regardless, similarly to any ‘find all’ query.

QueryHandler注解的函数将处理FindAllOrderedProductsQuery,并被设置为返回一个List<Order>,与任何 “查找所有 “查询类似。

10. Putting Everything Together

10 把所有东西放在一起

We’ve fleshed out our core API with commands, events, and queries, and set up our command and query model by having an OrderAggregate and Order model.

我们已经用命令、事件和查询充实了我们的核心API,并通过OrderAggregateOrder模型设置了我们的命令和查询模型。

Next is to tie up the loose ends of our infrastructure. As we’re using the axon-spring-boot-starter, this sets a lot of the required configuration automatically.

下一步是把我们的基础设施的松散部分绑起来。由于我们使用的是axon-spring-boot-starter,它自动设置了很多必要的配置。

First, as we want to leverage Event Sourcing for our Aggregate, we’ll need an EventStore. Axon Server which we have started up in step three will fill this hole.

首先,由于我们想利用事件源来实现我们的聚合,我们将需要一个EventStore我们在第三步中启动的Axon Server将填补这个漏洞。

Secondly, we need a mechanism to store our Order query model. For this example, we can add h2 as an in-memory database and spring-boot-starter-data-jpa for ease of use:

其次,我们需要一个机制来存储我们的Order查询模型。对于这个例子,我们可以添加h2作为内存数据库和spring-boot-starter-data-jpa以便于使用。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

10.1. Setting up a REST Endpoint

10.1.设置一个REST端点

Next, we need to be able to access our application, for which we’ll be leveraging a REST endpoint by adding the spring-boot-starter-web dependency:

接下来,我们需要能够访问我们的应用程序,为此我们将通过添加spring-boot-starter-web依赖性来利用一个REST端点。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

From our REST endpoint, we can start dispatching commands and queries:

从我们的REST端点,我们可以开始调度命令和查询:

@RestController
public class OrderRestEndpoint {

    private final CommandGateway commandGateway;
    private final QueryGateway queryGateway;

    // Autowiring constructor and POST/GET endpoints
}

The CommandGateway is used as the mechanism to send our command messages, and the QueryGateway, in turn, sends query messages. The gateways provide a simpler, more straightforward API, compared to the CommandBus and QueryBus that they connect with.

CommandGateway被用作发送我们的命令消息的机制,而QueryGateway,反过来,发送查询消息。与它们所连接的CommandBusQueryBus相比,这些网关提供了一个更简单、更直接的 API。

From here on, our OrderRestEndpoint should have a POST endpoint to create, confirm, and ship an order:

从这里开始,我们的OrderRestEndpoint应该有一个POST端点来创建、确认和运送一个订单

@PostMapping("/ship-order")
public CompletableFuture<Void> shipOrder() {
    String orderId = UUID.randomUUID().toString();
    return commandGateway.send(new CreateOrderCommand(orderId, "Deluxe Chair"))
                         .thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId)))
                         .thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId)));
}

This rounds up the command side of our CQRS application. Note that a CompletableFuture is returned by the gateway, enabling asynchronizity.

这就完成了我们的CQRS应用程序的命令部分。请注意,网关返回的是CompletableFuture,实现了异步性。

Now, all that’s left is a GET endpoint to query all the Order:

现在,剩下的就是一个GET端点来查询所有的Order:

@GetMapping("/all-orders")
public CompletableFuture<List<Order>> findAllOrders() {
    return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class));
}

In the GET endpoint, we leverage the QueryGateway to dispatch a point-to-point query. In doing so, we create a default FindAllOrderedProductsQuery, but we also need to specify the expected return type.

在GET端点中,我们利用QueryGateway来调度一个点对点的查询。这样做,我们创建了一个默认的FindAllOrderedProductsQuery,但是我们还需要指定预期的返回类型。

As we expect multiple Order instances to be returned, we leverage the static ResponseTypes#multipleInstancesOf(Class) function. With this, we have provided a basic entrance into the query side of our Order service.

由于我们期望返回多个Order实例,我们利用了静态的ResponseTypes#multipleInstancesOf(Class)函数。这样,我们就提供了一个进入我们的订单服务的查询方面的基本入口。

We completed the setup, so now we can send some commands and queries through our REST Controller once we’ve started up the OrderApplication.

我们完成了设置,所以现在我们可以在启动OrderApplication后,通过我们的REST控制器发送一些命令和查询。

POST-ing to endpoint /ship-order will instantiate an OrderAggregate that’ll publish events, which, in turn, will save/update our Orders. GET-ing from the /all-orders endpoint will publish a query message that’ll be handled by the OrdersEventHandler, which will return all the existing Orders.

/ship-order端点发送POST将实例化一个OrderAggregate,它将发布事件,反过来,它将保存/更新我们的Orders。/all-orders端点获取将发布一个查询信息,该信息将由OrdersEventHandler处理,它将返回所有现有Orders。

11. Conclusion

11.结论

In this article, we introduced the Axon Framework as a powerful base for building an application leveraging the benefits of CQRS and Event Sourcing.

在这篇文章中,我们介绍了Axon框架,作为利用CQRS和Event Sourcing的优势来构建应用程序的强大基础。

We implemented a simple Order service using the framework to show how such an application should be structured in practice.

我们使用该框架实现了一个简单的订单服务,以展示这样一个应用程序在实践中应该如何结构。

Lastly, Axon Server posed as our Event Store and the message routing mechanism, greatly simplifying the infrastructure.

最后,Axon服务器作为我们的事件存储和消息路由机制,大大简化了基础设施。

The implementation of all these examples and code snippets can be found over on GitHub.

所有这些例子和代码片断的实现都可以在GitHub上找到over

For any additional questions on this topic, also check out Discuss AxonIQ.

如有关于此主题的任何其他问题,也请查看讨论AxonIQ