1. Overview
Spring Integration makes it easy to use some Enterprise Integration Patterns. One way to do this is through its DSL.
In this tutorial, we’ll take a look at the DSL’s support for subflows for simplifying some of our configurations.
2. Our Task
Let’s say we have a sequence of integers that we want to separate into three different buckets.
And if we wanted to use Spring Integration to do this, we could start by creating three output channels:
- Numbers like 0, 3, 6 and 9 will go to the multipleOfThreeChannel
- Numbers like 1, 4, 7, and 10 will go to the remainderIsOneChannel
- And numbers like 2, 5, 8, and 11 go to the remainderIsTwoChannel
To see how helpful subflows can be, let’s start with what this will look like without subflows.
And then, we’ll use subflows to simplify our configuration with:
- publishSubscribeChannel
- routeToRecipients
- Filters, to configure our if-then logic
- Routers, to configure our switch logic
3. Prerequisites
Now before configuring our subflows, let’s create those output channels.
We’ll make these QueueChannels since that is a bit easier to demo:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }
    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }
    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }
    boolean isMultipleOfThree(Integer number) {
        return number % 3 == 0;
    }
    boolean isRemainderOne(Integer number) {
        return number % 3 == 1;
    }
    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}
Ultimately, these are where our grouped numbers will end up.
Note also that Spring Integration can easily start looking complex, so we’ll add a few helper methods for the sake of readability.
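One detail about the remainder checks is worth a quick look: Java's `%` operator keeps the sign of the dividend, so a negative input such as -4 matches none of the three helpers. The sketch below is plain Java rather than Spring Integration code. The class name `RemainderBuckets` and the `bucketOf` method are illustrative assumptions, and `Math.floorMod` is shown only as one possible way to cover negative numbers if that ever matters for our input.

```java
public class RemainderBuckets {

    // The same checks as the helper methods, written with Java's % operator.
    static boolean isMultipleOfThree(int number) {
        return number % 3 == 0;
    }

    static boolean isRemainderOne(int number) {
        return number % 3 == 1;
    }

    static boolean isRemainderTwo(int number) {
        return number % 3 == 2;
    }

    // Hypothetical alternative: floorMod yields 0, 1 or 2 for any int when the divisor is 3.
    static int bucketOf(int number) {
        return Math.floorMod(number, 3);
    }

    public static void main(String[] args) {
        System.out.println(-4 % 3);        // -1, so none of the helpers above matches -4
        System.out.println(bucketOf(-4));  // 2
        System.out.println(bucketOf(11));  // 2
    }
}
```

For the non-negative sequences used in this tutorial, both approaches agree.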
4. Solving Without Subflows
Now we need to define our flows.
Without subflows, the simple idea is to define three separate integration flows, one for each type of number.
We’ll send the same sequence of messages to each IntegrationFlow component, but the output messages for each component will be different.
4.1. Defining IntegrationFlow Components
First, let’s define each IntegrationFlow bean in our SubflowsConfiguration class:
@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree)
        .channel("multipleOfThreeChannel");
}
Our flow contains two endpoints – a Splitter followed by a Filter.
The filter does what it sounds like. But why do we also need a splitter? We’ll see this in a minute, but basically, it splits an input Collection into individual messages.
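To make the two endpoints concrete, here is a plain-Java model of what they do to the data. This is only a sketch under our own assumptions, not Spring Integration code: the class `SplitThenFilter` and its method are hypothetical names, and messages are reduced to their integer payloads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

public class SplitThenFilter {

    // Models the two endpoints: "split" turns one collection payload into
    // one item per element, and "filter" keeps only the items that match.
    static List<Integer> splitThenFilter(Collection<Integer> payload, Predicate<Integer> selector) {
        List<Integer> output = new ArrayList<>();
        for (Integer item : payload) {      // splitter: one message per element
            if (selector.test(item)) {      // filter: drop non-matching messages
                output.add(item);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(splitThenFilter(input, n -> n % 3 == 0)); // [3, 6]
    }
}
```

Without the splitting step, the filter would be handed the whole collection as a single payload instead of one Integer at a time.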
And, we can, of course, define two more IntegrationFlow beans in the same way.
4.2. Messaging Gateways
For each flow, we also need a Messaging Gateway.
Simply put, these abstract the Spring Integration Messages API away from the caller, similarly to how a REST service can abstract away HTTP:
@MessagingGateway
public interface NumbersClassifier {
    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);
    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);
    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);
}
For each, we need to use the @Gateway annotation and specify the implicit name for the input channel, which is simply the name of the bean followed by “.input”. Note that we can use this convention because we are using lambda-based flows.
These methods are the entry points into our flows.
4.3. Sending Messages and Checking Output
And now, let’s test:
@RunWith(SpringRunner.class) // assumes JUnit 4
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {
    @Autowired
    private QueueChannel multipleOfThreeChannel;
    @Autowired
    private NumbersClassifier numbersClassifier;
    @Test
    public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
        numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
        // only 3 and 6 pass the filter
        Message<?> outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(3, outMessage.getPayload());
        outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(6, outMessage.getPayload());
        // nothing else is queued
        outMessage = multipleOfThreeChannel.receive(0);
        assertNull(outMessage);
    }
}
Notice that we’ve sent the messages as a List, which is why we needed the splitter, to take the single “list message” and transform it into several “number messages”.
We call receive with 0 to get the next available message without waiting. Since there are two multiples of three in our list, we’d expect to be able to call it twice. The third call to receive returns null.
receive, of course, returns a Message, so we call getPayload to extract the number.
Similarly, we could do the same for the other two.
So, that was the solution without subflows. We have three separate flows to maintain and three separate gateway methods.
What we’ll do now is replace the three IntegrationFlow beans with a single bean and the three gateway methods with a single one.
5. Using publishSubscribeChannel
The publishSubscribeChannel() method broadcasts messages to all subscribing subflows. This way, we can create one flow, instead of three.
@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription ->
            subscription
                .subscribe(subflow -> subflow
                    .<Integer> filter(this::isMultipleOfThree)
                    .channel("multipleOfThreeChannel"))
                .subscribe(subflow -> subflow
                    .<Integer> filter(this::isRemainderOne)
                    .channel("remainderIsOneChannel"))
                .subscribe(subflow -> subflow
                    .<Integer> filter(this::isRemainderTwo)
                    .channel("remainderIsTwoChannel")));
}
In this way, the subflows are anonymous, meaning that they can’t be independently addressed.
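The broadcasting behavior can be pictured with a small plain-Java model. It is a sketch under our own assumptions rather than Spring Integration code: `BroadcastSketch` is a hypothetical class, each subflow is reduced to a named predicate, and the names are only there so we can print the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BroadcastSketch {

    // Each "subflow" is modelled as a named predicate plus an output list.
    static Map<String, List<Integer>> broadcast(List<Integer> numbers,
                                                Map<String, Predicate<Integer>> subscribers) {
        Map<String, List<Integer>> outputs = new LinkedHashMap<>();
        subscribers.keySet().forEach(name -> outputs.put(name, new ArrayList<>()));
        for (Integer number : numbers) {
            // Publish-subscribe: every subscriber is offered every message...
            for (Map.Entry<String, Predicate<Integer>> subscriber : subscribers.entrySet()) {
                // ...and each subscriber's own filter decides whether to keep it.
                if (subscriber.getValue().test(number)) {
                    outputs.get(subscriber.getKey()).add(number);
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> subscribers = new LinkedHashMap<>();
        subscribers.put("multipleOfThreeChannel", n -> n % 3 == 0);
        subscribers.put("remainderIsOneChannel", n -> n % 3 == 1);
        subscribers.put("remainderIsTwoChannel", n -> n % 3 == 2);
        // {multipleOfThreeChannel=[3, 6], remainderIsOneChannel=[1, 4], remainderIsTwoChannel=[2, 5]}
        System.out.println(broadcast(Arrays.asList(1, 2, 3, 4, 5, 6), subscribers));
    }
}
```

The point to notice is that the filtering lives inside each subscriber, which is why every subflow in the flow definition starts with its own filter.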
Now, we only have one flow, so let’s edit our NumbersClassifier as well:
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
Now, since we have only one IntegrationFlow bean and one gateway method, we need only send our list once:
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
    // same assertions as before
}
Note that from now on, only the integration flow definition will change, so we won’t show the test again.
6. Using routeToRecipients
Another way to achieve the same thing is routeToRecipients, which is nice because it has filtering built in.
Using this method, we can specify both channels and subflows for broadcasting.
6.1. recipient
In the code below, we’ll specify multipleOfThreeChannel, remainderIsOneChannel, and remainderIsTwoChannel as the recipients based on our conditions:
@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
            .<Integer> recipient("multipleOfThreeChannel",
                this::isMultipleOfThree)
            .<Integer> recipient("remainderIsOneChannel",
                this::isRemainderOne)
            .<Integer> recipient("remainderIsTwoChannel",
                this::isRemainderTwo));
}
We can also call recipient without a condition, and routeToRecipients will publish to that destination unconditionally.
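To see how conditional and unconditional recipients differ, we can model the recipient list in plain Java. This is a sketch under our own assumptions, not Spring Integration code: `RecipientListSketch`, `Recipient`, and the `allNumbersChannel` name are all hypothetical, and a `null` selector stands in for "no condition".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class RecipientListSketch {

    // A recipient is a channel name plus an optional condition.
    static final class Recipient {
        final String channel;
        final Predicate<Integer> selector; // null models "no condition"

        Recipient(String channel, Predicate<Integer> selector) {
            this.channel = channel;
            this.selector = selector;
        }
    }

    static Map<String, List<Integer>> route(List<Integer> numbers, List<Recipient> recipients) {
        Map<String, List<Integer>> outputs = new LinkedHashMap<>();
        for (Recipient recipient : recipients) {
            outputs.put(recipient.channel, new ArrayList<>());
        }
        for (Integer number : numbers) {
            for (Recipient recipient : recipients) {
                // The router itself evaluates the condition; no separate filter is needed.
                if (recipient.selector == null || recipient.selector.test(number)) {
                    outputs.get(recipient.channel).add(number);
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Recipient> recipients = Arrays.asList(
            new Recipient("multipleOfThreeChannel", n -> n % 3 == 0),
            new Recipient("remainderIsOneChannel", n -> n % 3 == 1),
            new Recipient("allNumbersChannel", null)); // hypothetical unconditional recipient
        // {multipleOfThreeChannel=[3], remainderIsOneChannel=[1, 4], allNumbersChannel=[1, 2, 3, 4]}
        System.out.println(route(Arrays.asList(1, 2, 3, 4), recipients));
    }
}
```

Compared with the publish-subscribe approach, the condition here belongs to the router entry rather than to the receiving side.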
6.2. recipientFlow
And note that routeToRecipients allows us to define a complete flow, just like publishSubscribeChannel.
Let’s modify the above code and specify an anonymous subflow as the first recipient:
.routeToRecipients(route -> route
    .recipientFlow(subflow -> subflow
        .<Integer> filter(this::isMultipleOfThree)
        .channel("multipleOfThreeChannel"))
    // ... the other two recipients, as before
);
This subflow will receive the entire sequence of messages, so we need to filter like before to get the same behavior.
Again, one IntegrationFlow bean was enough for us.
Now let’s move on to the if-else components. One of them is Filter.
7. Using if-then Flows
We’ve already used Filter in all of the previous examples. The good news is that we can specify not only the condition for further processing but also a channel or a flow for the discarded messages.
We can think of discard flows and channels like an else block:
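@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree,
            notMultiple -> notMultiple
                .discardFlow(oneflow -> oneflow
                    .<Integer> filter(this::isRemainderOne,
                        twoflow -> twoflow
                            .discardChannel("remainderIsTwoChannel"))
                    .channel("remainderIsOneChannel")))
        .channel("multipleOfThreeChannel");
}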
In this case, we’ve implemented our if-else routing logic:
- If the number is not a multiple of three, the filter sends the message to the discard flow; we use a flow here because more logic is needed to determine its destination channel.
- In the discard flow, if the number doesn’t leave a remainder of one, the message goes to the discard channel.
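Read as ordinary control flow, the nested filters above amount to the following. This is a plain-Java sketch under our own assumptions; `DiscardChainSketch` and `destinationOf` are hypothetical names, and the method simply returns the name of the channel a number would end up in.

```java
public class DiscardChainSketch {

    // Mirrors the nested filters: passing a filter is the "if" branch,
    // being discarded is the "else" branch.
    static String destinationOf(int number) {
        if (number % 3 == 0) {
            return "multipleOfThreeChannel";     // passed the outer filter
        } else {                                 // outer discard flow
            if (number % 3 == 1) {
                return "remainderIsOneChannel";  // passed the inner filter
            } else {
                return "remainderIsTwoChannel";  // inner discard channel
            }
        }
    }

    public static void main(String[] args) {
        for (int number : new int[] {3, 4, 5}) {
            System.out.println(number + " -> " + destinationOf(number));
        }
    }
}
```

One consequence of this shape is that the last channel acts as a catch-all: anything that fails both checks lands there.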
8. switch-ing on a Computed Value
And finally, let’s try the route method, which gives us a bit more control than routeToRecipients. It’s nice because a Router can split the flow into any number of parts, whereas a Filter can only do two.
8.1. channelMapping
Let’s define our IntegrationFlow bean:
@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
        .<Integer, Integer> route(number -> number % 3,
            mapping -> mapping
                .channelMapping(0, "multipleOfThreeChannel")
                .channelMapping(1, "remainderIsOneChannel")
                .channelMapping(2, "remainderIsTwoChannel"));
}
In the code above, we calculate a routing key by taking the remainder of dividing the number by three:
route(number -> number % 3, ...
Based on this key, we route the messages:
channelMapping(0, "multipleOfThreeChannel")
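The key-then-lookup idea behind the router can be modelled with a map. Again, this is a plain-Java sketch under our own assumptions rather than Spring Integration code: `RouterSketch` is a hypothetical class, and throwing an exception for an unmapped key is a choice made for this sketch only.

```java
import java.util.HashMap;
import java.util.Map;

public class RouterSketch {

    // Plays the role of the channelMapping(...) calls: routing key -> channel name.
    static final Map<Integer, String> CHANNEL_MAPPINGS = new HashMap<>();
    static {
        CHANNEL_MAPPINGS.put(0, "multipleOfThreeChannel");
        CHANNEL_MAPPINGS.put(1, "remainderIsOneChannel");
        CHANNEL_MAPPINGS.put(2, "remainderIsTwoChannel");
    }

    static String route(int number) {
        int key = number % 3; // the computed routing key
        String channel = CHANNEL_MAPPINGS.get(key);
        if (channel == null) {
            // Assumption of this sketch only; how a real router treats an
            // unmapped key depends on how it is configured.
            throw new IllegalArgumentException("No mapping for key " + key);
        }
        return channel;
    }

    public static void main(String[] args) {
        for (int number : new int[] {3, 4, 5}) {
            System.out.println(number + " -> " + route(number));
        }
    }
}
```

Unlike the chained filters, adding a fourth or fifth destination here only means adding another mapping.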
8.2. subFlowMapping
Now, like with others, we can take more control by specifying a subflow, replacing channelMapping with subFlowMapping:
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))
Or still more control by calling the handle method instead of the channel method:
.subFlowMapping(2, subflow -> subflow
    .<Integer> handle((payload, headers) -> {
        // do extra work on the payload
        return payload;
    }))
In this case, the subflow would return to the main flow after the route() method, so there we’d need to specify the channel remainderIsTwoChannel.
9. Conclusion
In this tutorial, we’ve explored several ways to filter and route messages using subflows.
As usual, the complete source code is available on GitHub.