Transaction Support in Spring Integration – Spring集成中的事务支持

最后修改: 2019年 10月 19日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll take a look at transaction support in the Spring Integration framework.

在本教程中,我们将了解Spring Integration框架中的事务支持。

2. Transactions in Message Flows

2.消息流中的事务

Spring provides support for synchronizing resources with transactions since the earliest versions. We often use it to synchronize transactions managed by multiple transaction managers.

从最早的版本开始,Spring就提供了对与事务同步的资源的支持。我们经常使用它来同步由多个事务管理器管理的事务。

For example, we can synchronize a JMS commit with a JDBC commit.

例如,我们可以将JMS提交与JDBC提交同步。

On the other hand, we also have more complex use cases in the message flows. They include synchronization of nontransactional resources as well as various types of transactional resources.

另一方面,我们在消息流中也有更复杂的用例。它们包括非交易性资源的同步,以及各种类型的交易性资源。

Typically, messaging flows can be initiated by two different types of mechanisms.

通常情况下,消息流可以由两种不同类型的机制启动。

2.1. Message Flows Initiated by a User Process

2.1.由用户进程发起的消息流

Some message flows depend on the initiation of third party processes, like triggering a message on some message channel or invocation of a message gateway method.

一些消息流依赖于第三方进程的启动,如在一些消息通道上触发消息或调用消息网关方法。

We configure transaction support for these flows through Spring’s standard transaction support. The flows don’t have to be configured explicitly by Spring Integration to support transactions. The Spring Integration message flow naturally honors the transactional semantics of the Spring components.

我们通过 Spring 的标准交易支持来为这些流配置交易支持。这些流不需要由 Spring Integration 明确配置以支持交易。Spring Integration的消息流自然会尊重Spring组件的事务性语义。

For example, we can annotate a ServiceActivator or its method with @Transactional:

例如,我们可以用@Transactional来注解一个ServiceActivator或其方法。

@Transactional
public class TxServiceActivator {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void storeTestResult(String testResult) {
        this.jdbcTemplate.update("insert into STUDENT values(?)", testResult);
        log.info("Test result is stored: {}", testResult);
    }
}

We can run the storeTestResult method from any component, and the transactional context will apply as usual. With this approach, we have full control over the transaction configuration.

我们可以从任何组件中运行storeTestResult方法,事务性上下文将照常应用。通过这种方法,我们可以完全控制事务配置。

2.2. Message Flows Initiated by a Daemon Process

2.2.由一个守护进程发起的消息流

We often use this type of message flow for automation. For example, a Poller polling a message queue to initiate a new message flow with the polled message, or a scheduler scheduling the process by creating a new message and initiating a message flow at a predefined time.

我们经常将这种类型的消息流用于自动化。例如,一个Poller轮询一个消息队列,用轮询的消息启动一个新的消息流,或者一个调度器通过创建一个新的消息并在预定的时间启动一个消息流来调度这个过程。

In essence, these are trigger-based flows initiated by a trigger process (a daemon process). For these flows, we have to provide some transaction configuration to create a transaction context whenever a new message flow begins.

从本质上讲,这些是基于触发器的流,由一个触发器进程(一个守护进程)发起。对于这些流,我们必须提供一些事务配置,以便在新消息流开始时创建一个事务上下文。

Through the configuration, we delegate the flows to Spring’s existing transaction support.

通过配置,我们将流量委托给Spring的现有事务支持。

We’ll focus on transaction support for this type of message flow through the rest of the article.

我们将通过文章的其余部分重点讨论对这种类型的消息流的交易支持。

3. Poller Transaction Support

3.投票器事务支持

Poller is a common component in integration flows. It periodically retrieves the data from various sources and passes it on through the integration chain.

Poller是集成流程中的一个常见组件。它周期性地从各种来源检索数据,并通过集成链进行传递。

Spring Integration provides transactional support for pollers out of the box. Any time we configure a Poller component, we can provide transactional configuration:

Spring Integration为投票器提供了开箱即用的事务性支持。在我们配置Poller组件的任何时候,我们都可以提供事务性配置。

@Bean
@InboundChannelAdapter(value = "someChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> someMessageSource() {
    ...
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

private TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
      .transactionManager(txManager)
      .build();
}

We have to provide a reference to a TransactionManager and a custom TransactionSynchronizationFactory, or we can rely on the defaults. Internally, Spring’s native transaction wraps the process. As a result, all message flows initiated by this poller are transactional.

我们必须提供对TransactionManager和自定义TransactionSynchronizationFactory的引用,或者我们可以依赖默认值。在内部,Spring的本地事务包装了这个过程。因此,由该轮询器发起的所有消息流都是事务性的。

4. Transaction Boundaries

4.事务界限

When a transaction is started, the transaction context is always bound to the current thread. Regardless of how many endpoints and channels we might have in our message flow, our transaction context will always be preserved as long as the flow lives in the same thread.

当一个事务被启动时,事务上下文总是与当前线程绑定。无论我们的消息流中有多少个端点和通道,只要消息流存在于同一个线程中,我们的事务上下文就会一直被保留下来。

If we break it by initiating a new thread in some service, we’ll break the Transactional boundary as well. Essentially, the transaction will end at that point.

如果我们在某个服务中启动一个新的线程来破坏它,我们也会破坏交易的边界。从本质上讲,交易将在这一点上结束。

If a successful handoff has transpired between the threads, the flow will be considered a success. That will commit the transaction at that point, but the flow will continue, and it still might result in an Exception somewhere downstream.

如果线程之间的交接成功,流程将被认为是成功的。这时将提交事务,但流程仍将继续,而且仍可能导致下游某处出现Exception

Consequently, that Exception can get back to the initiator of the flow so that the transaction can end up in a rollback. That is why we have to use transactional channels at any point where a thread boundary can be broken.

因此,这个Exception可以回到流程的发起者那里,这样事务就会以回滚结束。这就是为什么我们必须在任何可能打破线程边界的地方使用事务通道

For example, we should use JMS, JDBC, or some other transactional channel.

例如,我们应该使用JMS、JDBC或其他一些事务性渠道。

5. Transaction Synchronization

5.事务同步化

In some use cases, it is beneficial to synchronize certain operations with a transaction that encompasses the entire flow.

在一些用例中,用一个包含整个流程的事务来同步某些操作是有益的。

For example, we’ll demonstrate how to use a Poller that reads an incoming file and, based on its contents, performs a database update. When the database operation completes, it also renames the file depending on the success of the operation.

例如,我们将演示如何使用一个Poller,它读取一个传入的文件,并根据其内容,执行数据库更新。当数据库操作完成后,它还会根据操作的成功与否重命名该文件。

Before we move to the example, it is crucial to understand that this approach synchronizes the operations on the filesystem with a transaction. It does not make the filesystem, which is not inherently transactional, actually become transactional.

在我们开始讨论这个例子之前,关键是要理解这种方法将文件系统上的操作与事务同步。它并没有使文件系统(其本身不是事务性的)真正成为事务性的。

The transaction starts before the poll and either commits or rolls back when the flow completes, followed by the synchronized operation on the filesystem.

事务在轮询前开始,并在流程完成后提交或回滚,随后在文件系统上进行同步操作。

First, we define an InboundChannelAdapter with a simple Poller:

首先,我们定义一个InboundChannelAdapter和一个简单的Poller

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

Poller contains a reference to the TransactionManager, as explained earlier. Additionally, it also contains a reference to the TransactionSynchronizationFactory. This component provides the mechanism for synchronization of the filesystem operations with the transaction:

Poller包含对TransactionManager的引用,如前所述。此外,它还包含一个对TransactionSynchronizationFactory的引用。这个组件提供了文件系统操作与事务同步的机制。

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
      new ExpressionEvaluatingTransactionSynchronizationProcessor();

    SpelExpressionParser spelParser = new SpelExpressionParser();
 
    processor.setAfterCommitExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
 
    processor.setAfterRollbackExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));

    return new DefaultTransactionSynchronizationFactory(processor);
}

If the transaction commits, TransactionSynchronizationFactory will rename the file by appending “.PASSED” to the filename. However, if it rolls back, it will append “.FAILED”.

如果事务提交,TransactionSynchronizationFactory将通过在文件名上添加”.PASSED “来重命名该文件。然而,如果它回滚,它将附加”.FAILED”。

The InputChannel transforms the payload using the FileToStringTransformer and delegates it to the toServiceChannel. This channel is bound to the ServiceActivator:

InputChannel使用FileToStringTransformer转换有效载荷,并将其委托给toServiceChannel。这个通道被绑定到ServiceActivator

@Bean
public MessageChannel inputChannel() {
    return new DirectChannel();
}
    
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
    return new FileToStringTransformer();
}

ServiceActivator reads the incoming file, which contains the student’s exam results. It writes the result in the database. If a result contains the string “fail”, it throws the Exception, which causes the database to rollback:

ServiceActivator 读取接收的文件,其中包含学生的考试成绩。它将结果写入数据库。如果一个结果包含字符串 “fail”,它将抛出Exception,这将导致数据库回滚。

@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {

    jdbcTemplate.update("insert into STUDENT values(?)", payload);

    if (payload.toLowerCase().startsWith("fail")) {
        log.error("Service failure. Test result: {} ", payload);
        throw new RuntimeException("Service failure.");
    }

    log.info("Service success. Test result: {}", payload);
}

After the database operation successfully commits or rolls back, the TransactionSynchronizationFactory synchronizes the filesystem operation with its outcome.

在数据库操作成功提交或回滚后,TransactionSynchronizationFactory将文件系统操作与其结果同步。

6. Conclusion

6.结论

In this article, we explained the transaction support in the Spring Integration framework. Additionally, we demonstrated how to synchronize the transaction with operations on a nontransactional resource like the filesystem.

在这篇文章中,我们解释了Spring Integration框架中的事务支持。此外,我们演示了如何将事务与文件系统等非事务性资源上的操作同步。

The complete source code for the example is available over on GitHub.

该示例的完整源代码可在GitHub上获得