Redis实现队列监听器订阅数据
程序员文章站
2022-07-15 13:26:55
...
1、创建redis监听器
/**
* @author rambo
* Redis队列监听器
*/
@Configuration
public class RedisMessageListener {
/**
* 创建连接工厂
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageListenerAdapter listenerAdapterTest2){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//接受消息的key,这里的key相当于队列的主题topic
container.addMessageListener(listenerAdapter,new PatternTopic(RedisKeyConstant.appendElderPrefix(RedisKeyConstant.REDIS_LISTENER_TOPIC)));
//container.addMessageListener(listenerAdapterTest2,new PatternTopic("test2"));
return container;
}
/**
* 绑定消息监听者和接收监听(消费者)的方法
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(SendRedisMessage receiver){
return new MessageListenerAdapter(receiver,"receiveElderMessage");
}
/**
* 注册订阅者
* @param latch
* @return
*/
@Bean
SendRedisMessage receiver(CountDownLatch latch) {
return new SendRedisMessage(latch);
}
/**
* 计数器,用来控制线程
* @return
*/
@Bean
public CountDownLatch latch(){
//指定了计数的次数 1
return new CountDownLatch(1);
}
}
二、监听队列主题后的处理逻辑
@Slf4j
public class SendRedisMessage {
private CountDownLatch latch;
//自定义逻辑处理
@Autowired
private PushMessageToVolunteerApp pushMessageToVolunteerApp;
@Autowired
public SendRedisMessage(CountDownLatch latch) {
this.latch = latch;
}
/**
* 队列消息接收方法
*/
@Bean
public void receiveElderMessage(String json) {
log.info(TimeUtils.ldtToStandardString(LocalDateTime.now())+" 开始消费redis队列order数据,向app端推送消息,消息内容:{}",json);
try {
Map<String,String> map = new HashMap<>(3);
Gson gson = new Gson();
map = gson.fromJson(json,map.getClass());
//这个方法可以自定义(自定义处理逻辑)
pushMessageToVolunteerApp.pushElderOrderMessageToApp(map);
log.info(TimeUtils.ldtToStandardString(LocalDateTime.now())+" 消费redis消息队列order数据成功");
} catch (Exception e) {
log.error(TimeUtils.ldtToStandardString(LocalDateTime.now())+" 消费redis消息队列数据失败,失败信息:{}]", e.getMessage());
//推送失败的话可以保存到数据库,然后定时推送
}finally {
latch.countDown();
}
}
}
三、向队列发送消息
/**
* redis
*/
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void send() {
//发布消息到redis队列中
Map<String,String> map = new HashMap<>(5);
map.put("title","发送主题");
map.put("body","发布了新订单");
map.put("order","*******");
stringRedisTemplate.convertAndSend(RedisKeyConstant.appendElderPrefix(RedisKeyConstant.REDIS_LISTENER_TOPIC),new Gson().toJson(map));
}