SEDA With Spring Integration and Apache Camel

Last modified: September 28, 2022

1. Introduction

SEDA, or Staged Event-Driven Architecture, is an architectural style proposed by Matt Welsh in his Ph.D. thesis. Its primary benefits are scalability, support for highly concurrent traffic, and maintainability.

In this tutorial, we’ll use SEDA to count how many times each word appears in a sentence, using two separate implementations: Spring Integration and Apache Camel.

2. SEDA

SEDA addresses several non-functional requirements specific to online services:

  1. High concurrency: The architecture must support as many concurrent requests as possible.
  2. Dynamic content: Software systems must often support complex business use cases, requiring many steps to process user requests and generate responses.
  3. Robustness to load: User traffic for online services can be unpredictable, and the architecture needs to deal with changes in traffic volume gracefully.

To address these requirements, SEDA decomposes complex services into event-driven stages. These stages are connected indirectly through queues, so they can be completely decoupled from each other. Furthermore, each stage has its own scaling mechanism to cope with its incoming load:

SEDA Overview

The above diagram from Matt Welsh’s paper depicts the overall structure of a web server implemented with SEDA. Each rectangle represents a single processing stage for an incoming HTTP request. The stages can independently consume tasks from their incoming queues, do some processing or I/O work, and then pass a message to the next queue.

2.1. Components

To better understand the components of SEDA, let’s look at how this diagram from Matt Welsh’s thesis shows the inner workings of a single stage:

SEDA Stage

As we can see, each SEDA stage has the following components:

  • Event: Events are data structures containing whatever data the stage needs to perform its processing. For example, for an HTTP web server, events might contain user data – such as the body, header, and request parameters – and infrastructure data like the user’s IP, the request timestamp, etc.
  • Event Queue: This holds the stage’s incoming events.
  • Event Handler: The event handler is the procedural logic of the stage. This could be a simple routing stage, forwarding data from its event queue to other relevant event queues, or a more complex stage that processes the data somehow. The event handler can read events individually or in batches. Batching helps when processing several events together brings a performance benefit, such as updating multiple database records with one query.
  • Outgoing Events: Based on the business use case and the overall structure of the flow, each stage can send new events to zero or more event queues. Creating and sending outgoing messages is done in the event handler method.
  • Thread Pool: Threading is a well-known concurrency mechanism. In SEDA, threading is localized and customized for each stage. In other words, each stage maintains a thread pool. Thus, unlike the one-thread-per-request model, each user request is processed by several threads under SEDA. This model allows us to tune each stage independently according to its complexity.
  • Controllers: A SEDA controller is any mechanism that manages the consumption of resources such as thread pool size, event queue size, scheduling, etc. Controllers are responsible for the elastic behavior of SEDA. A simple controller might manage the number of active threads in each thread pool. A more sophisticated controller could implement complex performance-tuning algorithms that monitor the whole application at runtime and tune various parameters. Moreover, controllers decouple the performance-tuning logic from the business logic. That separation of concerns makes it easier to maintain our code.
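
To tie these components together, here is a minimal plain-Java sketch of a single stage. It is not taken from Welsh’s thesis or from either framework; the class name SedaStage and its methods are hypothetical. A LinkedBlockingQueue plays the event queue, and a Function<List<I>, List<O>> plays the batch-capable event handler. A fixed-size ExecutorService is the stage’s private thread pool. Every result is copied to zero or more downstream queues as outgoing events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical minimal SEDA stage: event queue, batch-capable handler,
// private thread pool, and zero or more outgoing queues.
final class SedaStage<I, O> {
    private final BlockingQueue<I> eventQueue = new LinkedBlockingQueue<>();
    private final Function<List<I>, List<O>> handler;
    private final List<BlockingQueue<O>> outgoing;
    private final int threads;
    private final int maxBatchSize;
    private final ExecutorService threadPool;

    SedaStage(int threads, int maxBatchSize,
              Function<List<I>, List<O>> handler, List<BlockingQueue<O>> outgoing) {
        this.threads = threads;
        this.maxBatchSize = maxBatchSize;
        this.handler = handler;
        this.outgoing = outgoing;
        this.threadPool = Executors.newFixedThreadPool(threads);
    }

    // Starts one worker per pool thread; each worker loops until shutdown().
    void start() {
        for (int i = 0; i < threads; i++) {
            threadPool.execute(this::runWorker);
        }
    }

    // The queue an upstream stage writes to when this stage is its downstream.
    BlockingQueue<I> eventQueue() {
        return eventQueue;
    }

    // Enqueues an event; the caller does not wait for it to be processed.
    void enqueue(I event) {
        eventQueue.add(event);
    }

    private void runWorker() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                List<I> batch = new ArrayList<>();
                batch.add(eventQueue.take());                // block until an event arrives
                eventQueue.drainTo(batch, maxBatchSize - 1); // plus any already waiting, up to the limit
                for (O result : handler.apply(batch)) {
                    for (BlockingQueue<O> next : outgoing) {
                        next.add(result);                    // emit an outgoing event downstream
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();              // shutdown() interrupts blocked workers
        }
    }

    void shutdown() {
        threadPool.shutdownNow();
    }
}
```

To chain two stages, pass one stage’s eventQueue() as another stage’s outgoing queue. The sketch leaves out controllers and error handling on purpose.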

By putting all these components together, SEDA provides a robust solution for dealing with high and fluctuating traffic loads.
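
The controller bullet above stays abstract. To illustrate the simplest kind it mentions, one that manages the number of active threads, here is a hypothetical QueueDepthController. It sizes a JDK ThreadPoolExecutor from its current backlog and is not an API from SEDA or either framework. The rule (one thread per eventsPerThread queued events, clamped to a range) is an assumption chosen for clarity, not Welsh’s algorithm.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical controller that resizes one stage's thread pool from its backlog.
final class QueueDepthController {
    private final ThreadPoolExecutor pool;
    private final int eventsPerThread; // backlog one thread is assumed to absorb
    private final int minThreads;
    private final int maxThreads;

    QueueDepthController(ThreadPoolExecutor pool, int eventsPerThread, int minThreads, int maxThreads) {
        this.pool = pool;
        this.eventsPerThread = eventsPerThread;
        this.minThreads = minThreads;
        this.maxThreads = maxThreads;
    }

    // Sizing rule: one thread per eventsPerThread queued events, clamped to [min, max].
    static int targetThreads(int queueDepth, int eventsPerThread, int min, int max) {
        int wanted = (queueDepth + eventsPerThread - 1) / eventsPerThread; // ceiling division
        return Math.max(min, Math.min(max, wanted));
    }

    // Meant to be called periodically; applies the rule and returns the new size.
    int adjust() {
        int target = targetThreads(pool.getQueue().size(), eventsPerThread, minThreads, maxThreads);
        if (target > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(target); // growing: raise the ceiling first
            pool.setCorePoolSize(target);    // a larger core starts threads for queued work
        } else {
            pool.setCorePoolSize(target);    // shrinking: lower the core first
            pool.setMaximumPoolSize(target);
        }
        return target;
    }
}
```

The two branches order the setter calls so that the core size never exceeds the maximum, a combination current JDK versions reject with an IllegalArgumentException. A real controller would call adjust() periodically, for example from a ScheduledExecutorService. It would usually also smooth its measurements to avoid thrashing.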

3. Sample Problem

In the following sections, we’ll create two implementations that solve the same problem using SEDA.

Our example problem is straightforward: count how many times each word appears in a given string, ignoring case.

Let’s define a word as a sequence of characters without spaces, and we’ll ignore other complications such as punctuation. Our output will be a map that contains the words as keys and the counts as values. For example, given the input “My name is Hesam”, the output will be:

{
  "my": 1,
  "name": 1,
  "is": 1,
  "hesam": 1
}
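
Before spreading the work across stages, it helps to have a single-threaded reference answer to compare against. This sketch (the class name WordCountBaseline is ours) applies the same definition: split on single spaces, lowercase, then count with Collectors.groupingBy. It lowercases with Locale.ROOT, an assumption on our part, so the result does not depend on the JVM’s default locale.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Single-threaded reference solution for the word-count problem (no SEDA).
final class WordCountBaseline {
    static Map<String, Long> countWords(String sentence) {
        return Arrays.stream(sentence.split(" "))            // split on single spaces, as defined above
            .map(word -> word.toLowerCase(Locale.ROOT))       // case-insensitive: compare lowercased words
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }
}
```

For instance, countWords("The cat saw the Cat") produces a map with the=2, cat=2, and saw=1. The iteration order of the returned map is not guaranteed.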

3.1. Adapting the Problem to SEDA

Let’s look at our problem in terms of SEDA stages. Since scalability is a core goal of SEDA, it’s usually better to design small stages focused on specific operations, especially if we have I/O-intensive tasks. Moreover, having small stages helps us better tune the scale of each stage.

To solve our word count problem, we can implement a solution with the following stages:

Example Word-Count Flow
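
Before turning to the frameworks, the same four stages can be sketched with nothing but the JDK. In this hypothetical WordCountPipeline, each stage owns a fixed-size thread pool (the sizes 1, 2, 2, and 2 are arbitrary). That pool’s internal work queue acts as the stage’s event queue. CompletableFuture.thenApplyAsync hands each event to the next stage’s pool. To keep the sketch short, the lowercase stage processes the whole array as one event. Both framework versions below instead split it into one message per word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-JDK version of the four-stage word-count flow.
final class WordCountPipeline implements AutoCloseable {
    // One pool per stage; each pool's internal work queue serves as that stage's event queue.
    private final ExecutorService receiveTextPool = Executors.newFixedThreadPool(1);
    private final ExecutorService splitWordsPool = Executors.newFixedThreadPool(2);
    private final ExecutorService toLowerCasePool = Executors.newFixedThreadPool(2);
    private final ExecutorService countWordsPool = Executors.newFixedThreadPool(2);

    // Plays the role of the gateway: submits the input and returns a future for the final map.
    CompletableFuture<Map<String, Long>> countWords(String input) {
        return CompletableFuture
            .supplyAsync(() -> input, receiveTextPool)                 // stage 1: receive text
            .thenApplyAsync(text -> text.split(" "), splitWordsPool)   // stage 2: split words
            .thenApplyAsync(words -> Arrays.stream(words)              // stage 3: lowercase
                .map(word -> word.toLowerCase(Locale.ROOT))
                .collect(Collectors.toList()), toLowerCasePool)
            .thenApplyAsync(list -> list.stream()                      // stage 4: count words
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())),
                countWordsPool);
    }

    @Override
    public void close() {
        for (ExecutorService pool : List.of(receiveTextPool, splitWordsPool, toLowerCasePool, countWordsPool)) {
            pool.shutdown(); // lets queued events finish, then stops the threads
        }
    }
}
```

Calling pipeline.countWords("My name is Hesam").join() returns the same map as the expected output above. Closing the pipeline shuts down all four pools.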

Now that we have our stage design, let’s implement it in the next sections using two different enterprise integration technologies. The following table previews how each SEDA component maps onto our two implementations:

SEDA Component | Spring Integration                      | Apache Camel
Event          | org.springframework.messaging.Message   | org.apache.camel.Exchange
Event Queue    | org.springframework.integration.channel | Endpoints defined by URI strings
Event Handler  | Instances of functional interfaces      | Camel processors, Camel utility classes, and Functions
Thread Pool    | Spring abstraction of TaskExecutor      | Out-of-the-box support in SEDA endpoints

4. Solution Using Spring Integration

For our first implementation, we’ll use Spring Integration. Spring Integration builds on the Spring model to support popular enterprise integration patterns.

Spring Integration has three main components:

  1. A message is a data structure containing a header and a body.
  2. A channel carries messages from one endpoint to another endpoint. There are two kinds of channels in Spring Integration:
    • point-to-point: Only one endpoint can consume the messages in this channel.
    • publish-subscribe: Multiple endpoints can consume the messages in this channel.
  3. An endpoint routes a message to an application component that performs some business logic. There are a variety of endpoints in Spring Integration, such as transformers, routers, service activators, and filters.
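
The difference between the two channel kinds is easiest to see in code. The two classes below are hypothetical, simplified stand-ins, not Spring Integration’s channel implementations. The point-to-point channel hands each message to exactly one subscriber (round-robin is our choice here). The publish-subscribe channel hands every message to all subscribers. Both deliver synchronously on the sender’s thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical point-to-point channel: each message reaches exactly one subscriber.
final class SimplePointToPointChannel<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private int next = 0; // round-robin cursor, guarded by this

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void send(T message) {
        Consumer<T> target;
        synchronized (this) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            target = subscribers.get(Math.floorMod(next++, subscribers.size()));
        }
        target.accept(message); // deliver outside the lock
    }
}

// Hypothetical publish-subscribe channel: every subscriber receives every message.
final class SimplePublishSubscribeChannel<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void send(T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}
```

With two subscribers, sending "m1" and then "m2" through the point-to-point channel gives each subscriber one message. Sending "m1" through the publish-subscribe channel gives both subscribers a copy.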

Let’s look at an overview of our Spring Integration solution:

 Word Count EIP Diagram

4.1. Dependencies

Let’s get started by adding dependencies for Spring Integration, Spring Boot Test, and Spring Integration Test:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

4.2. The Message Gateway

A messaging gateway is a proxy that hides the complexity of sending a message to integration flows. Let’s set one up for our Spring Integration flow:

@MessagingGateway
public interface IncomingGateway {
    @Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
    public Map<String, Long> countWords(String input);
}

Later, we’ll be able to use this gateway method to test our entire flow:

incomingGateway.countWords("My name is Hesam");

Spring wraps the “My name is Hesam” input within an instance of org.springframework.messaging.Message and passes it to receiveTextChannel, and later gives us the final result from returnResponseChannel.

4.3. Message Channels

In this section, we’ll look at how to set up our gateway’s initial message channel, receiveTextChannel.

Under SEDA, channels need to be scalable via an associated thread pool, so let’s begin by creating a thread pool:

@Bean("receiveTextChannelThreadPool")
TaskExecutor receiveTextChannelThreadPool() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(1);
    executor.setMaxPoolSize(5);
    executor.setThreadNamePrefix("receive-text-channel-thread-pool");
    executor.initialize();
    return executor;
}
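
One detail is worth checking before relying on setMaxPoolSize(5). ThreadPoolTaskExecutor’s queueCapacity defaults to Integer.MAX_VALUE. Like the JDK ThreadPoolExecutor it builds on, the pool only creates threads beyond the core size once its work queue is full. With the default unbounded queue, this pool would therefore stay at one thread. The plain-JDK sketch below (PoolGrowthDemo is a hypothetical name) demonstrates the rule with the same core and maximum sizes. In Spring, the corresponding change would be calling executor.setQueueCapacity(...) with a bounded value.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Shows when a ThreadPoolExecutor grows past its core size.
final class PoolGrowthDemo {
    // Submits `tasks` blocking tasks to a pool with core=1 and max=5 backed by `queue`,
    // and returns how many threads the pool had created once all were submitted.
    static int poolSizeAfter(int tasks, BlockingQueue<Runnable> queue) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 5, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();   // keep every worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();           // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```

With an unbounded LinkedBlockingQueue, seven blocked tasks leave the pool at one thread, with six tasks waiting. With a queue capacity of 2, the same seven tasks push the pool to its maximum of five threads.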

Next, we’ll use our thread pool to create our channel:

@Bean(name = "receiveTextChannel")
MessageChannel getReceiveTextChannel() {
    return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
      .get();
}

MessageChannels is a Spring Integration class that helps us create channels of various types. Here, we use the executor() method to create an ExecutorChannel, which is a channel managed by a thread pool.

Our other channels and thread pools are set up the same way as above.

4.4. Receive Text Stage

With our channels set up, we can start implementing our stages. Let’s create our initial stage:

@Bean
IntegrationFlow receiveText() {
    return IntegrationFlows.from(receiveTextChannel)
      .channel(splitWordsChannel)
      .get();
}

IntegrationFlows is a fluent Spring Integration API for creating IntegrationFlow objects, representing the stages of our flow. The from() method configures our stage’s incoming channel, and channel() configures the outgoing channel.

In this example, our stage passes our gateway’s input message to splitWordsChannel. This stage might be more complex and I/O intensive in a production application, reading messages from a persistent queue or over a network.

4.5. Split Words Stage

Our next stage has a single responsibility: splitting our input String into a String array of the individual words in the sentence:

@Bean
IntegrationFlow splitWords() {
    return IntegrationFlows.from(splitWordsChannel)
      .transform(splitWordsFunction)
      .channel(toLowerCaseChannel)
      .get();
}

In addition to the from() and channel() invocations we’ve used before, here we also use transform(), which applies the supplied Function to our input message. Our splitWordsFunction implementation is very simple:

final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");
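
Because the article deliberately ignores complications, it is worth seeing what this function does with irregular spacing. String.split(" ") keeps the empty string between two consecutive spaces, so that empty string would later be counted as a word. The sketch below reproduces splitWordsFunction as splitOnSpace. It also adds a hypothetical splitOnWhitespace variant that trims the input and splits on runs of whitespace. The variant is an optional alternative, not part of the original flow.

```java
import java.util.function.Function;

final class SplitWords {
    // Same behavior as the article's splitWordsFunction: split on every single space.
    static final Function<String, String[]> splitOnSpace = sentence -> sentence.split(" ");

    // Hypothetical alternative: trim, then treat any run of whitespace as one separator.
    static final Function<String, String[]> splitOnWhitespace = sentence -> sentence.trim().split("\\s+");
}
```

Here, splitOnSpace.apply("My  name") yields ["My", "", "name"], whereas splitOnWhitespace.apply("  My  name ") yields ["My", "name"].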

4.6. Convert to Lowercase Stage

This stage converts every word in our String array into lowercase:

@Bean
IntegrationFlow toLowerCase() {
    return IntegrationFlows.from(toLowerCaseChannel)
      .split()
      .transform(toLowerCase)
      .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
        .outputProcessor(buildMessageWithListPayload))
      .channel(countWordsChannel)
      .get();
}

The first new method we use here is split(). It applies the splitter pattern, sending each element of our input array to the toLowerCase transformer as an individual message.

The next new method is aggregate(), which implements the aggregator pattern. An aggregator needs two essential pieces of configuration:

  1. the release strategy, which determines when to combine messages into a single one
  2. the processor, which determines how to combine messages into a single one

Our release strategy, listSizeReached, tells the aggregator to release a group once all elements of the input array have been collected:

final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();

The buildMessageWithListPayload processor then packages our lowercased results into a List:

final MessageGroupProcessor buildMessageWithListPayload = messageGroup ->
  MessageBuilder.withPayload(messageGroup.streamMessages()
      .map(Message::getPayload)
      .toList())
    .build();
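
To see how the splitter, release strategy, and output processor cooperate, here is a plain-Java sketch of the same idea. Spring Integration’s splitter stamps each part with correlation, sequence-number, and sequence-size headers. The hypothetical SeqMessage class models those as plain fields. SimpleAggregator releases a group once it holds sequenceSize messages, mirroring listSizeReached. It then collects the payloads into a list, mirroring buildMessageWithListPayload. None of these classes are Spring APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical message carrying the headers a splitter adds to each part.
final class SeqMessage {
    final String correlationId; // identifies the original message
    final int sequenceNumber;   // 1-based position of this part
    final int sequenceSize;     // total number of parts
    final String payload;

    SeqMessage(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
        this.correlationId = correlationId;
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
        this.payload = payload;
    }

    // A transformer keeps the headers and replaces only the payload.
    SeqMessage withPayload(String newPayload) {
        return new SeqMessage(correlationId, sequenceNumber, sequenceSize, newPayload);
    }
}

final class SimpleSplitter {
    // Turns one array payload into one message per element.
    static List<SeqMessage> split(String correlationId, String[] words) {
        List<SeqMessage> parts = new ArrayList<>();
        for (int i = 0; i < words.length; i++) {
            parts.add(new SeqMessage(correlationId, i + 1, words.length, words[i]));
        }
        return parts;
    }
}

// Hypothetical aggregator: groups parts by correlation ID and releases complete groups.
final class SimpleAggregator {
    private final Map<String, List<SeqMessage>> groups = new HashMap<>();

    synchronized Optional<List<String>> add(SeqMessage part) {
        List<SeqMessage> group = groups.computeIfAbsent(part.correlationId, id -> new ArrayList<>());
        group.add(part);
        if (group.size() < part.sequenceSize) {
            return Optional.empty();                 // release strategy: wait for every part
        }
        groups.remove(part.correlationId);
        List<String> payloads = new ArrayList<>();   // output processor: collect payloads into a list
        for (SeqMessage m : group) {
            payloads.add(m.payload);
        }
        return Optional.of(payloads);
    }
}
```

The payloads in the released list follow arrival order. That is harmless here, because the next stage only counts words.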

4.7. Count Words Stage

Our final stage packages our word counts into a Map, wherein the keys are the words from the original input, and the values are the number of occurrences of each word:

@Bean
IntegrationFlow countWords() {
    return IntegrationFlows.from(countWordsChannel)
      .transform(convertArrayListToCountMap)
      .channel(returnResponseChannel)
      .get();
}

Here, we use our convertArrayListToCountMap function for packaging our counts as a Map:

final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
  .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

4.8. Testing Our Flow

We can pass an initial message to our gateway method to test our flow:

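import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SpringIntegrationSedaIntegrationTest {
    @Autowired
    IncomingGateway incomingGateway;

    @Test
    void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
        Map<String, Long> actual = incomingGateway.countWords("My name is Hesam");
        Map<String, Long> expected = new HashMap<>();
        expected.put("my", 1L);
        expected.put("name", 1L);
        expected.put("is", 1L);
        expected.put("hesam", 1L);

        assertEquals(expected, actual);
    }
}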

5. Solution With Apache Camel

Apache Camel is a popular and powerful open-source integration framework. It’s based on four primary concepts:

  1. Camel context: The Camel runtime, which ties the different parts together.
  2. Routes: A route determines how a message should be processed and where it should go next.
  3. Processors: These are ready-to-use implementations of various enterprise integration patterns.
  4. Components: Components are extension points for integrating external systems via JMS, HTTP, file I/O, etc.

Apache Camel has a component dedicated to SEDA functionality, making it straightforward to build SEDA applications.

5.1. Dependencies

Let’s add the required Maven dependencies for Apache Camel and Apache Camel Test:

<dependencies>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>3.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-test-junit5</artifactId>
        <version>3.18.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

5.2. Defining SEDA Endpoints

First, we need to define the endpoints. In Camel, an endpoint is identified by a URI string, and SEDA endpoint URIs must start with “seda:[endpointName]”, optionally followed by query options such as concurrentConsumers:

static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
static final String returnResponse = "mock:result";

As we can see, each endpoint is configured with five concurrent consumers, which means up to five threads consume messages from each endpoint’s queue at the same time.

For the sake of testing, the returnResponse is a mock endpoint.
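
As a rough mental model of concurrentConsumers, a SEDA endpoint behaves like an in-memory queue drained by a fixed number of consumer threads. The hypothetical InMemorySedaEndpoint below is a plain-JDK sketch of that behavior, not Camel’s SedaEndpoint implementation. Its send() enqueues and returns immediately, and at most concurrentConsumers messages are processed at the same time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical in-memory endpoint: one queue drained by a fixed number of consumer threads.
final class InMemorySedaEndpoint<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int concurrentConsumers;
    private final Consumer<T> processor;
    private final ExecutorService consumers;

    InMemorySedaEndpoint(int concurrentConsumers, Consumer<T> processor) {
        this.concurrentConsumers = concurrentConsumers;
        this.processor = processor;
        this.consumers = Executors.newFixedThreadPool(concurrentConsumers);
    }

    // Starts one consumer loop per thread.
    void start() {
        for (int i = 0; i < concurrentConsumers; i++) {
            consumers.execute(this::consume);
        }
    }

    // Asynchronous send: enqueue and return without waiting for processing.
    void send(T body) {
        queue.add(body);
    }

    void stop() {
        consumers.shutdownNow(); // interrupts consumers blocked in take()
    }

    private void consume() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                processor.accept(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Sending twenty slow messages through an endpoint created with five consumers processes all of them, never more than five at a time.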

5.3. Extending RouteBuilder

Next, let’s define a class that extends Apache Camel’s RouteBuilder and overrides its configure() method. This class wires all SEDA endpoints:

public class WordCountRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
    }
}

In the following sections, we’ll define our stages by adding lines to this configure() method using convenience methods we’ve inherited from RouteBuilder.

5.4. Receive Text Stage

This stage receives messages from a SEDA endpoint and routes them to the next stage without any processing:

from(receiveTextUri).to(splitWordsUri);

Here, we used our inherited from() method to specify the incoming endpoint and to() to set the outgoing endpoint.

5.5. Split Words Stage

Let’s implement the stage for splitting the input text into individual words:

from(splitWordsUri)
  .transform(ExpressionBuilder.bodyExpression(s -> s.toString().split(" ")))
  .to(toLowerCaseUri);

The transform() method applies our Function to our input message, splitting it into an array.

5.6. Convert to Lowercase Stage

Our next task is to convert each word in our input to lowercase. Because we need to apply our transformation function to each String in our message rather than to the array itself, we’ll use the split() method both to split the input message for processing and to later aggregate the results back into an ArrayList:

from(toLowerCaseUri)
  .split(body(), new ArrayListAggregationStrategy())
  .transform(ExpressionBuilder.bodyExpression(body -> body.toString().toLowerCase()))
  .end()
  .to(countWordsUri);

The end() method marks the end of the split process. Once each item in the list has been transformed, Apache Camel applies the aggregation strategy ArrayListAggregationStrategy we’ve specified.

ArrayListAggregationStrategy extends Apache Camel’s AbstractListAggregationStrategy to define which part of each message should be aggregated. In this case, that’s the message body, which holds the newly lowercased word:

class ArrayListAggregationStrategy extends AbstractListAggregationStrategy<String> {
    @Override
    public String getValue(Exchange exchange) {
        return exchange.getIn()
          .getBody(String.class);
    }
}

5.7. Count Words Stage

The last stage uses a transformer to convert the list of lowercased words into a map from each word to its count:

from(countWordsUri)
  .transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
    .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
  .to(returnResponse);

5.8. Testing Our Route

Let’s test our route:

public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
    @Test
    public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap()
      throws InterruptedException {
        Map<String, Long> expected = new HashMap<>();
        expected.put("my", 1L);
        expected.put("name", 1L);
        expected.put("is", 1L);
        expected.put("hesam", 1L);
        getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
        template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        RoutesBuilder wordCountRoute = new WordCountRoute();
        return wordCountRoute;
    }
}

The CamelTestSupport superclass provides many fields and methods to help us test our flow. We’re using getMockEndpoint() and expectedBodiesReceived() to set our expected result, and template.sendBody() to submit test data to our input endpoint, receiveTextUri. Finally, we use assertMockEndpointsSatisfied() to check whether the mock endpoint received the result we expected.

6. Conclusion

In this article, we learned about SEDA and its components and use cases. Afterward, we explored how to use SEDA to solve the same problem using first Spring Integration and then Apache Camel.

As always, the source code for the examples is available over on GitHub.
