使用redis的发布/订阅(Pub/Sub), 实现消息队列

23人浏览 / 0人评论 / 添加收藏

一、介绍
Redis的发布订阅拥有简单高效的实时消息传递,但需要根据业务需求权衡其持久化和可靠性限制,对于特殊场景,特殊业务还是建议使用专业的MQ, 如Kafka、RocketMQ。

发布订阅采用观察者模式 实现,包含三个核心组件:
(1)发布者(Publisher)

通过 PUBLISH 命令向指定频道发送消息
消息格式: PUBLISH channel_name message
返回值为当前订阅该频道的客户端数量
(2)订阅者(Subscriber)

通过 SUBSCRIBE 命令订阅一个或多个频道
可以订阅模式: PSUBSCRIBE pattern*
订阅后进入阻塞状态,等待消息到达
(3)频道(Channel)

消息传递的通道
支持通配符模式匹配
无需预先创建,自动管理
工作流程
1.订阅阶段 :客户端执行 SUBSCRIBE channel1 channel2
2.发布阶段 :另一个客户端执行 PUBLISH channel1 “Hello”
3.消息传递 :Redis服务器将消息推送给所有订阅channel1的客户端
4.接收处理 :订阅者收到消息格式: [“message”, “channel1”, “Hello”]

Redis发布订阅与消息队列(RocketMQ)对比
 

Redis发布订阅虽然有许多不足,但对一些业务小,要求不高的场景使用起来也是非常香的。

二、配置监听器、及消费者处理类
引入redis依赖,及配置这里就不展示了。

配置监听器
/**
* @author Redis消息监听配置
*/
@RequiredArgsConstructor
@Component
public class RedisMessageListenerConfig {
   private final RedisMessageListenerContainer container;
   private final MessageSubscriber messageSubscriber;

   /**
    * 默认是已经初始化了RedisMessageListenerContainer
    */
   @PostConstruct
   public void init(){
       //这里默认是已经初始化了RedisMessageListenerContainer, 在此处仅仅添加消息监听器即可
       //如果没有初始化RedisMessageListenerContainer,则需要先初始化,再添加消息监听器
       container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));
       container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));
   }

   /**
    * 没有初始化RedisMessageListenerContainer,则需要先初始化
    */
   // @Bean
   // public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
   //     //自行进行配置,这里不再说明
   //     RedisMessageListenerContainer initContainer = new RedisMessageListenerContainer();
   //     initContainer.setConnectionFactory();
   //     ...

   //     initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));
   //     initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));
   //     return initContainer;
   // }
}

消费者处理类
@Slf4j
@RequiredArgsConstructor
@Component
public class MessageSubscriber {

   public void messageHandler1(Message message) {
       log.info("消息通知, message:{}",message);
       try {
           String string = message.toString();
           //添加业务逻辑
       } catch (Exception e) {
           log.error("消费者异常,message:{} error:{},",message,e.getMessage());
       }
   }

   public void messageHandler2(Message message) {
       log.info("消息通知, message:{}",message);
       try {
           String string = message.toString();
           //添加业务逻辑
       } catch (Exception e) {
           log.error("消费者异常,message:{} error:{},",message,e.getMessage());
       }
   }
}


三、生产者处理类
复制
@Service
@RequiredArgsConstructor
public class InterviewSessionServiceImpl {

   private final RedisTemplate<String, Object> redisTemplate;

   public void message1(String message1) {
       redisTemplate.convertAndSend("Channel_Topic_1", message1);
   }

   public void message2(String message2) {
       redisTemplate.convertAndSend("Channel_Topic_2", message2);
   }
}
 

全部评论