Apache RocketMQ with Spring Boot – Apache RocketMQ与Spring Boot

最后修改: 2020年 1月 2日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

In this tutorial, we’ll create a message producer and consumer using Spring Boot and Apache RocketMQ, an open-source distributed messaging and streaming data platform.

在本教程中,我们将使用Spring Boot和Apache RocketMQ(一个开源的分布式消息传递和流式数据平台)创建一个消息生产者和消费者。

2. Dependencies

2.依赖性

For Maven projects, we need to add the RocketMQ Spring Boot Starter dependency:

对于Maven项目,我们需要添加RocketMQ Spring Boot Starter依赖项。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

3. Producing Messages

3.制作信息

For our example, we’ll create a basic message producer that will send events whenever the user adds or removes an item from the shopping cart.

在我们的例子中,我们将创建一个基本的消息生产者,每当用户从购物车中添加或删除一个项目时,就会发送事件。

First, let’s set up our server location and group name in our application.properties:

首先,让我们在application.properties中设置我们的服务器位置和组名。

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

Note that if we had more than one name server, we could list them like host:port;host:port.

注意,如果我们有一个以上的名称服务器,我们可以像host:port;host:port那样列出它们。

Now, to keep it simple, we’ll create a CommandLineRunner application and generate a few events during application startup:

现在,为了保持简单,我们将创建一个CommandLineRunner应用程序,并在应用程序启动时生成一些事件。

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(CartEventProducer.class, args);
    }

    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
        rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
    }
}

The CartItemEvent consists of just two properties – the id of the item and a quantity:

CartItemEvent只包括两个属性–物品的id和数量。

class CartItemEvent {
    private String itemId;
    private int quantity;

    // constructor, getters and setters
}

In the above example, we use the convertAndSend() method, a generic method defined by the AbstractMessageSendingTemplate abstract class, to send our cart events. It takes two parameters: A destination, which in our case is a topic name, and a message payload.

在上面的例子中,我们使用convertAndSend()方法,一个由AbstractMessageSendingTemplate抽象类定义的通用方法,来发送我们的购物车事件。它需要两个参数。一个目的地,在我们的例子中是一个主题名称,以及一个消息有效载荷。

4. Message Consumer

4.信息消费者

Consuming RocketMQ messages is as simple as creating a Spring component annotated with @RocketMQMessageListener and implementing the RocketMQListener interface:

消耗RocketMQ消息就像创建一个用@RocketMQMessageListener注释的Spring组件并实现RocketMQListener接口一样简单。

@SpringBootApplication
public class CartEventConsumer {

    public static void main(String[] args) {
        SpringApplication.run(CartEventConsumer.class, args);
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-add-topic",
      consumerGroup = "cart-consumer_cart-item-add-topic"
    )
    public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent addItemEvent) {
            log.info("Adding item: {}", addItemEvent);
            // additional logic
        }
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-removed-topic",
      consumerGroup = "cart-consumer_cart-item-removed-topic"
    )
    public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent removeItemEvent) {
            log.info("Removing item: {}", removeItemEvent);
            // additional logic
        }
    }
}

We need to create a separate component for every message topic we are listening for. In each of these listeners, we define the name of the topic and consumer group name through the @RocketMQMessageListener annotation.

我们需要为我们要监听的每个消息主题创建一个单独的组件。在每个监听器中,我们通过@RocketMQMessageListener注解定义主题名称和消费者组名称。

5. Synchronous and Asynchronous Transmission

5.同步和异步传输

In the previous examples, we used the convertAndSend method to send our messages. We have some other options, though.

在前面的例子中,我们使用convertAndSend方法来发送我们的信息。不过,我们还有一些其他的选择。

We could, for example, call syncSend which is different from convertAndSend because it returns SendResult object.

例如,我们可以调用syncSend,它与convertAndSend不同,因为它返回SendResult对象。

It can be used, for example, to verify if our message was sent successfully or get its id:

例如,它可以用来验证我们的信息是否被成功发送或获得其ID。

public void run(String... args) throws Exception { 
    SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("bike", 1)); 
    SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("computer", 2)); 
    SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", 
      new CartItemEvent("bike", 1)); 
}

Like convertAndSend, this method is returned only when the sending procedure completes.

convertAndSend一样,该方法只在发送程序完成后返回。

We should use synchronous transmission in cases requiring high reliability, such as important notification messages or SMS notification.

在需要高可靠性的情况下,我们应该使用同步传输,如重要的通知信息或短信通知。

On the other hand, we may instead want to send the message asynchronously and be notified when the sending completes.

另一方面,我们可能反而想异步地发送消息,并在发送完成后得到通知。

We can do this with asyncSend, which takes a SendCallback as a parameter and returns immediately:

我们可以通过asyncSend来实现这一点,它需要一个SendCallback作为参数并立即返回。

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.error("Successfully sent cart item");
    }

    @Override
    public void onException(Throwable throwable) {
        log.error("Exception during cart item sending", throwable);
    }
});

We use asynchronous transmission in cases requiring high throughput.

我们在需要高吞吐量的情况下使用异步传输。

Lastly, for scenarios where we have very high throughput requirements, we can use sendOneWay instead of asyncSendsendOneWay is different from asyncSend in that it doesn’t guarantee the message gets sent.

最后,对于我们有非常高的吞吐量要求的场景,我们可以使用sendOneWay而不是asyncSendsendOneWayasyncSend的不同之处在于,它并不保证消息被发送。

One-way transmission can also be used for ordinary reliability cases, such as collecting logs.

单向传输也可用于普通的可靠性案例,如收集日志。

6. Sending Messages in Transaction

6.在事务中发送消息

RocketMQ provides us with the ability to send messages within a transaction. We can do it by using the sendInTransaction() method:

RocketMQ为我们提供了在一个事务中发送消息的能力。我们可以通过使用 sendInTransaction()方法来做到这一点。

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

Also, we must implement a RocketMQLocalTransactionListener interface:

另外,我们必须实现一个RocketMQLocalTransactionListener接口。

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.UNKNOWN;
      }

      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.COMMIT;
      }
}

In sendMessageInTransaction(), the first parameter is the transaction name. It must be the same as the @RocketMQTransactionListener‘s member field txProducerGroup.

sendMessageInTransaction()中,第一个参数是事务名称。它必须与@RocketMQTransactionListener的成员字段txProducerGroup.相同。

7. Message Producer Configuration

7.信息生产者配置

We can also configure aspects of the message producer itself:

我们还可以配置消息生产者本身的各个方面。

  • rocketmq.producer.send-message-timeout: The message send timeout in milliseconds – the default value is 3000
  • rocketmq.producer.compress-message-body-threshold: Threshold above which, RocketMQ will compress messages – the default value is 1024.
  • rocketmq.producer.max-message-size: The maximum message size in bytes – the default value is 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: The maximum number of retries to perform internally in asynchronous mode before sending failure – the default value is 2.
  • rocketmq.producer.retry-next-server: Indicates whether to retry another broker on sending failure internally – the default value is false.
  • rocketmq.producer.retry-times-when-send-failed: The maximum number of retries to perform internally in asynchronous mode before sending failure – the default value is 2.

8. Conclusion

8.结语

In this article, we’ve learned how to send and consume messages using Apache RocketMQ and Spring Boot. As always all source code is available on GitHub.

在这篇文章中,我们已经学会了如何使用Apache RocketMQ和Spring Boot来发送和消费消息。一如既往,所有的源代码都在GitHub上可用