Integration Patterns With Apache Camel – 使用Apache Camel的集成模式

最后修改: 2016年 12月 22日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

This article will cover some essential enterprise integration patterns (EIPs) supported by Apache Camel. Integration patterns help by providing solutions for standardized ways of integrating systems.

本文将介绍Apache Camel支持的一些基本企业集成模式(EIPs)。集成模式通过提供标准化的系统集成方式的解决方案来帮助。

If you need to first go over the basics of Apache Camel, definitely visit this article to brush up on the basics.

如果您需要首先了解Apache Camel的基础知识,一定要访问这篇文章以了解基础知识。

2. About EIPs

2.关于EIPs

Enterprise integration patterns are design patterns that aim to provide solutions for integration challenges. Camel provides implementations for many of these patterns. To see the full list of supported patterns, visit this link.

企业集成模式是旨在为集成挑战提供解决方案的设计模式。Camel为许多这些模式提供了实现。要查看支持的模式的完整列表,请访问这个链接

In this article, we’ll cover Content Based Router, Message Translator, Multicast, Splitter and Dead Letter Channel integration patterns.

在这篇文章中,我们将介绍基于内容的路由器、消息翻译器、多播、分离器和死信通道的集成模式。

2. Content Based Router

2.基于内容的路由器

Content Based Router is a message router which routes a message to its destination based on a message header, part of payload or basically anything from message exchange which we consider as content.

基于内容的路由器是一个消息路由器,它根据消息头、有效载荷的一部分或基本上任何我们认为是内容的消息交换,将消息路由到目的地。

It starts with choice() DSL statement followed by one or more when() DSL statements. Each when() contains a predicate expression which, if satisfied, will result in the execution of contained processing steps.

它以choice() DSL语句开始,后面是一个或多个when() DSL语句。每个when()都包含一个谓词表达式,如果满足该表达式,将导致执行所包含的处理步骤。

Let’s illustrate this EIP by defining a route which consumes files from one folder and moves them into two different folders depending on file extension. Our route is referenced in Spring XML file using custom XML syntax for Camel:

让我们通过定义一个路由来说明这个EIP,该路由从一个文件夹中消耗文件,并根据文件扩展名将其移动到两个不同的文件夹中。我们的路由是在Spring XML文件中使用Camel的自定义XML语法来引用的。

<bean id="contentBasedFileRouter" 
  class="com.baeldung.camel.file.ContentBasedFileRouter" />

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <routeBuilder ref="contentBasedFileRouter" />
</camelContext>

Route definition is contained in ContentBasedFileRouter class where files are routed from the source folder into two different destination folders depending on their extension.

路线定义包含在ContentBasedFileRouter类中,文件根据其扩展名从源文件夹被路由到两个不同的目标文件夹。

Alternatively, we could use Spring Java config approach here as opposed to using Spring XML file. To do that, we need to add an additional dependency to our project:

另外,我们可以在这里使用Spring Java配置方法,而不是使用Spring XML文件。要做到这一点,我们需要向我们的项目添加一个额外的依赖。

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-javaconfig</artifactId>
    <version>2.18.1</version>
</dependency>

The latest version of the artifact can be found here.

最新版本的工件可以在这里找到。

After that, we need to extend CamelConfiguration class and override routes() method which will reference ContentBasedFileRouter:

之后,我们需要扩展CamelConfiguration类并覆盖routes()方法,该方法将引用ContentBasedFileRouter

@Configuration
public class ContentBasedFileRouterConfig extends CamelConfiguration {

    @Bean
    ContentBasedFileRouter getContentBasedFileRouter() {
        return new ContentBasedFileRouter();
    }

    @Override
    public List<RouteBuilder> routes() {
        return Arrays.asList(getContentBasedFileRouter());
    }
}

The extension is evaluated using Simple Expression Language via simple() DSL statement which was intended to be used for evaluating Expressions and Predicates:

该扩展通过simple() DSL语句使用Simple Expression Language进行评估,该语句旨在用于评估表达式和谓词。

public class ContentBasedFileRouter extends RouteBuilder {

    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER_TXT 
      = "src/test/destination-folder-txt";
    private static final String DESTINATION_FOLDER_OTHER 
      = "src/test/destination-folder-other";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true").choice()
          .when(simple("${file:ext} == 'txt'"))
          .to("file://" + DESTINATION_FOLDER_TXT).otherwise()
          .to("file://" + DESTINATION_FOLDER_OTHER);
    }
}

Here we are additionally using otherwise() DSL statement in order to route all messages which don’t satisfy predicates given with when() statements.

这里我们额外使用otherwise() DSL语句,以路由所有不满足when()语句给出的谓词的消息。

3. Message Translator

3.信息翻译员

Since every system uses it’s own data format, it is frequently required to translate the message coming from another system into the data format supported by the destination system.

由于每个系统都使用它自己的数据格式,所以经常需要将来自另一个系统的信息翻译成目的系统支持的数据格式。

Camel supports MessageTranslator router which allows us to transform messages using either custom processor in the routing logic, using a specific bean to perform the transformation or by using transform() DSL statement.

Camel支持MessageTranslator路由器,它允许我们使用路由逻辑中的自定义处理器来转换消息,使用特定的Bean来执行转换,或者使用transform() DSL语句。

Example with using a custom processor can be found in the previous article where we defined a processor which prepends a timestamp to each incoming file’s filename.

使用自定义处理器的例子可以在上一篇文章中找到,我们定义了一个处理器,在每个传入文件的文件名前加上一个时间戳。

Let’s now demonstrate how to use Message Translator using transform() statement:

现在让我们演示如何使用transform()语句来使用消息翻译器。

public class MessageTranslatorFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER 
      = "src/test/destination-folder";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .transform(body().append(header(Exchange.FILE_NAME)))
          .to("file://" + DESTINATION_FOLDER);
    }
}

In this example, we are appending the filename to file content via transform() statement for each file from the source folder and moving transformed files to a destination folder.

在这个例子中,我们通过transform()语句将源文件夹中的每个文件的文件名追加到文件内容中,并将转换后的文件移到目标文件夹中。

4. Multicast

4.组播

Multicast allows us to route the same message to a set of different endpoints and process them in a different way.

多播允许我们将同一消息路由到一组不同的端点并以不同的方式处理它们

This is possible by using multicast() DSL statement and then by listing the endpoints and processing steps within them.

这可以通过使用multicast() DSL语句,然后在其中列出端点和处理步骤。

By default, processing on different endpoints is not done in parallel, but this can be changed by using parallelProcessing() DSL statement.

默认情况下,不同端点上的处理不是平行进行的,但这可以通过使用parallelProcessing() DSL语句来改变。

Camel will use the last reply as the outgoing message after the multicasts by default. However, it is possible to define a different aggregation strategy to be used for assembling the replies from the multicasts.

默认情况下,Camel将使用最后一个回复作为组播后的出站消息。然而,可以定义一个不同的聚合策略,用于组装组播的回复。

Let’s see how Multicast EIP looks like on an example. We’ll multicast files from source folder onto two different routes where we’ll transform their content and send them to different destination folders. Here we use direct: component which allows us to link two routes together:

让我们看看多播EIP在一个例子中是怎样的。我们将从源文件夹组播文件到两个不同的路由,在那里我们将转换它们的内容并将它们发送到不同的目标文件夹。这里我们使用direct:组件,它允许我们将两条路由连接在一起。

public class MulticastFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER_WORLD 
      = "src/test/destination-folder-world";
    private static final String DESTINATION_FOLDER_HELLO 
      = "src/test/destination-folder-hello";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .multicast()
          .to("direct:append", "direct:prepend").end();

        from("direct:append")
          .transform(body().append("World"))
          .to("file://" + DESTINATION_FOLDER_WORLD);

        from("direct:prepend")
           .transform(body().prepend("Hello"))
           .to("file://" + DESTINATION_FOLDER_HELLO);
    }
}

5. Splitter

5.分割器

The splitter allows us to split the incoming message into a number of pieces and processing each of them individually. This is possible by using split() DSL statement.

分割器允许我们将收到的消息分割成若干块,并单独处理每一块。这可以通过使用split() DSL语句实现。

As opposed to Multicast, Splitter will change the incoming message, while Multicast will leave it as it is.

与多播相反,Splitter将改变传入的信息,而多播将保持其原样。

To demonstrate this on an example, we’ll define a route where each line from a file is split and transformed into an individual file which is then moved to a different destination folder. Each new file will be created with file name equal to file content:

为了在一个例子上证明这一点,我们将定义一个路由,其中文件的每一行都被分割并转化为一个单独的文件,然后被移动到一个不同的目标文件夹。每个新文件将被创建,文件名等于文件内容。

public class SplitterFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER  
      = "src/test/destination-folder";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .split(body().convertToString().tokenize("\n"))
          .setHeader(Exchange.FILE_NAME, body())
          .to("file://" + DESTINATION_FOLDER);
    }
}

6. Dead Letter Channel

6.死信频道

It is common and it should be expected that sometimes problems can happen, for example, database deadlocks, which can cause a message not to be delivered as expected. However, in certain cases, trying again with a certain delay will help and a message will get processed.

有时会发生一些问题,例如数据库死锁,这是很常见的,也是应该预料到的,这可能会导致信息不能按预期传递。然而,在某些情况下,用一定的延迟再试一次会有帮助,信息会得到处理。

Dead Letter Channel allows us to control what happens with a message once it fails to be delivered. Using Dead Letter Channel we can specify whether to propagate the thrown Exception to the caller and where to route the failed Exchange.

“死信 “通道允许我们控制一旦消息传递失败会发生什么。使用 “死信 “通道,我们可以指定是否将抛出的异常传播给调用者以及将失败的Exchange路由到哪里。

When a message fails to be delivered, Dead Letter Channel (if used) will move the message to the dead letter endpoint.

当信息未能被送达时,死信通道(如果使用)将把信息转移到死信端点。

Let’s demonstrate this on an example by throwing an exception on the route:

让我们在一个例子上演示一下,在路由上抛出一个异常。

public class DeadLetterChannelFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";

    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("log:dead?level=ERROR")
          .maximumRedeliveries(3).redeliveryDelay(1000)
          .retryAttemptedLogLevel(LoggingLevel.ERROR));

        from("file://" + SOURCE_FOLDER + "?delete=true")
          .process(exchange -> {
            throw new IllegalArgumentException("Exception thrown!");
        });
    }
}

Here we defined an errorHandler which logs failed deliveries and defines redelivery strategy. By setting retryAttemptedLogLevel(), each redelivery attempt will be logged with specified log level.

这里我们定义了一个errorHandler,它记录了失败的交付,并定义了重新交付的策略。通过设置retryAttemptedLogLevel(),每次重新交付的尝试都会被记录在指定的日志级别。

In order for this to be fully functional, we additionally need to configure a logger.

为了使其完全发挥作用,我们还需要配置一个记录器。

After running this test, following log statements are visible in a console:

运行这个测试后,在控制台中可以看到以下日志语句。

ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 0 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 1 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 2 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 3 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR dead:156 - Exchange[ExchangePattern: InOnly, 
BodyType: org.apache.camel.component.file.GenericFile, 
Body: [Body is file based: GenericFile[File.txt]]]

As you can notice, each redelivery attempt is being logged displaying Exchange for which delivery was not successful.

正如你所注意到的,每一次重新递送的尝试都会被记录下来,显示递送不成功的Exchange。

7. Conclusion

7.结论

In this article, we presented an introduction to integration patterns using Apache Camel and demonstrated them on few examples.

在这篇文章中,我们介绍了使用Apache Camel的集成模式,并在几个例子中进行了演示。

We demonstrated how to use these integration patterns and why they are beneficial for solving integration challenges.

我们演示了如何使用这些集成模式,以及为什么它们有利于解决集成挑战。

Code from this article can be found over on GitHub.

本文的代码可以在GitHub上找到over