Dispatching Queries in Axon Framework – 在Axon框架中调度查询

最后修改: 2022年 9月 7日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

Axon Framework helps us build event-driven microservice systems. In A Guide to the Axon Framework, we learned about Axon by walking through a simple Axon Spring Boot application that includes building an example Order model for us to update and query. That article uses a simple point-to-point query.

Axon框架帮助我们构建事件驱动的微服务系统。在Axon框架指南中,我们通过了解一个简单的AxonSpring Boot应用程序,包括构建一个供我们更新和查询的示例Order模型来了解Axon。那篇文章使用了一个简单的点对点查询。

In this tutorial, we’ll build on the above example to examine all of the ways we can dispatch queries in Axon. In addition to looking more closely at the point-to-point query, we’ll also learn about the streaming query, the scatter-gather query and the subscription query.

在本教程中,我们将在上述例子的基础上,研究Axon中所有可以调度查询的方式。除了更仔细地研究点对点查询外,我们还将学习流式查询、散点收集查询和订阅查询。

2. Query Dispatching

2.查询调度

When we submit a query to Axon, the framework will issue that query to all registered query handlers capable of answering our query. In a distributed system, it’s possible that multiple nodes can support the same kind of query, and it’s also possible for a single node to have multiple query handlers that can support the query.

当我们向Axon提交查询时,框架会将该查询发布给所有能够回答我们查询的注册查询处理程序。在一个分布式系统中,有可能多个节点可以支持同一种查询,也有可能一个节点有多个查询处理程序可以支持该查询。

How, then, does Axon decide which results to include in its response? The answer depends on how we dispatch the query. Axon gives us three options:

那么,Axon如何决定在其响应中包括哪些结果?答案取决于我们如何分配查询。Axon给了我们三个选择。

  • A point-to-point query obtains a complete answer from any node that supports our query
  • A streaming query obtains a stream of answers from any node that supports our query
  • A scatter-gather query obtains a complete answer from all nodes that support our query
  • A subscription query obtains the answer so far and then continues to listen for any updates

In the following sections, we’ll learn how to support and dispatch each kind of query.

在下面的章节中,我们将学习如何支持和调度每一种查询。

3. Point-to-Point Queries

3.点对点查询

With a point-to-point query, Axon issues the query to every node that supports the query. Axon assumes any node is capable of giving a complete answer to a point-to-point query, and it will simply return the results it gets from the first node that responds.

通过点对点查询,Axon向每个支持该查询的节点发出查询。Axon假定任何节点都能够对点对点查询给出完整的答案,它将简单地返回从第一个响应的节点得到的结果。

In this section, we’ll use a point-to-point query to get all current Orders in our system.

在这一节中,我们将使用一个点对点的查询来获取我们系统中所有当前的 Orders。

3.1. Defining the Query

3.1.定义查询

Axon uses strongly typed classes to represent a query type and encapsulate the query parameters. In this case, since we’re querying all orders, we don’t require any query parameters. Thus, we can represent our query with an empty class:

Axon使用强类型的类来表示查询类型并封装查询参数。在这种情况下,由于我们要查询所有的订单,我们不需要任何查询参数。因此,我们可以用一个空类来表示我们的查询。

public class FindAllOrderedProductsQuery {}

3.2. Defining the Query Handler

3.2.定义查询处理程序

We can register query handlers using the @QueryHandler annotation.

我们可以使用@QueryHandler注解来注册查询处理程序。

Let’s create a class for our query handlers and add a handler that can support FindAllOrderedProductsQuery queries:

让我们为我们的查询处理程序创建一个类,并添加一个可以支持FindAllOrderedProductsQuery查询的处理程序。

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
    private final Map<String, Order> orders = new HashMap<>();

    @QueryHandler
    public List<Order> handle(FindAllOrderedProductsQuery query) {
        return new ArrayList<>(orders.values());
    }
}

In the above example, we’re registering handle() as an Axon query handler that:

在上面的例子中,我们将handle()注册为一个Axon查询处理程序,它。

  1. is capable of responding to FindAllOrderedProductsQuery queries
  2. returns a List of Orders. As we’ll see later, Axon takes the return type into account when deciding which query handler can respond to a given query. This can make it easier to gradually migrate to a new API.

We use the OrdersEventHandler interface above so that we can later swap in an implementation that uses a persistent data store, such as MongoDB. For this tutorial, we’ll keep things simple by storing Order objects in an in-memory Map. Thus, our query handler just needs to return the Order objects as a List.

我们在上面使用了OrdersEventHandler接口,这样我们就可以在以后交换使用持久化数据存储的实现,例如MongoDB。在本教程中,我们将通过将Order对象存储在内存中的Map来保持简单。因此,我们的查询处理程序只需将Order对象作为List返回。

3.3. Dispatching a Point-to-Point Query

3.3.派遣一个点对点查询

Now that we’ve defined a query type and a query handler, we’re ready to dispatch a FindAllOrderedProductsQuery to Axon. Let’s create a service class with a method that issues a point-to-point FindAllOrderedProductsQuery:

现在我们已经定义了一个查询类型和一个查询处理程序,我们准备向Axon发送一个FindAllOrderedProductsQuery。让我们创建一个服务类,其中有一个方法可以发出一个点对点的FindAllOrderedProductsQuery

@Service
public class OrderQueryService {
    private final QueryGateway queryGateway;

    public OrderQueryService(QueryGateway queryGateway) {
        this.queryGateway = queryGateway;
    }

    public CompletableFuture<List<OrderResponse>> findAllOrders() {
        return queryGateway.query(new FindAllOrderedProductsQuery(),
            ResponseTypes.multipleInstancesOf(Order.class))
          .thenApply(r -> r.stream()
            .map(OrderResponse::new)
            .collect(Collectors.toList()));
    }
}

In the above example, we use Axon’s QueryGateway to dispatch an instance of FindAllOrderedProductsQuery. We use the gateway’s query() method to issue a point-to-point query. Because we are specifying ResponseTypes.multipleInstancesOf(Order.class), Axon knows we only want to talk to query handlers whose return type is a collection of Order objects.

在上面的例子中,我们使用Axon的QueryGateway来调度FindAllOrderedProductsQuery的一个实例。我们使用网关的query()方法来发布一个点对点的查询。因为我们指定了ResponseTypes.multipleInstancesOf(Order.class),Axon知道我们只想与那些返回类型为Order对象集合的查询处理程序对话。

Finally, to add a layer of indirection between our Order model class and our external clients, we wrap our results in OrderResponse objects.

最后,为了在我们的Order模型类和我们的外部客户端之间增加一层间接性,我们将我们的结果包装在OrderResponse对象中。

3.4. Testing Our Point-to-Point Query

3.4.测试我们的点对点查询

We’ll use @SpringBootTest to test our query using the Axon integration. Let’s start by adding the spring-test dependency to our pom.xml file:

我们将使用@SpringBootTest来测试我们使用Axon集成的查询。让我们首先将spring-test依赖性添加到我们的pom.xml文件。

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <scope>test</scope>
</dependency>

Next, let’s add a test that invokes our service method to retrieve an Order:

接下来,让我们添加一个测试,调用我们的服务方法来检索一个Order

@SpringBootTest(classes = OrderApplication.class)
class OrderQueryServiceIntegrationTest {

    @Autowired
    OrderQueryService queryService;

    @Autowired
    OrdersEventHandler handler;

    private String orderId;

    @BeforeEach
    void setUp() {
        orderId = UUID.randomUUID().toString();
        Order order = new Order(orderId);
        handler.reset(Collections.singletonList(order));
    }

    @Test
    void givenOrderCreatedEventSend_whenCallingAllOrders_thenOneCreatedOrderIsReturned()
            throws ExecutionException, InterruptedException {
        List<OrderResponse> result = queryService.findAllOrders().get();
        assertEquals(1, result.size());
        OrderResponse response = result.get(0);
        assertEquals(orderId, response.getOrderId());
        assertEquals(OrderStatusResponse.CREATED, response.getOrderStatus());
        assertTrue(response.getProducts().isEmpty());
    }
}

In our @BeforeEach method above, we invoke reset(), which is a convenience method in OrdersEventHandler for pre-loading Order objects from a legacy system, or to help facilitate a migration. Here, we use it to pre-load an Order into our in-memory store for our testing.

在上面的@BeforeEach方法中,我们调用了reset(),这是OrdersEventHandler中的一个方便方法,用于从传统系统中预装Order对象,或帮助促进迁移。在这里,我们使用它来预先加载一个Order到我们的内存存储中,以便进行测试。

We then invoke our service method and verify that it has retrieved our test order after dispatching our query to the query handler we set up earlier.

然后我们调用我们的服务方法,并验证它在将我们的查询分派给我们先前设置的查询处理程序后,已经检索到我们的测试订单。

4. Streaming Queries

4.流式查询

With streaming queries, we can send large collections as a stream.

通过流式查询,我们可以将大型集合作为一个流发送。

Instead of waiting until the whole result is complete at the query handler side, like with point-to-point queries, the result can be returned in pieces. Unlike subscription queries, streaming queries are expected to be complete at some point.

而不是像点对点查询那样,在查询处理程序端等待整个结果的完成,结果可以被分块返回。与订阅查询不同,流式查询预计在某个时间点完成。

By depending on project Reactor, the streaming query benefits from features such as backpressure to handle large collections of results. If we don’t already have the reactor-core dependency, we need to add it to be able to use streaming queries.

通过依赖项目Reactor,流式查询可以从诸如背压等功能中获益,以处理大型结果集合。如果我们还没有reactor-core依赖关系,我们需要添加它,以便能够使用流式查询。

4.1. Defining the Query

4.1.定义查询

We’ll reuse the query from the point-to-point query.

我们将重新使用点对点查询中的查询。

4.2. Defining the Query Handler

4.2.定义查询处理程序

A streaming query should return a Publisher. We can use Reactor to create a Mono from the values of the in-memory map:

一个流式查询应该返回一个Publisher。我们可以使用Reactor从内存地图的值中创建一个Mono

@QueryHandler
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
    return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable);
}

We use flatMapMany() to convert the Mono to a Publisher.

我们使用flatMapMany()Mono转换为Publisher

4.3. Dispatching a Streaming Query

4.3.派遣一个流式查询

The method to add to the OrderQueryService is very similar to the point-to-point query. We do give it a different method name, so the distinction’s clear:

要添加到OrderQueryService的方法与点对点查询非常相似。我们确实给了它一个不同的方法名,所以区别很明显。

public Flux<OrderResponse> allOrdersStreaming() {
    Publisher<Order> publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
    return Flux.from(publisher).map(OrderResponse::new);
}

4.4. Testing Our Streaming Query

4.4.测试我们的流查询

Let’s add a test for this to our OrderQueryServiceIntegrationTest:

让我们为我们的OrderQueryServiceIntegrationTest添加一个测试。

@Test
void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() {
    Flux<OrderResponse> result = queryService.allOrdersStreaming();
    StepVerifier.create(result)
      .assertNext(order -> assertEquals(orderId, order.getOrderId()))
      .expectComplete()
      .verify();
}

We should note that with the expectComplete(), we verify the stream was indeed completed.

我们应该注意到,通过expectComplete(),我们验证了流确实已经完成。

5. Scatter-Gather Queries

5.分散收集的查询

A scatter-gather query is dispatched to all query handlers in all nodes that support the query. For these queries, the results from each query handler are combined into a single response. If two nodes have the same Spring application name, Axon considers them equivalent and will only use the results from the first node that responds.

对于这些查询,来自每个查询处理程序的结果被合并为一个响应。如果两个节点有相同的Spring应用程序名称,Axon认为它们是等价的,并将只使用第一个响应的节点的结果。

In this section, we’ll create a query to retrieve the total number of shipped products that match a given product ID. We will simulate querying both a live system and a legacy system in order to show that Axon will combine the responses from both systems.

在本节中,我们将创建一个查询,以检索符合给定产品ID的已发货产品的总数。我们将模拟查询实时系统和传统系统,以显示Axon将结合两个系统的响应。

5.1. Defining the Query

5.1.定义查询

Unlike our point-to-point query, we need to supply a parameter this time: the product ID. Instead of an empty class, we’ll create a POJO with our product ID parameter:

与我们的点对点查询不同,这次我们需要提供一个参数:产品ID。我们将创建一个带有产品ID参数的POJO,而不是一个空类:

public class TotalProductsShippedQuery {
    private final String productId;

    public TotalProductsShippedQuery(String productId) {
        this.productId = productId;
    }

    // getter
}

5.2. Defining the Query Handlers

5.2.定义查询处理程序

First, we’ll query the event-based system, which, as we’ll recall, uses an in-memory data store. Let’s add a query handler to our existing InMemoryOrdersEventHandler to get the total number of shipped products:

首先,我们将查询基于事件的系统,正如我们所记得的,它使用了一个内存数据存储。让我们为我们现有的InMemoryOrdersEventHandler添加一个查询处理程序,以获得已发货产品的总数。

@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
    return orders.values().stream()
      .filter(o -> o.getOrderStatus() == OrderStatus.SHIPPED)
      .map(o -> Optional.ofNullable(o.getProducts().get(query.getProductId())).orElse(0))
      .reduce(0, Integer::sum);
}

Above, we retrieve all of our in-memory Order objects and remove any that haven’t been shipped. We then invoke getProducts() on each Order to get the number of products shipped whose product ID matches our query parameter. We then sum those numbers to get our total number of shipped products.

以上,我们检索了所有内存中的Order对象,并删除了任何尚未发货的对象。然后我们在每个Order上调用getProducts(),以获得产品ID与我们的查询参数相匹配的已发货产品的数量。然后我们将这些数字相加,得到已发货产品的总数。

Since we want to combine those results with the numbers in our hypothetical legacy system, let’s simulate the legacy data with a separate class and query handler:

由于我们想把这些结果与我们假设的遗留系统中的数字结合起来,让我们用一个单独的类和查询处理程序来模拟遗留的数据。

@Service
public class LegacyQueryHandler {
    @QueryHandler
    public Integer handle(TotalProductsShippedQuery query) {
        switch (query.getProductId()) {
        case "Deluxe Chair":
            return 234;
        case "a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":
            return 10;
        default:
            return 0;
        }
    }
}

For the sake of this tutorial, this query handler exists in the same Spring application as our InMemoryOrdersEventHandler handler. In a real-life scenario, we would likely not have multiple query handlers for the same query type within the same application. A scatter-gather query typically combines results from multiple Spring applications that each have a single handler.

在本教程中,这个查询处理程序与我们的InMemoryOrdersEventHandler处理程序存在于同一个Spring应用程序中。在现实生活中,我们可能不会在同一个应用程序中为同一查询类型设置多个查询处理程序。分散收集的查询通常会结合来自多个Spring应用程序的结果,而每个应用程序都有一个处理程序。

5.3. Dispatching a Scatter-Gather Query

5.3.分散收集查询的调度

Let’s add a new method to our OrderQueryService for dispatching a scatter-gather query:

让我们为我们的OrderQueryService添加一个新的方法,用于调度一个分散收集的查询。

public Integer totalShipped(String productId) {
    return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
        ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
      .reduce(0, Integer::sum);
}

This time, we construct our query object with the productId parameter. We also set a 10-second timeout to our scatterGather() call. Axon will only respond with results it can retrieve within that time window. If one or more handlers do not respond within that window, their results will not be included in queryGateway‘s response.

这一次,我们用productId参数构建我们的查询对象。我们还为我们的scatterGather()调用设置了10秒的超时。Axon将只响应它能在该时间窗口内检索到的结果。如果一个或多个处理程序没有在该窗口内响应,它们的结果将不包括在queryGateway的响应中。

5.4. Testing Our Scatter-Gather Query

5.4 测试我们的散点收集查询

Let’s add a test for this to our OrderQueryServiceIntegrationTest:

让我们为我们的OrderQueryServiceIntegrationTest添加一个测试。

void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() {
    Order order = new Order(orderId);
    order.getProducts().put("Deluxe Chair", 3);
    order.setOrderShipped();
    handler.reset(Collections.singletonList(order));

    assertEquals(237, queryService.totalShipped("Deluxe Chair"));
}

Above, we use our reset() method to simulate three orders in our event-driven system. Previously, in our LegacyQueryHandler, we hard-coded 234 shipped deluxe chairs in our legacy system. Thus, our test should yield a combined total of 237 deluxe chairs shipped.

上面,我们使用我们的reset()方法,在我们的事件驱动系统中模拟了三个订单。之前,在我们的LegacyQueryHandler中,我们在传统系统中硬编码了234张已发货的豪华椅子。因此,我们的测试应该产生237把豪华椅的总发货量。

6. Subscription Queries

6.订阅查询

With subscription queries, we get an initial result followed by a stream of updates. In this section, we’ll query our system for an Order in its current state, but then remain connected to Axon in order to get any new updates to that Order as they occur.

通过订阅查询,我们会得到一个初始结果,然后是更新流。在本节中,我们将查询系统中的Order的当前状态,但随后保持与Axon的连接,以便在该Order发生时获得任何新更新。

6.1. Defining the Query

6.1.定义查询

Since we want to retrieve a specific order, let’s create a query class that includes an order ID as its only parameter:

由于我们想检索一个特定的订单,让我们创建一个查询类,包括一个订单ID作为其唯一的参数。

public class OrderUpdatesQuery {
    private final String orderId;

    public OrderUpdatesQuery(String orderId) {
        this.orderId = orderId;
    }

    // getter
}

6.2. Defining the Query Handler

6.2.定义查询处理程序

The query handler for retrieving an Order from our in-memory map is very simple. Let’s add it to our InMemoryOrdersEventHandler class:

从我们的内存地图中检索Order的查询程序非常简单。让我们把它添加到我们的InMemoryOrdersEventHandler类。

@QueryHandler
public Order handle(OrderUpdatesQuery query) {
    return orders.get(query.getOrderId());
}

6.3. Emitting Query Updates

6.3.发出查询更新

A subscription query is only interesting when there are updates. Axon Framework provides a QueryUpdateEmitter class we can use to inform Axon how and when a subscription should be updated. Let’s inject that emitter into our InMemoryOrdersEventHandler class and use it in a convenience method:

只有在有更新时,订阅的查询才会有意义。Axon Framework提供了一个QueryUpdateEmitter类,我们可以用它来通知Axon如何以及何时更新订阅。让我们将该发射器注入我们的InMemoryOrdersEventHandler类,并在一个便利方法中使用它。

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {

    private final QueryUpdateEmitter emitter;

    public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
        this.emitter = emitter;
    }

    private void emitUpdate(Order order) {
        emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
          .equals(q.getOrderId()), order);
    }

    // our event and query handlers
}

Our emitter.emit() invocation tells Axon that any clients subscribed to the OrderUpdatesQuery may need to receive an update. The second argument is a filter telling Axon that only subscriptions matching the supplied order ID should get the update.

我们的emitter.emit()调用告诉Axon,任何订阅了OrderUpdatesQuery的客户端可能需要接收更新。第二个参数是一个过滤器,告诉Axon,只有与所提供的订单ID相匹配的订阅才应该得到更新。

We can now use our emitUpdate() method inside any event handler that modifies an order. For example, if an order is shipped, any active subscription to updates to that order should be notified. Let’s create an event handler for the OrderShippedEvent that was covered in the previous article and have it emit updates to the shipped order:

我们现在可以在任何修改订单的事件处理程序中使用我们的emitUpdate()方法。例如,如果一个订单被装运,任何对该订单进行更新的活动订阅应该被通知。让我们为前一篇文章中涉及的OrderShippedEvent创建一个事件处理程序,并让它发出对已发货订单的更新。

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
    @EventHandler
    public void on(OrderShippedEvent event) {
        orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
            order.setOrderShipped();
            emitUpdate(order);
            return order;
        });
    }

    // fields, query handlers, other event handlers, and our emitUpdate() method
}

We can do the same for our ProductAddedEventProductCountIncrementedEventProductCountDecrementedEvent, and OrderConfirmedEvent events.

我们可以为我们的ProductAddedEventProductCountIncrementedEventProductCountDecrementedEventOrderConfirmedEvent事件做同样的事。

6.4. Subscribing to a Query

6.4.订阅一个查询

Next, we’ll build a service method for subscribing to a query. We’ll use Flux from Reactor Core in order to stream updates to the client code.

接下来,我们将构建一个用于订阅查询的服务方法。我们将使用Reactor Core中的Flux,以便将更新流传到客户端代码。

Let’s add that dependency to our pom.xml file:

让我们把这个依赖性添加到我们的pom.xml文件。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

Now, let’s add our service method implementation to OrderQueryService:

现在,让我们把我们的服务方法实现添加到OrderQueryService

public class OrderQueryService {
    public Flux<OrderResponse> orderUpdates(String orderId) {
        return subscriptionQuery(new OrderUpdatesQuery(orderId), ResponseTypes.instanceOf(Order.class))
                .map(OrderResponse::new);
    }

    private <Q, R> Flux<R> subscriptionQuery(Q query, ResponseType<R> resultType) {
        SubscriptionQueryResult<R, R> result = queryGateway.subscriptionQuery(query,
          resultType, resultType);
        return result.initialResult()
          .concatWith(result.updates())
          .doFinally(signal -> result.close());
    }

    // our other service methods
}

The public orderUpdates() method above delegates most of its work to our private convenience method, subscriptionQuery(), though we again package our response as OrderResponse objects so we’re not exposing our internal Order object.

上面的公共orderUpdates()方法将其大部分工作委托给我们的私有便利方法subscriptionQuery(),尽管我们再次将我们的响应打包为OrderResponse对象,这样我们就不会暴露我们的内部Order对象。

Our generalized subscriptionQuery() convenience method is where we combine the initial result we get from Axon with any future updates.

我们的通用subscriptionQuery()便利方法是将我们从Axon获得的初始结果与任何未来的更新结合起来。

First, we invoke Axon’s queryGateway.subscriptionQuery() to get a SubscriptionQueryResult object. We supply resultType to queryGateway.subscriptionQuery() twice because we are always expecting an Order object, but we could use a different type for updates if we wanted to.

首先,我们调用Axon的queryGateway.subscriptionQuery()来获得一个SubscriptionQueryResult对象。我们向queryGateway.subscriptionQuery()提供resultType两次,因为我们总是期待一个Order对象,但是如果我们想,我们可以为更新使用不同的类型。

Next, we use result.getInitialResult() and result.getUpdates() to get all the information we need to fulfill the subscription.

接下来,我们使用result.getInitialResult()result.getUpdates()来获得我们需要的所有信息,以完成订阅。

Finally, we close the stream.

最后,我们关闭该流。

While we don’t use it here, there’s also a Reactive extension for Axon Framework that offers an alternative query gateway that can make it easier to work with subscription queries.

虽然我们在这里没有使用它,但Axon Framework还有一个Reactive扩展,它提供了一个替代的查询网关,可以使订阅查询的工作更加容易。

6.5. Testing Our Subscription Query

6.5.测试我们的订阅查询

To help us test our service method that returns a Flux, we’ll use the StepVerifier class that we get from the reactor-test dependency:

为了帮助我们测试返回Flux的服务方法,我们将使用从reactor-test依赖中获得的StepVerifier类。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Let’s add our test:

让我们添加我们的测试。

class OrderQueryServiceIntegrationTest {
    @Test
    void givenOrdersAreUpdated_whenCallingOrderUpdates_thenUpdatesReturned() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.schedule(this::addIncrementDecrementConfirmAndShip, 100L, TimeUnit.MILLISECONDS);
        try {
            StepVerifier.create(queryService.orderUpdates(orderId))
              .assertNext(order -> assertTrue(order.getProducts().isEmpty()))
              .assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(2, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(OrderStatusResponse.CONFIRMED, order.getOrderStatus()))
              .assertNext(order -> assertEquals(OrderStatusResponse.SHIPPED, order.getOrderStatus()))
              .thenCancel()
              .verify();
        } finally {
            executor.shutdown();
        }
    }

    private void addIncrementDecrementConfirmAndShip() {
        sendProductAddedEvent();
        sendProductCountIncrementEvent();
        sendProductCountDecrement();
        sendOrderConfirmedEvent();
        sendOrderShippedEvent();
    }

    private void sendProductAddedEvent() {
        ProductAddedEvent event = new ProductAddedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendProductCountIncrementEvent() {
        ProductCountIncrementedEvent event = new ProductCountIncrementedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendProductCountDecrement() {
        ProductCountDecrementedEvent event = new ProductCountDecrementedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendOrderConfirmedEvent() {
        OrderConfirmedEvent event = new OrderConfirmedEvent(orderId);
        eventGateway.publish(event);
    }

    private void sendOrderShippedEvent() {
        OrderShippedEvent event = new OrderShippedEvent(orderId);
        eventGateway.publish(event);
    }

    // our other tests
}

Above, we have a private addIncrementDecrementConfirmAndShip() method that publishes five Order-related events to Axon. We invoke this in a separate thread via ScheduledExecutorService 100ms after the test begins in order to simulate events that come in after we’ve begun our OrderUpdatesQuery subscription.

上面,我们有一个私有的addIncrementDecrementConfirmAndShip()方法,向Axon发布五个Order相关事件。我们通过ScheduledExecutorService 在测试开始后100ms调用这个方法,以便模拟在我们开始OrderUpdatesQuery订阅后出现的事件。

In our primary thread, we invoke the orderUpdates() query we’re testing, using StepVerifier to allow us to make assertions on every discrete update we receive from the subscription.

在我们的主线程中,我们调用我们正在测试的orderUpdates()查询,使用StepVerifier来允许我们对从订阅中收到的每个离散更新进行断言。

7. Conclusion

7.结语

In this article, we explored three approaches to dispatching queries within the Axon Framework: point-to-point queries, scatter-gather queries, and subscription queries.

在这篇文章中,我们探讨了在Axon框架内调度查询的三种方法:点对点查询、散点收集查询和订阅查询。

As always, the complete code examples for this article are available over on GitHub.

一如既往,本文的完整代码示例可在GitHub上获得over