Spring Integration Java DSL – Spring Integration Java DSL

最后修改: 2018年 8月 23日

中文/混合/英文(键盘快捷键:t)

1. Introduction 

1.引言

In this tutorial, we’ll learn about the Spring Integration Java DSL for creating application integrations.

在本教程中,我们将了解用于创建应用程序集成的Spring Integration Java DSL。

We’ll take the file-moving integration we built in Introduction to Spring Integration and use the DSL instead.

我们将采用我们在Introduction to Spring Integration中构建的文件移动集成,并使用DSL来代替。

2. Dependencies

2.依赖性

The Spring Integration Java DSL is part of Spring Integration Core.

Spring Integration Java DSL是Spring Integration Core的一部分。

So, we can add that dependency:

所以,我们可以添加这个依赖性。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

And to work on our file-moving application, we’ll also need Spring Integration File:

而要在我们的文件移动应用程序上工作,我们还需要Spring集成文件

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

3. Spring Integration Java DSL

3.Spring Integration Java DSL

Before the Java DSL, users would configure Spring Integration components in XML.

在Java DSL之前,用户会在XML中配置Spring Integration组件。

The DSL introduces some fluent builders from which we can easily create a complete a Spring Integration pipeline purely in Java. 

DSL介绍了一些流畅的构建器,通过这些构建器,我们可以轻松地用Java创建一个完整的Spring集成管道。

So, let’s say we wanted to create a channel that uppercases any data coming through the pipe.

因此,假设我们想创建一个通道,将通过管道的任何数据大写。

In the past, we might have done:

在过去,我们可能会这样做。

<int:channel id="input"/>

<int:transformer input-channel="input" expression="payload.toUpperCase()" />

And now we can instead do:

而现在我们反而可以做到。

@Bean
public IntegrationFlow upcaseFlow() {
    return IntegrationFlows.from("input")
      .transform(String::toUpperCase)
      .get();
}

4. The File-Moving App

4.文件移动应用程序

To begin our file-moving integration, we’ll need some simple building blocks.

为了开始我们的文件移动整合,我们需要一些简单的构建模块。

4.1. Integration Flow

4.1.集成流程

The first building block we need is an integration flow, which we can get from the IntegrationFlows builder:

我们需要的第一个构件是一个集成流程,我们可以从集成流程生成器中得到它。

IntegrationFlows.from(...)

from can take several types, but in this tutorial, we will look at just three:

来自可以有多种类型,但在本教程中,我们将只看三种。

  • MessageSources
  • MessageChannels, and
  • Strings

We’ll talk about all three shortly.

我们很快就会谈论这三个问题。

After we have called from, some customization methods are now available to us:

在我们调用了from之后,现在有一些定制方法可以供我们使用。

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory())
  // add more components
  .get();

Ultimately, IntegrationFlows will always produce an instance of IntegrationFlow, which is the final product of any Spring Integration app.

最终,IntegrationFlows将始终产生一个IntegrationFlow的实例,这是任何Spring集成应用程序的最终产品。

This pattern of taking input, performing the appropriate transformations, and emitting the results is fundamental to all Spring Integration apps.

这种接受输入、执行适当的转换并发出结果的模式是所有Spring Integration应用程序的基础

4.2. Describing an Input Source

4.2.描述一个输入源

First, to move files, we’ll need to indicate to our integration flow where it should look for them, and for that, we need a MessageSource:

首先,为了移动文件,我们需要向我们的集成流程指出它应该在哪里寻找它们,为此,我们需要一个MessageSource:

@Bean
public MessageSource<File> sourceDirectory() {
  // .. create a message source
}

Simply put, a MessageSource is a place from which messages can come that are external to the application.

简单地说,MessageSource是一个地方,消息可以来自应用程序的外部

More specifically, we need something that can adapt that external source into the Spring messaging representation. And since this adaptation is focused on input, these are often called Input Channel Adapters.

更具体地说,我们需要的是能够将外部资源适应到Spring消息传递表示的东西。由于这种适应集中于输入,这些通常被称为输入通道适配器

The spring-integration-file dependency gives us an input channel adapter that’s great for our use case: FileReadingMessageSource:

spring-integration-file 依赖为我们提供了一个输入通道适配器,非常适合我们的用例。FileReadingMessageSource:

@Bean
public MessageSource<File> sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File(INPUT_DIR));
    return messageSource;
}

Here, our FileReadingMessageSource will be reading a directory given by INPUT_DIR and will create a MessageSource from it.

在这里,我们的FileReadingMessageSource将读取一个由INPUT_DIR给出的目录,并将从中创建一个MessageSource

Let’s specify this as our source in an IntegrationFlows.from invocation:

让我们在IntegrationFlows.frominvocation中指定它作为我们的源。

IntegrationFlows.from(sourceDirectory());

4.3. Configuring an Input Source

4.3.配置一个输入源

Now, if we are thinking about this as a long-lived application, we’ll probably want to be able to notice files as they come in, not just move the files that are already there at startup.

现在,如果我们考虑将其作为一个长期存在的应用程序,我们可能希望能够在文件进入时注意到它们,而不仅仅是在启动时移动已经存在的文件。

To facilitate this, from can also take extra configurers as further customization of the input source:

为了方便,from也可以采取额外的configurers作为输入源的进一步定制。

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

In this case, we can make our input source more resilient by telling Spring Integration to poll that source–our filesystem in this case–every 10 seconds.

在这种情况下,我们可以通过告诉Spring Integration每隔10秒轮询一次输入源–这里是指我们的文件系统–来使我们的输入源更具弹性。

And, of course, this doesn’t apply to just our file input source, we could add this poller to any MessageSource.

当然,这并不只适用于我们的文件输入源,我们可以将这个轮询器添加到任何MessageSource

4.4. Filtering Messages from an Input Source

4.4.过滤输入源的信息

Next, let’s suppose we want our file-moving application to move specific files only, say image files having jpg extension.

接下来,让我们假设我们的文件移动应用程序只移动特定的文件,例如具有jpg扩展的图像文件。

For this, we can use GenericSelector:

为此,我们可以使用GenericSelector

@Bean
public GenericSelector<File> onlyJpgs() {
    return new GenericSelector<File>() {

        @Override
        public boolean accept(File source) {
          return source.getName().endsWith(".jpg");
        }
    };
}

So, let’s update our integration flow again:

因此,让我们再次更新我们的整合流程。

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs());

Or, because this filter is so simple, we could have instead defined it using a lambda:

或者,因为这个过滤器非常简单,我们可以用一个lambda来定义它

IntegrationFlows.from(sourceDirectory())
  .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. Handling Messages With Service Activators

4.5.使用服务激活器处理消息

Now that we have a filtered list of files, we need to write them to a new location.

现在我们有一个经过过滤的文件列表,我们需要把它们写到一个新的位置。

Service Activatorare what we turn to when we’re thinking about outputs in Spring Integration.

服务激活器s是我们在考虑Spring Integration中的输出时所求助的。

Let’s use the FileWritingMessageHandler service activator from spring-integration-file:

让我们使用来自spring-integration-fileFileWritingMessageHandler服务激活器。

@Bean
public MessageHandler targetDirectory() {
    FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(false);
    return handler;
}

Here, our FileWritingMessageHandler will write each Message payload it receives to OUTPUT_DIR.

在这里,我们的FileWritingMessageHandler将把它收到的每个Message有效载荷写到OUTPUT_DIR

Again, let’s update:

再一次,让我们来更新。

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory());

And notice, by the way, the usage of setExpectReply. Because integration flows can be bidirectional, this invocation indicates that this particular pipe is one way.

顺便注意,setExpectReply的用法。因为集成流可以是双向的,这个调用表明这个特定的管道是单向的。

4.6. Activating Our Integration Flow

4.6.激活我们的集成流程

When we have added all our components we need to register our IntegrationFlow as a bean to activate it:

当我们添加了所有的组件后,我们需要将我们的IntegrationFlow注册为一个bean来激活它。

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
      .filter(onlyJpgs())
      .handle(targetDirectory())
      .get();
}

The get method extracts an IntegrationFlow instance that we need to register as a Spring Bean.

get方法提取了一个IntegrationFlow实例,我们需要将其注册为Spring Bean。

As soon as our application context loads, all our components contained in our IntegrationFlow gets activated.

一旦我们的应用程序上下文加载,我们IntegrationFlow中包含的所有组件就会被激活。

And now, our application will start moving files from the source directory to target directory.

现在,我们的应用程序将开始从源目录向目标目录移动文件。

5. Additional Components

5.附加组件

In our DSL-based file-moving application, we created an Inbound Channel Adapter, a Message Filter, and a Service Activator.

在我们基于DSL的文件移动应用中,我们创建了一个入站通道适配器,一个消息过滤器,和一个服务激活器。

Let’s look at a few other common Spring Integration components and see how we might use them.

让我们看看其他几个常见的Spring集成组件,看看我们如何使用它们。

5.1. Message Channels

5.1.信息通道

As mentioned earlier, a Message Channel is another way to initialize a flow:

如前所述,消息通道是另一种初始化流程的方式。

IntegrationFlows.from("anyChannel")

We can read this as “please find or create a channel bean called anyChannel. Then, read any data that is fed into anyChannel from other flows.”

我们可以把它理解为 “请找到或创建一个名为anyChannel的通道Bean。然后,读取任何从其他流程输入anyChannel的数据。”

But, really it is more general-purpose than that.

但是,实际上它比这更具有通用性。

Simply put, a channel abstracts away producers from consumers, and we can think of it as a Java Queue. A channel can be inserted at any point in the flow.

简单地说,通道将生产者与消费者抽象开来,我们可以把它看作是一个JavaQueue一个通道可以在流程中的任何一点插入

Let’s say, for example, that we want to prioritize the files as they get moved from one directory to the next:

比方说,我们想在文件从一个目录移到下一个目录时,对其进行优先排序。

@Bean
public PriorityChannel alphabetically() {
    return new PriorityChannel(1000, (left, right) -> 
      ((File)left.getPayload()).getName().compareTo(
        ((File)right.getPayload()).getName()));
}

Then, we can insert an invocation to channel in between our flow:

然后,我们可以在我们的流程之间插入对channel的调用。

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("alphabetically")
      .handle(targetDirectory())
      .get();
}

There are dozens of channels to pick from, some of the more handy ones being for concurrency, auditing, or intermediate persistence (think Kafka or JMS buffers).

有几十个通道可供选择,一些更方便的通道用于并发、审计或中间持久化(想想Kafka或JMS缓冲区)。

Also, channels can be powerful when combined with Bridges.

另外,当与Bridges结合时,通道可以很强大。

5.2. Bridge

5.2.桥梁

When we want to combine two channels, we use a Bridge.

当我们想合并两个通道时,我们使用一个

Let’s imagine that instead of writing directly to an output directory, we instead had our file-moving app write to another channel:

让我们想象一下,我们不是直接写到输出目录,而是让我们的文件移动应用写到另一个通道。

@Bean
public IntegrationFlow fileReader() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("holdingTank")
      .get();
}

Now, because we’ve simply written it to a channel, we can bridge from there to other flows.

现在,因为我们已经简单地将其写入一个通道,我们可以从那里桥接到其他流量

Let’s create a bridge that polls our holding tank for messages and writes them to a destination:

让我们创建一个桥,轮询我们的储存箱的信息,并把它们写到一个目的地。

@Bean
public IntegrationFlow fileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
      .handle(targetDirectory())
      .get();
}

Again, because we wrote to an intermediate channel, now we can add another flow that takes these same files and writes them at a different rate:

同样,因为我们写到了一个中间通道,现在我们可以添加另一个流程,该流程接收这些相同的文件,并以不同的速度写入它们

@Bean
public IntegrationFlow anotherFileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
      .handle(anotherTargetDirectory())
      .get();
}

As we can see, individual bridges can control the polling configuration for different handlers.

我们可以看到,个别桥接器可以控制不同处理程序的轮询配置。

As soon as our application context is loaded, we now have a more complex app in action that will start moving files from the source directory to two target directories.

一旦我们的应用程序上下文被加载,我们现在有一个更复杂的应用程序在运行,它将开始从源目录向两个目标目录移动文件。

6. Conclusion

6.结论

In this article, we saw various ways to use the Spring Integration Java DSL to build different integration pipelines.

在这篇文章中,我们看到了使用Spring Integration Java DSL来构建不同集成管道的各种方法。

Essentially, we were able to recreate the file-moving application from a previous tutorial, this time using pure java.

从本质上讲,我们能够重新创建前一个教程中的文件移动应用程序,这次使用的是纯java。

Also, we took a look at a few other components like channels and bridges.

此外,我们还看了一些其他组件,如通道和桥梁。

The complete source code used in this tutorial is available over on Github.

本教程中所使用的完整源代码可在Github上获得