Using Subflows in Spring Integration – 在Spring集成中使用子流程

最后修改: 2018年 11月 20日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

Spring Integration makes it easy to use some Enterprise Integration Patterns. One of these ways is through its DSL.

Spring Integration使得使用一些企业集成模式变得容易。其中一种方式是通过其DSL

In this tutorial, we’ll take a look at the DSL’s support for subflows for simplifying some of our configurations.

在本教程中,我们将看一下DSL对子流程的支持,以简化我们的一些配置。

2. Our Task

2.我们的任务

Let’s say we have a sequence of integers that we want to separate into three different buckets.

假设我们有一个整数序列,我们想把它分成三个不同的桶。

And if we wanted to use Spring Integration to do this, we could start by creating three output channels:

而如果我们想用Spring Integration来做这件事,我们可以从创建三个输出通道开始。

  • Numbers like 0, 3, 6 and 9 will go to the multipleOfThreeChannel
  • Numbers like 1, 4, 7, and 10 will go to the remainderIsOneChannel
  • And numbers like 2, 5, 8, and 11 go to the remainderIsTwoChannel

To see how helpful subflows can be, let’s start with what this will look like without subflows.

为了了解子流程有多大的帮助,让我们从没有子流程时的情况开始。

And then, we’ll use subflows to simplify our configuration with:

然后,我们将使用子流程来简化我们的配置。

  • publishSubscribeChannel
  • routeToRecipients
  • Filters, to configure our if-then logic
  • Routers, to configure our switch logic

3. Prerequisites

3.先决条件

Now before configuring our subflows, let’s create those output channels.

现在在配置我们的子流程之前,让我们创建这些输出通道。

We’ll make these QueueChannels since that is a bit easier to demo:

我们将使这些QueueChannels,因为这更容易演示。

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
 
    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }

    boolean isMultipleOfThree(Integer number) {
       return number % 3 == 0;
    }

    boolean isRemainderIOne(Integer number) {
        return number % 3 == 1;
    }

    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}

Ultimately, these are where our grouped numbers will end up.

最终,这些都是我们的分组数字的最终归宿。

Note also that Spring Integration can easily start looking complex, so we’ll add a few helper methods for the sake of readability.

还要注意的是,Spring Integration很容易开始变得复杂,所以为了可读性,我们将添加一些辅助方法。

4. Solving Without Subflows

4.无子流程的求解

Now we need to define our flows.

现在我们需要定义我们的流量。

Without subflows, the simple idea is to define three separate integration flows, one for each type of number.

如果没有子流程,简单的想法是定义三个独立的整合流程,每种类型的数字都有一个。

We’ll send the same sequence of messages to each IntegrationFlow component, but the output messages for each component will be different.

我们将向每个IntegrationFlow组件发送相同的消息序列,但每个组件的输出消息将是不同的。

4.1. Defining IntegrationFlow Components

4.1.定义IntegrationFlow组件

First, let’s define each IntegrationFlow bean in our SubflowConfiguration class:

首先,让我们在SubflowConfiguration类中定义每个IntegrationFlowbean。

@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
      .<Integer> filter(this::isMultipleOfThree)
      .channel("multipleOfThreeChannel");
}

Our flow contains two endpoints – a Splitter followed by a Filter.

我们的流程包含两个端点–一个Splitter,后面是一个Filter。

The filter does what it sounds like. But why do we also need a splitter? We’ll see this in a minute, but basically, it splits an input Collection into individual messages.

滤波器做了它听起来像的事情。但为什么我们还需要一个分割器呢?我们将在一分钟内看到这一点,但基本上,它将一个输入Collection 分割成单独的消息。

And, we can, of course, define two more IntegrationFlow beans in the same way.

当然,我们还可以用同样的方式定义另外两个IntegrationFlowBean。

4.2. Messaging Gateways

4.2.信息传递网关

For each flow, we also need a Message Gateway.

对于每个流程,我们还需要一个消息网关

Simply put, these abstract the Spring Integration Messages API away from the caller, similarly to how a REST service can abstract away HTTP:

简单地说,这些都是将Spring Integration Messages API从调用者那里抽象出来,类似于REST服务对HTTP的抽象。

@MessagingGateway
public interface NumbersClassifier {

    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);

}

For each, we need to use the @Gateway annotation and specify the implicit name for the input channel, which is simply the name of the bean followed by “.input”. Note that we can use this convention because we are using lambda-based flows.

对于每一个,我们都需要使用@Gateway 注解,并指定输入通道的隐式名称,即简单的Bean的名称,后面跟着“.input”请注意,我们可以使用这一惯例,因为我们使用的是基于lambda的流程。

These methods are the entry points into our flows.

这些方法是进入我们流程的入口。

4.3. Sending Messages and Checking Output

4.3.发送消息和检查输出

And now, let’s test:

而现在,让我们来测试一下。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {
 
    @Autowired
    private QueueChannel multipleOfThreeChannel;

    @Autowired
    private NumbersClassifier numbersClassifier;
    @Test
    public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
        numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
        Message<?> outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 3);
        outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 6);
        outMessage = multipleOfThreeChannel.receive(0);
        assertNull(outMessage);
    }
}

Notice that we’ve sent the messages as a List, which is why we needed the splitter, to take the single “list message” and transform it into several “number messages”.

注意,我们以List的形式发送消息,这就是为什么我们需要分割器,将单一的 “列表消息 “转化为多个 “数字消息”。

We call receive with o to get the next available message without waiting. Since there are two multiples of three in our list, we’d expect to be able to call it twice. The third call to receive returns null.

我们用o调用receive来获取下一条可用的消息,而无需等待。由于我们的列表中有两个三的倍数,我们希望能够调用它两次。对receive的第三次调用返回null

receive, of course, returns a Message, so we call getPayload to extract the number.

receive,当然会返回一个Message,所以我们调用getPayload来提取数字。

Similarly, we could do the same for the other two.

同样地,我们也可以对其他两个人做同样的事情。

So, that was the solution without subflows. We have three separate flows to maintain and three separate gateway methods.

所以,这就是没有子流程的解决方案。我们有三个独立的流程需要维护,有三个独立的网关方法。

What we’ll do now is replace the three IntegrationFlow beans with a single bean and the three gateway methods with a single one.

我们现在要做的是用一个Bean替换三个IntegrationFlowBean,用一个Bean替换三个网关方法。

5. Using publishSubscribeChannel

5.使用publishSubscribeChannel

The publishSubscribeChannel() method broadcasts messages to all subscribing subflows. This way, we can create one flow, instead of three.

publishSubscribeChannel()方法向所有订阅的子流程广播消息。这样,我们可以创建一个流程,而不是三个。

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription -> 
           subscription
             .subscribe(subflow -> subflow
               .<Integer> filter(this::isMultipleOfThree)
               .channel("multipleOfThreeChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderOne)
                .channel("remainderIsOneChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderTwo)
                .channel("remainderIsTwoChannel")));
}

In this way, the subflows are anonymous, meaning that they can’t be independently addressed.

通过这种方式,子流是匿名的,意味着它们不能被独立处理。

Now, we only have one flow, so let’s edit our NumbersClassifier as well:

现在,我们只有一个流程,所以让我们也编辑一下我们的NumbersClassifier

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

Now, since we have only one IntegrationFlow bean and one gateway method, we need only send our list once:

现在,由于我们只有一个IntegrationFlowbean和一个网关方法,我们只需要发送一次我们的列表:

@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));

    // same assertions as before
}

Note that from now on, only the integration flow definition will change so that we won’t show the test again.

注意,从现在开始,只有集成流程的定义会发生变化,这样我们就不会再显示测试。

6. Using routeToRecipients

6.使用routeToRecipients

Another way to achieve the same thing is routeToRecipients, which is nice because it has filtering built in.

另一种实现相同目的的方法是routeToRecipients,这很好,因为它有内置的过滤功能。

Using this method, we can specify both channels and subflows for broadcasting. 

使用这种方法,我们可以为广播指定通道和子流。

6.1. recipient

6.1.接受者

In the code below we’ll specify multipleof3ChannelremainderIs1Channel, and remainderIsTwoChannel as the recipients based on our conditions:

在下面的代码中,我们将根据我们的条件指定multipleof3Channel, remainderIs1Channel, remainderIsTwoChannel作为收件人。

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
          .<Integer> recipient("multipleOfThreeChannel", 
            this::isMultipleOfThree)       
          .<Integer> recipient("remainderIsOneChannel", 
            this::isRemainderOne)
          .<Integer> recipient("remainderIsTwoChannel", 
            this::isRemainderTwo));
}

We can also call recipient without a condition, and routeToRecipients will publish to that destination unconditionally.

我们也可以在没有条件的情况下调用recipientrouteToRecipients将无条件地发布到该目的地。

6.2. recipientFlow

6.2. recipientFlow

And note that routeToRecipients allows us to define a complete flow, just like publishSubscribeChannel. 

并注意到routeToRecipients允许我们定义一个完整的流程,就像publishSubscribeChannel。

Let’s modify the above code and specify an anonymous subflow as the first recipient:

让我们修改上面的代码,指定一个匿名子流作为第一个收件人

.routeToRecipients(route -> route
  .recipientFlow(subflow -> subflow
      .<Integer> filter(this::isMultipleOfThree)
      .channel("mutipleOfThreeChannel"))
  ...);

This subflow will receive the entire sequence of messages, so we need to filter like before to get the same behavior.

这个子流程将接收整个消息序列,所以我们需要像以前一样进行过滤以获得相同的行为。

Again, one IntegrationFlow bean was enough for us.

同样,一个IntegrationFlowBean对我们来说已经足够了。

Now let’s move on to the if-else components. One of them is Filter.

现在让我们继续讨论if-else组件。其中一个是Filter

7. Using if-then Flows

7.使用if-then流程

We’ve already used Filter in all of the previous examples. The good news is that we can specify not only the condition for further processing but also a channel or a flow for the discarded messages.

我们已经在前面所有的例子中使用了Filter。好消息是,我们不仅可以指定进一步处理的条件,还可以为被丢弃的消息指定一个通道或一个流程

We can think of discard flows and channels like an else block:

我们可以把丢弃流和通道看作是一个else 块:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree, 
           notMultiple -> notMultiple
             .discardFlow(oneflow -> oneflow
               .<Integer> filter(this::isRemainderOne,
                 twoflow -> twoflow
                   .discardChannel("remainderIsTwoChannel"))
               .channel("remainderIsOneChannel"))
        .channel("multipleofThreeChannel");
}

In this case, we’ve implemented our if-else routing logic:

在这种情况下,我们已经实现了我们的if-else路由逻辑。

  • If the number is not a multiple of three, then discard those messages to the discard flow; we use a flow here since there is more logic needed to know its destination channel.
  • In the discard flow, if the number isn’t of remainder one, then discard those messages to the discard channel.

8. switch-ing on a Computed Value

8.在计算值上进行切换

And finally, let’s try the route method, which gives us a bit more control than routeToRecipients. It’s nice because a Router can split the flow into any number of parts, whereas a Filter can only do two.

最后,让我们试试route方法,它比routeToRecipients>给我们更多的控制。这很好,因为Router可以将流程分成任何数量的部分,而Filter只能做两个。

8.1. channelMapping

8.1.通道映射

Let’s define our IntegrationFlow bean:

让我们来定义我们的IntegrationFlow Bean。

@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
      .<Integer, Integer> route(number -> number % 3, 
        mapping -> mapping
         .channelMapping(0, "multipleOfThreeChannel")
         .channelMapping(1, "remainderIsOneChannel")
         .channelMapping(2, "remainderIsTwoChannel"));
}

In the code above we calculate a routing key by performing the division:

在上面的代码中,我们通过执行除法计算出一个路由键。

route(p -> p % 3,...

Based on this key, we route the messages:

根据这个密钥,我们对信息进行路由。

channelMapping(0, "multipleof3Channel")

8.2. subFlowMapping

8.2.subFlowMapping

Now, like with others, we can take more control by specifying a subflow, replacing channelMapping with subFlowMapping:

现在,和其他的一样,我们可以通过指定一个子流程来进行更多的控制,用subFlowMapping代替channelMapping

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

Or still more control by calling the handle method instead of the channel method:

或者通过调用handle 方法而不是channel 方法来实现更多控制。

.subFlowMapping(2, subflow -> subflow
  .<Integer> handle((payload, headers) -> {
      // do extra work on the payload
     return payload;
  }))).channel("remainderIsTwoChannel");

In this case, the subflow would return to the main flow after the route() method, so there we’d need to specify the channel remainderIsTwoChannel.

在这种情况下,子流程将在 route() 方法之后返回到主流程,所以我们需要在那里指定通道remainderIsTwoChannel.

9. Conclusion

9.结语

In this tutorial, we’ve explored how to filter and route messages in some ways using subflows.

在本教程中,我们已经探讨了如何使用子流程以某些方式过滤和路由消息。

As usual, the complete source code is available on GitHub.

像往常一样,完整的源代码可以在GitHub上获得