Event Externalization with Spring Modulith

Last modified: January 26, 2024

1. Overview

In this article, we'll discuss the need to publish messages within a @Transactional block and the associated performance challenges, such as prolonged database connection times. To tackle this, we'll utilize Spring Modulith's features to listen to Spring application events and automatically publish them to a Kafka topic.

2. Transactional Operations and Message Brokers

For the code examples of this article, we’ll assume we’re writing the functionality responsible for saving an Article on Baeldung:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        validateArticle(article);
        article = addArticleTags(article);
        // ... other business logic
        
        articleRepository.save(article);
    }
}

Additionally, we’ll need to notify other parts of the system about this new Article. With this information, other modules or services will react accordingly, creating reports or sending newsletters to the website’s readers.

The easiest way to achieve this is to inject a dependency that knows how to publish this event. For our example, let's use KafkaOperations to send a message to the “baeldung.articles.published” topic and use the Article's slug() as the key:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        messageProducer.send(
          "baeldung.articles.published",
          article.slug(),
          new ArticlePublishedEvent(article.slug(), article.title())
        ).join();
    }
}

However, this approach is not ideal for a few different reasons. From a design point of view, we have coupled the domain service with the message producer. Moreover, the domain service directly depends on the lower-level component, breaking one of the fundamental Clean Architecture rules.

Furthermore, this approach will also have performance implications because everything is happening within a @Transactional method. As a result, the database connection acquired for saving the Article will be kept open until the message is successfully published.

Lastly, this solution also creates an error-prone relationship between persisting the data and publishing the message:

  • If the producer fails to publish the message, the transaction will be rolled back;
  • The transaction can eventually be rolled back even if the message was already published.

3. Dependency Inversion Using Spring Events

We can leverage Spring Events to improve the design of our solution. Our goal is to avoid publishing the messages to Kafka directly from our domain service. Let’s remove the KafkaOperations dependency and publish an internal application event instead:

@Service
public class Baeldung {
    private final ApplicationEventPublisher applicationEvents;
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        applicationEvents.publishEvent(
          new ArticlePublishedEvent(article.slug(), article.title()));
    }
}

In addition to this, we’ll have a dedicated Kafka producer as part of our infrastructure layer. This component will listen to the ArticlePublishedEvents and delegate the publishing to the underlying KafkaOperations bean:

@Component
class ArticlePublishedKafkaProducer {
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor 

    @EventListener
    public void publish(ArticlePublishedEvent event) {
        Assert.notNull(event.slug(), "Article Slug must not be null!");
        messageProducer.send("baeldung.articles.published", event.slug(), event);
    }
}

With this abstraction, the infrastructure component now depends on the event produced by the domain service. In other words, we’ve managed to reduce the coupling and invert the source code dependency. Furthermore, if other modules are interested in the Article creation, they can now seamlessly listen to these application events and react accordingly.

On the other hand, the publish() method will be called from within the same transaction as our business logic. Indirectly, the two operations remain coupled: the failure of either one can cause the failure or rollback of the other.
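To see why, the situation can be sketched in plain Java, with no Spring involved. The SyncEventBus below is a hypothetical stand-in for synchronous event dispatch; its listeners run on the publisher's thread, so a listener failure propagates back into the publisher's transactional method:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Spring's synchronous event dispatch
// (illustration only): listeners run on the publisher's thread.
class SyncEventBus {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    void subscribe(Consumer<Object> listener) {
        listeners.add(listener);
    }

    void publish(Object event) {
        // No thread hop: an exception thrown here unwinds the caller too,
        // which is what triggers the surrounding transaction's rollback.
        for (Consumer<Object> listener : listeners) {
            listener.accept(event);
        }
    }
}
```

Because there is no thread boundary, the listener's exception surfaces inside the publishing method, and Spring would translate that into a rollback.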

4. Atomic vs. Non-atomic Operations

Now, let’s delve into the performance considerations. To begin, we must determine whether rolling back when the communication with the message broker fails is the desired behavior. This choice varies based on the specific context.

In case we do not need this atomicity, it’s imperative to free the database connection and publish the events asynchronously. To simulate this, we can try to create an article without a slug, causing ArticlePublishedKafkaProducer::publish to fail:

@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
    var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");

    baeldung.createArticle(article);

    assertThat(repository.findAll())
      .hasSize(1).first()
      .extracting(Article::title, Article::author)
      .containsExactly("Introduction to Spring Boot", "John Doe");
}

If we run the test now, it will fail. This happens because ArticlePublishedKafkaProducer throws an exception that will cause the domain service to roll back the transaction. However, we can make the event listener asynchronous by replacing the @EventListener annotation with @TransactionalEventListener and @Async:

@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
    Assert.notNull(event.slug(), "Article Slug must not be null!");
    messageProducer.send("baeldung.articles.published", event.slug(), event);
}

If we re-run the test now, we'll notice that the exception is logged, the event was not published, and the entity is saved to the database. Moreover, the database connection was released sooner, allowing other threads to use it.
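The effect of @Async can be illustrated without Spring as well. In the hypothetical sketch below, the listener is handed off to another thread via CompletableFuture, so its failure no longer unwinds the caller:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of why @Async breaks the failure coupling (illustration only):
// the listener runs on a separate thread, so its exception is captured
// by the future instead of unwinding the publisher's call stack.
class AsyncDispatch {
    static boolean saveAndPublish(Runnable listener) {
        // The "save" part of the business method succeeds regardless
        // of what happens inside the asynchronous listener.
        CompletableFuture<Void> publication = CompletableFuture.runAsync(listener);
        return true; // the transaction can commit and release its connection
    }
}
```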

5. Event Externalization With Spring Modulith

We successfully tackled the design and performance concerns of the original code example through a two-step approach:

  • Dependency inversion using Spring application events
  • Asynchronous publishing utilizing @TransactionalEventListener and @Async

Spring Modulith allows us to further simplify our code, providing built-in support for this pattern. Let's start by adding the Maven dependency for spring-modulith-events-api to our pom.xml:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-api</artifactId>
    <version>1.1.3</version>
</dependency>

This module can be configured to listen to application events and automatically externalize them to various message systems. We’ll stick to our original example and focus on Kafka. For this integration, we’ll need to add the spring-modulith-events-kafka dependency:

<dependency> 
    <groupId>org.springframework.modulith</groupId> 
    <artifactId>spring-modulith-events-kafka</artifactId> 
    <version>1.1.3</version>
    <scope>runtime</scope> 
</dependency>

Now, we need to update the ArticlePublishedEvent and annotate it with @Externalized. This annotation requires the name and the key of the routing target. In other words, the Kafka topic and the message key. For the key, we’ll use a SpEL expression that will invoke Article::slug():

@Externalized("baeldung.articles.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}
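For intuition, the annotation value follows a "target::#{key-expression}" layout. The ExternalizedTarget helper below is not Modulith's actual parser, just an illustrative split of that format:

```java
// Hypothetical helper (not part of Spring Modulith): splits an
// @Externalized value of the form "topic::#{keyExpression}" into
// its routing target and key expression.
class ExternalizedTarget {
    final String topic;
    final String keyExpression; // null when no key expression is given

    ExternalizedTarget(String value) {
        int separator = value.indexOf("::");
        if (separator < 0) {
            this.topic = value;
            this.keyExpression = null;
        } else {
            this.topic = value.substring(0, separator);
            this.keyExpression = value.substring(separator + 2);
        }
    }
}
```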

6. The Event Publication Registry

As previously discussed, we still have an error-prone relationship between persisting the data and publishing the message: failing to publish the message causes the transaction to roll back. On the other hand, even if the message is successfully published, the transaction can still roll back later.

Spring Modulith’s event publication registry implements the “transactional outbox” pattern to tackle this problem, ensuring eventual consistency across the system. When a transactional operation happens, instead of immediately sending a message to an external system, the event is stored in an event publishing log within the same business transaction.
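The pattern itself is framework-agnostic. The following sketch, plain Java rather than Modulith's internals, shows the two halves: the business transaction only appends to a log, while a separate relay publishes pending entries and records a completion date:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative transactional-outbox sketch (not Spring Modulith's code):
// events are appended to a log within the business transaction and
// published later by a relay that records a completion date.
class OutboxEntry {
    final String payload;
    Instant completionDate; // stays null until successfully published

    OutboxEntry(String payload) {
        this.payload = payload;
    }
}

class Outbox {
    final List<OutboxEntry> log = new ArrayList<>();

    // Runs inside the business transaction: no broker interaction here.
    void record(String payload) {
        log.add(new OutboxEntry(payload));
    }

    // Runs after the transaction commits: publishes pending entries.
    void relay(Consumer<String> broker) {
        for (OutboxEntry entry : log) {
            if (entry.completionDate == null) {
                broker.accept(entry.payload);
                entry.completionDate = Instant.now();
            }
        }
    }
}
```

An entry with a null completion date is exactly what the registry later treats as an incomplete publication.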

6.1. The Event Publishing Log

First, we’ll need to introduce the spring-modulith-starter dependency that corresponds to our persistence technology. We can consult the official documentation for a complete list of the supported starters. Since we use Spring Data JPA and a PostgreSQL database, we’ll add the spring-modulith-starter-jpa dependency:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jpa</artifactId>
    <version>1.1.3</version>
</dependency>

Additionally, we’ll enable Spring Modulith to create the “event_publication” table. This table contains the relevant data about the externalized application events. Let’s add the following property to our application.yml:

spring.modulith:
  events.jdbc-schema-initialization.enabled: true

Our setup uses Testcontainers to spin up a Docker container with the PostgreSQL database. As a result, we can leverage the Testcontainers Desktop application to “freeze the container shutdown” and “open a terminal” attached to the container itself. Then, we can inspect the database using the following commands:

  • “psql -U test_user -d test_db” – to open the PostgreSQL interactive terminal
  • “\d” – to list the database objects

[Image: Testcontainers Desktop]

As we can see, the “event_publication” table was successfully created. Let's execute a query to see the events persisted by our tests:

[Image: event_publication table contents]

On the first row, we can see the event created by our first test, which covered the happy flow. However, in the second test, we intentionally created an invalid event by omitting the “slug” to simulate a failure during event publication. Since this Article was saved to the database but not successfully published, it appears in the event_publication table with a missing completion_date.

6.2. Resubmitting Events

We can enable Spring Modulith to automatically resubmit events upon application restart through the republish-outstanding-events-on-restart property:

spring.modulith:
  republish-outstanding-events-on-restart: true

Furthermore, we can use the IncompleteEventPublications bean to programmatically re-submit the failed events older than a given time:

@Component
class EventPublications {
    private final IncompleteEventPublications incompleteEvents;
    private final CompletedEventPublications completeEvents;

    // constructor

    void resubmitUnpublishedEvents() {
        incompleteEvents.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(60));
    }
}
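Conceptually, resubmission reduces to a date filter over the publication log. The PublicationLog type below is a hypothetical model, not the registry's actual schema, selecting incomplete publications older than a cutoff:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model (illustration only) of the selection behind
// resubmitIncompletePublicationsOlderThan: incomplete publications
// whose publication date lies before "now minus the given duration".
class PublicationLog {
    record Publication(String eventId, Instant publicationDate, Instant completionDate) {}

    static List<Publication> incompleteOlderThan(List<Publication> all, Duration age, Instant now) {
        Instant cutoff = now.minus(age);
        return all.stream()
          .filter(p -> p.completionDate() == null)           // never completed
          .filter(p -> p.publicationDate().isBefore(cutoff)) // older than the cutoff
          .collect(Collectors.toList());
    }
}
```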

Similarly, we can use the CompletedEventPublications bean to easily query or clear the event_publication table:

void clearPublishedEvents() {
    completeEvents.deletePublicationsOlderThan(Duration.ofSeconds(60));
}

7. Event Externalization Configuration

Even though the @Externalized annotation’s value is useful for concise SpEL expressions, there are situations where we might want to avoid using it:

  • In cases where the expression becomes overly complex
  • When we aim to separate information about the topic from the application event
  • If we want distinct models for the application event and the externalized event

For these use cases, we can configure the necessary routing and event mapping using EventExternalizationConfiguration’s builder. After that, we simply need to expose this configuration as a Spring bean:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.slug(), it.title())
      )
      .build();
}

The EventExternalizationConfiguration enables us to define the routing and mapping of application events in a declarative way. Moreover, it lets us handle various types of application events. For example, if we need to handle an additional event such as WeeklySummaryPublishedEvent, we can easily do so by adding one more type-specific routing and mapping:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.slug(), it.title())
      )
      .route(
        WeeklySummaryPublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.handle())
      )
      .mapping(
        WeeklySummaryPublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.handle(), it.heading())
      )
      .build();
}

As we observe, the mappings and routings need two things: the type itself and a function to resolve the Kafka topic and payload. In our example, both application events will be mapped to a common type and sent to the same topic.
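Conceptually, the builder assembles a lookup table from event type to a resolver function. The Router class below is a simplified, hypothetical model of that idea, not the Modulith API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified, hypothetical model of type-based routing (not the
// Modulith API): each event class maps to a function resolving the
// Kafka topic and message key for instances of that class.
class Router {
    record Target(String topic, String key) {}

    private final Map<Class<?>, Function<Object, Target>> routes = new HashMap<>();

    <T> Router route(Class<T> type, Function<T, Target> resolver) {
        routes.put(type, event -> resolver.apply(type.cast(event)));
        return this; // allows fluent, builder-style registration
    }

    Target resolve(Object event) {
        return routes.get(event.getClass()).apply(event);
    }
}
```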

Additionally, since we now declare the routing in the configuration, we can remove this information from the event itself. Consequently, the event will only have the @Externalized annotation, with no value:

@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}

@Externalized
public record WeeklySummaryPublishedEvent(String handle, String heading) {
}

8. Conclusion

In this article, we discussed the scenarios that require us to publish a message from within a transactional block. We discovered that this pattern can have big performance implications because it can block the database connection for a longer time.

After that, we used Spring Modulith’s features to listen to Spring application events and automatically publish them to a Kafka topic. This approach allowed us to externalize the events asynchronously and free the database connection sooner.

The complete source code can be found over on GitHub.
