redis队列先进先出需要注意什么_「每日一问」用代码分析,Redis为什么可以做消息队列...
MQ
现在的应用基本上都是采用分布式 系统架构进行设计,而很多分布式系统必备的一个基础组件就是消息队列。
如果大家不想再引入一个其他的消息组件例如:kafka、RabbitMQ、RocketMQ。恰好你的项目中使用了Redis,可以尝试利用Redis编写一个轻量级的消息组件。
为什么Redis可以作为消息队列
在回答这个问题前,首先我们考虑下,作为一个消息队列,应该有哪些特征?我个人认为应该满足以下三点要求:
消息的有序性
问题描述:虽然消费者是异步处理消息,但是大部分的情况下消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
解决方案:redis提供的数据结构list和sorted set都是有序存储的结构,都可以保证存入数据的有序性。但是个人更倾向于使用list,一是因为list的有序性不需要人为的干预,sorted set需要设置score来维护顺序;二是因为list支持阻塞是获取brpop,避免了轮询队列造成的CPU消耗。
重复消费的处理
问题描述:消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
解决方案:针对这一问题,生产者在生产消息时,生成一个全局唯一的消息id,消费者在消费时,手动记录已经消费的消息id。避免消息的重复消费。
消息的可靠性
问题描述:消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
解决方案:redis支持数据的持久化,因此可以保证数据的可靠性,具体可以参考我之前写的文章「每日一问」Redis宕机后,如何恢复数据?
简易代码实现
1.创建三个注解分别表示消息监听者、消息消费者、消息处理者,具体代码如下:
消息消费者容器/*** 消息消费者容器,被该注解标注的类表示其内部的被@see MessageListener 标注的public方法* 为一个消息监听器*/@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@[email protected] @interface MessageConsumer { @AliasFor(annotation = Component.class) String value() default "";}
消息处理器
/*** 消息处理器标志,被该注解标注的类,会处理间听到的消息,根据消息的类型选择处理器*/@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@[email protected] @interface MessageHandler { MessageListener.Mode value() default MessageListener.Mode.TOPIC;}
消息监听器
/*** 消息监听器,只可用在*/@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface MessageListener { String value() default ""; String topic() default ""; // 监听的消息主题 String channel() default ""; //监听的消息通道,适用于发布订阅模式 Mode mode() default Mode.TOPIC; // 监听器模式广播模式、主题模式 enum Mode { TOPIC(), PUBSUB() }}
2.创建消息处理器
抽象消息处理器@Slf4jpublic abstract class AbstractMessageHandler implements ApplicationContextAware { protected ApplicationContext applicationContext; protected RedisTemplate redisTemplate; protected void invokeMethod(Method method, Message message, Object bean) { try { method.invoke(bean, message); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } protected Message getMessage(byte[] bytes) { String s = new String(bytes, CharsetUtil.CHARSET_UTF_8); return JSONUtil.toBean(s, Message.class); } public AbstractMessageHandler(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } protected RedisConnection getConnection() { RedisConnection connection = redisTemplate.getRequiredConnectionFactory().getConnection(); return connection; } public abstract void invokeMessage(Method method); @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } protected void consumer(Method method, Set consumers, Object bean, byte[] message) { Message msg = getMessage(message); if (consumers.add(msg.getId())) { invokeMethod(method, msg, bean); } else { log.error("has consumed message {}", msg); } }}
topic模式消息处理器
@MessageHandler(value = MessageListener.Mode.TOPIC)public class TopicMessageHandler extends AbstractMessageHandler { public TopicMessageHandler(RedisTemplate redisTemplate) { super(redisTemplate); } @Override public void invokeMessage(Method method) { Set consumers = new HashSet<>(); MessageListener annotation = method.getAnnotation(MessageListener.class); String topic = getTopic(annotation); RedisConnection connection = redisTemplate.getRequiredConnectionFactory().getConnection(); Class declaringClass = method.getDeclaringClass(); Object bean = applicationContext.getBean(declaringClass); while (true) { List bytes = connection.bRPop(1, topic.getBytes()); if (CollectionUtil.isNotEmpty(bytes)) { if (null != bytes.get(1)) { consumer(method, consumers, bean, bytes.get(1)); } } } } private String getTopic(MessageListener annotation) { String value = annotation.value(); String topic = annotation.topic(); return StrUtil.isBlank(topic) ? value : topic; }}
pubsub模式消息处理器
@MessageHandler(value = MessageListener.Mode.PUBSUB)public class PubsubMessageHandler extends AbstractMessageHandler { public PubsubMessageHandler(RedisTemplate redisTemplate) { super(redisTemplate); } @Override public void invokeMessage(Method method) { Set consumers = new HashSet<>(); MessageListener listener = method.getAnnotation(MessageListener.class); String channel = getChannel(listener); RedisConnection connection = getConnection(); connection.subscribe((message, pattern) -> { Class> declaringClass = method.getDeclaringClass(); Object bean = applicationContext.getBean(declaringClass); byte[] body = message.getBody(); consumer(method, consumers, bean, body); }, channel.getBytes()); } private String getChannel(MessageListener annotation) { String value = annotation.value(); String channel = annotation.channel(); return StrUtil.isBlank(channel) ? value : channel; }}
3.创建消费消息启动器
@[email protected] class MessageConsumerStater implements ApplicationRunner, ApplicationContextAware { private ApplicationContext applicationContext; @Override public void run(ApplicationArguments args) { Map invokers = getInvokers(); applicationContext.getBeansWithAnnotation(MessageConsumer.class).values().parallelStream().forEach(consumer -> { Method[] methods = consumer.getClass().getMethods(); if (ArrayUtil.isNotEmpty(methods)) { Arrays.stream(methods).parallel().forEach(method -> invokeMessage(method, invokers)); } }); } // 启动消息监听器 private void startMessageListener(Method method, Map invokers) { MessageListener listener = method.getAnnotation(MessageListener.class); if (null == listener) { return; } MessageListener.Mode mode = listener.mode(); AbstractMessageHandler invoker = invokers.get(mode); if (invoker == null) { log.error("invoker is null"); return; } invoker.invokeMessage(method); } private Map getInvokers() { Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(MessageHandler.class); Map collect = beansWithAnnotation.values().stream().collect(Collectors .toMap(k -> k.getClass().getAnnotation(MessageHandler.class).value(), k -> (AbstractMessageHandler)k)); return collect; } public MessageConsumerStater() { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}
4.消费消息器
@MessageConsumerpublic class MqConsumer { @MessageListener(topic = "topic1", mode = MessageListener.Mode.TOPIC) public void testTopic1(Message message) { System.out.println ("topic===> " + message); } @MessageListener(topic = "topic2", mode = MessageListener.Mode.TOPIC) public void testTopic2(Message message) { System.out.println ("topic===> " + message); } @MessageListener(topic = "topic3", mode = MessageListener.Mode.TOPIC) public void testTopic3(Message message) { System.out.println ("topic===> " + message); } @MessageListener(channel = "pubsub", mode = MessageListener.Mode.PUBSUB) public void testPubsub1(Message message) { System.out.println ("pubsub===> " + message); } @MessageListener(channel = "pubsub", mode = MessageListener.Mode.PUBSUB) public void testPubsub2(Message message) { System.out.println ("pubsub===> " + message); }}
5.辅助实体类
@[email protected]@[email protected] class Message implements Serializable { private String id; private T content;}
为了方便大家学习交流,源代码我这边上传到了码云仓库,欢迎大家学习交流。https://gitee.com/smn322/redismq/
上一篇: 移动或复制工作表