PubSub Messaging with Spring Data Redis – 使用Spring Data Redis的PubSub消息传递

最后修改: 2016年 3月 17日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this second article from the series exploring Spring Data Redis, we’ll have a look at the pub/sub message queues.

在这篇探索Spring Data Redis系列的第二篇文章中,我们将看一下pub/sub消息队列。

In Redis, publishers are not programmed to send their messages to specific subscribers. Rather, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be.

在Redis中,发布者没有被编程为将他们的消息发送给特定的订阅者。相反,发布的消息被定性为通道,不知道可能有哪些(如果有的话)订阅者。

Similarly, subscribers express interest in one or more topics and only receive messages that are of interest, without knowledge of what (if any) publishers there are.

同样地,订阅者对一个或多个主题表示兴趣,并且只收到感兴趣的信息,而不知道有哪些(如果有的话)发布者。

This decoupling of publishers and subscribers can allow for greater scalability and a more dynamic network topology.

这种对发布者和订阅者的解耦可以允许更大的可扩展性和更动态的网络拓扑。

2. Redis Configuration

2.Redis配置

Let’s start adding the configuration which is required for the message queues.

让我们开始添加消息队列所需的配置。

First, we’ll define a MessageListenerAdapter bean which contains a custom implementation of the MessageListener interface called RedisMessageSubscriber. This bean acts as a subscriber in the pub-sub messaging model:

首先,我们将定义一个MessageListenerAdapter Bean,它包含MessageListener接口的自定义实现,称为RedisMessageSubscriber。这个Bean在pub-sub消息传递模型中充当订阅者。

@Bean
MessageListenerAdapter messageListener() { 
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

RedisMessageListenerContainer is a class provided by Spring Data Redis which provides asynchronous behavior for Redis message listeners. This is called internally and, according to the Spring Data Redis documentation – “handles the low level details of listening, converting and message dispatching.”

RedisMessageListenerContainer是由Spring Data Redis提供的一个类,它为Redis消息监听器提供异步行为。根据Spring Data Redis 文档,该类在内部被调用,并且 “处理监听、转换和消息分派的低级细节”。

@Bean
RedisMessageListenerContainer redisContainer() {
    RedisMessageListenerContainer container 
      = new RedisMessageListenerContainer(); 
    container.setConnectionFactory(jedisConnectionFactory()); 
    container.addMessageListener(messageListener(), topic()); 
    return container; 
}

We will also create a bean using a custom-built MessagePublisher interface and a RedisMessagePublisher implementation. This way, we can have a generic message-publishing API, and have the Redis implementation take a redisTemplate and topic as constructor arguments:

我们还将使用自定义的MessagePublisher接口和RedisMessagePublisher实现创建一个bean。这样,我们就可以有一个通用的消息发布 API,并让 Redis 实现接受 redisTemplatetopic 作为构造参数。

@Bean
MessagePublisher redisPublisher() { 
    return new RedisMessagePublisher(redisTemplate(), topic());
}

Finally, we’ll set up a topic to which the publisher will send messages, and the subscriber will receive them:

最后,我们将设置一个主题,发布者将向其发送消息,而订阅者将接收这些消息。

@Bean
ChannelTopic topic() {
    return new ChannelTopic("messageQueue");
}

3. Publishing Messages

3.发布消息

3.1. Defining the MessagePublisher Interface

3.1.定义MessagePublisher接口

Spring Data Redis does not provide a MessagePublisher interface to be used for message distribution. We can define a custom interface which will use redisTemplate in implementation:

Spring Data Redis并没有提供一个MessagePublisher接口用于消息发布。我们可以定义一个自定义接口,在实现中使用redisTemplate

public interface MessagePublisher {
    void publish(String message);
}

3.2. RedisMessagePublisher Implementation

3.2.RedisMessagePublisher 实现

Our next step is to provide an implementation of the MessagePublisher interface, adding message publishing details and using the functions in redisTemplate.

我们的下一步是提供一个MessagePublisher接口的实现,添加消息发布细节,并使用redisTemplate.中的函数。

The template contains a very rich set of functions for wide range of operations – out of which convertAndSend is capable of sending a message to a queue through a topic:

该模板包含一个非常丰富的函数集,用于广泛的操作–其中convertAndSend能够通过一个主题将消息发送到队列中。

public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher() {
    }

    public RedisMessagePublisher(
      RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
      this.redisTemplate = redisTemplate;
      this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

As you can see, the publisher implementation is straightforward. It uses the convertAndSend() method of the redisTemplate to format and publish the given message to the configured topic.

正如你所看到的,发布者的实现是简单明了的。它使用redisTemplateconvertAndSend()方法来格式化和发布给定的消息到配置的主题。

A topic implements publish and subscribe semantics: when a message is published, it goes to all the subscribers who are registered to listen on that topic.

一个主题实现了发布和订阅的语义:当一个消息被发布时,它将被发送给所有在该主题上注册收听的订阅者。

4. Subscribing to Messages

4.订阅信息

RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface:

RedisMessageSubscriber实现了Spring Data Redis提供的MessageListener接口。

@Service
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<String>();

    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString());
        System.out.println("Message received: " + message.toString());
    }
}

Note that there is a second parameter called pattern, which we have not used in this example. The Spring Data Redis documentation states that this parameter represents the, “pattern matching the channel (if specified)”, but that it can be null.

请注意,还有一个名为pattern的参数,我们在这个例子中没有使用。Spring Data Redis文档指出,这个参数代表 “与通道匹配的模式(如果指定的话)”,但它可以是null

5. Sending and Receiving Messages

5.发送和接收信息

Now we’ll put it all together. Let’s create a message and then publish it using the RedisMessagePublisher:

现在我们要把这一切放在一起。让我们创建一个消息,然后使用RedisMessagePublisher发布它。

String message = "Message " + UUID.randomUUID();
redisMessagePublisher.publish(message);

When we call publish(message), the content is sent to Redis, where it is routed to the message queue topic defined in our publisher. Then it is distributed to the subscribers of that topic.

当我们调用publish(message)时,内容被发送到Redis,在那里它被路由到我们发布者中定义的消息队列主题。然后,它被分发给该主题的订阅者。

You may already have noticed that RedisMessageSubscriber is a listener, which registers itself to the queue for retrieval of messages.

你可能已经注意到,RedisMessageSubscriber是一个监听器,它将自己注册到队列中以获取消息。

On the arrival of the message, the subscriber’s onMessage() method defined triggered.

在消息到达时,订阅者的onMessage()方法被定义为触发。

In our example, we can verify that we’ve received messages that have been published by checking the messageList in our RedisMessageSubscriber:

在我们的例子中,我们可以通过检查RedisMessageSubscriber中的messageList来验证我们是否已经收到了已经发布的消息。

RedisMessageSubscriber.messageList.get(0).contains(message)

6. Conclusion

6.结论

In this article, we examined a pub/sub message queue implementation using Spring Data Redis.

在这篇文章中,我们研究了一个使用Spring Data Redis的pub/sub消息队列实现。

The implementation of the above example can be found in a GitHub project.

上述例子的实现可以在一个GitHub项目中找到

1.概述

1.概述

在这篇探索Spring Data Redis系列的第二篇文章中,我们将看看pub/sub消息队列。

在 Redis 中,发布者没有被编程为将其消息发送给特定的订阅者。相反,发布的消息被定性为通道,不知道可能有哪些(如果有的话)订阅者。

类似地,订阅者对一个或多个主题表示兴趣,并且只接收感兴趣的消息,而不知道有哪些(如果有)发布者。

这种对发布者和订阅者的解耦可以实现更大的可扩展性和更动态的网络拓扑结构。

2.Redis 配置

2.Redis 配置

让我们开始添加消息队列所需的配置。

首先,我们将定义一个MessageListenerAdapter Bean,它包含MessageListener接口的自定义实现,名为RedisMessageSubscriber。这个bean在pub-sub消息模型中充当订阅者:

@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber())。
}

RedisMessageListenerContainer是由Spring Data Redis提供的一个类,它为Redis消息监听器提供异步行为。

@Bean
RedisMessageListenerContainer redisContainer() {
    RedisMessageListenerContainer container
      = new RedisMessageListenerContainer()。
    container.setConnectionFactory(jedisConnectionFactory())。
    container.addMessageListener(messageListener(), topic())。
    返回容器。
}

我们还将使用自定义的MessagePublisher接口和RedisMessagePublisher实现创建一个bean。

@Bean
MessagePublisher redisPublisher() {
    return new RedisMessagePublisher(redisTemplate(), topic() )。
}

最后,我们将设置一个主题,发布者将向其发送消息,而订阅者将接收这些消息:

@Bean
ChannelTopic topic() {
return new ChannelTopic("messageQueue")。
}

3.发布消息

3.发布消息

3.1.定义MessagePublisher接口

3.1.定义MessagePublisher接口

Spring Data Redis并没有提供用于消息发布的MessagePublisher接口。我们可以定义一个自定义接口,它将在实现中使用redisTemplate

public interface MessagePublisher {
    void publish(String message);
}

3.2.RedisMessagePublisher实现

3.2.RedisMessagePublisher实现

我们的下一步是提供一个MessagePublisher接口的实现,增加消息发布的细节并使用redisTemplate.的功能。

该模板包含了一套非常丰富的函数,用于广泛的操作–其中convertAndSend能够通过一个主题将消息发送到队列中:

public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    Private ChannelTopic topic;

    public RedisMessagePublisher() {
    }

    公共 RedisMessagePublisher(
      RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
      this.redisTemplate = redisTemplate;
      this.topic = topic。
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message)。
    }
}

正如你所看到的,发布者的实现是直接的。它使用redisTemplateconvertAndSend()方法来格式化和发布给定的消息到配置的主题。

一个主题实现了发布和订阅语义:当一个消息被发布时,它将被发送到所有在该主题上注册收听的订阅者那里。

4.Subscribing to Messages

4.Subscribing to Messages

RedisMessageSubscriber实现了Spring Data Redis提供的MessageListener接口:

@Service
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<String> ();

    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString())。
        System.out.println("收到的消息。" + message.toString())。
    }
}

注意,有一个名为pattern的第二个参数,我们在这个例子中没有使用。Spring Data Redis 文档指出,该参数代表 “与通道相匹配的模式(如果指定)”,但它可以是null

5.发送和接收信息

5.发送和接收信息

现在我们将把这一切放在一起。让我们创建一个消息,然后使用RedisMessagePublisher发布它:

String message = "Message " + UUID.randomUUID();
redisMessagePublisher.publish(message);

当我们调用publish(message)时,内容被发送到 Redis,在那里它被路由到我们的发布者中定义的消息队列主题。然后它被分发到该主题的订阅者。

你可能已经注意到,RedisMessageSubscriber是一个监听器,它将自己注册到队列中以检索消息。

在消息到达时,订阅者的onMessage()方法被定义为被触发。

在我们的例子中,我们可以通过检查RedisMessageSubscriber中的messageList来验证我们是否已经收到已经发布的消息:

RedisMessageSubscriber.messageList.get(0).contains(message)

6.Conclusion

6.Conclusion

在这篇文章中,我们研究了一个使用Spring Data Redis的pub/sub消息队列实现。

上述例子的实现可以在GitHub项目中找到。

上述例子的实现可以在GitHub项目中找到。