一、介绍
Redis的发布订阅拥有简单高效的实时消息传递,但需要根据业务需求权衡其持久化和可靠性限制,对于特殊场景,特殊业务还是建议使用专业的MQ, 如Kafka、RocketMQ。
发布订阅采用观察者模式 实现,包含三个核心组件:
(1)发布者(Publisher)
通过 PUBLISH 命令向指定频道发送消息
消息格式: PUBLISH channel_name message
返回值为当前订阅该频道的客户端数量
(2)订阅者(Subscriber)
通过 SUBSCRIBE 命令订阅一个或多个频道
可以订阅模式: PSUBSCRIBE pattern*
订阅后进入阻塞状态,等待消息到达
(3)频道(Channel)
消息传递的通道
支持通配符模式匹配
无需预先创建,自动管理
工作流程
1.订阅阶段 :客户端执行 SUBSCRIBE channel1 channel2
2.发布阶段 :另一个客户端执行 PUBLISH channel1 “Hello”
3.消息传递 :Redis服务器将消息推送给所有订阅channel1的客户端
4.接收处理 :订阅者收到消息格式: [“message”, “channel1”, “Hello”]
Redis发布订阅与消息队列(RocketMQ)对比
Redis发布订阅虽然有许多不足,但对一些业务小,要求不高的场景使用起来也是非常香的。
二、配置监听器、及消费者处理类
引入redis依赖,及配置这里就不展示了。
配置监听器
/**
* @author Redis消息监听配置
*/
@RequiredArgsConstructor
@Component
public class RedisMessageListenerConfig {
private final RedisMessageListenerContainer container;
private final MessageSubscriber messageSubscriber;
/**
* 默认是已经初始化了RedisMessageListenerContainer
*/
@PostConstruct
public void init(){
//这里默认是已经初始化了RedisMessageListenerContainer, 在此处仅仅添加消息监听器即可
//如果没有初始化RedisMessageListenerContainer,则需要先初始化,再添加消息监听器
container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));
container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));
}
/**
* 没有初始化RedisMessageListenerContainer,则需要先初始化
*/
// @Bean
// public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
// //自行进行配置,这里不再说明
// RedisMessageListenerContainer initContainer = new RedisMessageListenerContainer();
// initContainer.setConnectionFactory();
// ...
// initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));
// initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));
// return initContainer;
// }
}
消费者处理类
@Slf4j
@RequiredArgsConstructor
@Component
public class MessageSubscriber {
public void messageHandler1(Message message) {
log.info("消息通知, message:{}",message);
try {
String string = message.toString();
//添加业务逻辑
} catch (Exception e) {
log.error("消费者异常,message:{} error:{},",message,e.getMessage());
}
}
public void messageHandler2(Message message) {
log.info("消息通知, message:{}",message);
try {
String string = message.toString();
//添加业务逻辑
} catch (Exception e) {
log.error("消费者异常,message:{} error:{},",message,e.getMessage());
}
}
}
三、生产者处理类
复制
@Service
@RequiredArgsConstructor
public class InterviewSessionServiceImpl {
private final RedisTemplate<String, Object> redisTemplate;
public void message1(String message1) {
redisTemplate.convertAndSend("Channel_Topic_1", message1);
}
public void message2(String message2) {
redisTemplate.convertAndSend("Channel_Topic_2", message2);
}
}

全部评论