Spring boot+redis实现消息发布与订阅
程序员文章站
2022-07-10 21:14:51
一.创建spring boot项目 org.springframework.boot spring-boot-starter-data-redis
一.创建spring boot项目
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>com.alibaba</groupid> <artifactid>fastjson</artifactid> <version>1.2.41</version> </dependency>
二.编辑yml配置文件
server: port: 7888 # 日志配置 logging: config: classpath:log/logback.xml level: cn.com.dhcc: info org.springframework: info org.springframework.web: info com.alibaba.nacos.client.naming: error spring: redis: host: localhost port: 6379 password: ********* database: 1 jedis: pool: max-idle: 8 max-active: 8 max-wait: -1 min-idle: 0 timeout: 5000
三.配置redis
@configuration public class redisconfiguration { /** * 实例化 redistemplate 对象 * * @return */ @bean("redistemplates") public redistemplate<string, object> functiondomainredistemplate(redisconnectionfactory redisconnectionfactory) { redistemplate<string, object> redistemplate = new redistemplate<>(); initdomainredistemplate(redistemplate, redisconnectionfactory); return redistemplate; } /** * 设置数据存入 redis 的序列化方式,并开启事务 * * @param redistemplate * @param factory */ private void initdomainredistemplate(@qualifier("redistemplates") redistemplate<string, object> redistemplate, redisconnectionfactory factory) { // 如果不配置serializer,那么存储的时候缺省使用string,如果用user类型存储,那么会提示错误user can't cast to // string! redistemplate.setkeyserializer(new stringredisserializer()); redistemplate.sethashkeyserializer(new stringredisserializer()); fastjsonredisserializer<object> fastjsonredisserializer = new fastjsonredisserializer<object>(object.class); redistemplate.sethashvalueserializer(fastjsonredisserializer); redistemplate.setvalueserializer(fastjsonredisserializer); //redistemplate.sethashvalueserializer(new genericjackson2jsonredisserializer()); //redistemplate.setvalueserializer(new genericjackson2jsonredisserializer()); // 开启事务 redistemplate.setenabletransactionsupport(true); redistemplate.setconnectionfactory(factory); } /** * 注入封装redistemplate @title: redisutil @return redisutil @date * */ @bean(name = "redisutils") public redisutils redisutil(@qualifier("redistemplates") redistemplate<string, object> redistemplate) { redisutils redisutil = new redisutils(); redisutil.setredistemplate(redistemplate); return redisutil; }
四.编写redisutil消息发布方法
public class redisutils { private static final logger log = loggerfactory.getlogger(redisutils.class); private redistemplate<string, object> redistemplate; public void setredistemplate(redistemplate<string, object> redistemplate) { this.redistemplate = redistemplate; } public void publish(string channal ,object obj) { redistemplate.convertandsend(channal,obj ); } }
五.配置消息监听
@configuration public class redismessagelistener { /** * 创建连接工厂 * @param connectionfactory * @param listeneradapter * @return */ @bean public redismessagelistenercontainer container(redisconnectionfactory connectionfactory, messagelisteneradapter listeneradapter,messagelisteneradapter listeneradapter2){ redismessagelistenercontainer container = new redismessagelistenercontainer(); container.setconnectionfactory(connectionfactory); //接受消息的key container.addmessagelistener(listeneradapter,new patterntopic("phone")); return container; } /** * 绑定消息监听者和接收监听的方法 * @param receiver * @return */ @bean public messagelisteneradapter listeneradapter(receiverredismessage receiver){ return new messagelisteneradapter(receiver,"receivemessage"); } /** * 注册订阅者 * @param latch * @return */ @bean receiverredismessage receiver(countdownlatch latch) { return new receiverredismessage(latch); } /** * 计数器,用来控制线程 * @return */ @bean public countdownlatch latch(){ return new countdownlatch(1);//指定了计数的次数 1 } }
六.消息订阅方法
public class receiverredismessage { private static final logger log = loggerfactory.getlogger(receiverredismessage.class); private countdownlatch latch; @autowired public receiverredismessage(countdownlatch latch) { this.latch = latch; } /** * 队列消息接收方法 * * @param jsonmsg */ public void receivemessage(string jsonmsg) { log.info("[开始消费redis消息队列phone数据...]"); try { log.info("监听者收到消息:{}", jsonmsg); jsonobject exjson = jsonobject.parseobject(jsonmsg); user user = json.tojavaobject(exjson, user.class); system.out.println("转化为对象 :"+user); log.info("[消费redis消息队列phone数据成功.]"); } catch (exception e) { log.error("[消费redis消息队列phone数据失败,失败信息:{}]", e.getmessage()); } latch.countdown(); } }
七.定时消息发布测试
@enablescheduling @component public class publishercontroller { private static final logger log = loggerfactory.getlogger(publishercontroller.class); @autowired private redisutils redisutils; @scheduled(fixedrate = 5000) public string pubmsg() { user user=new user(1, "尚***", 26,"男","陕西省xxxx市xxxxxx县"); redisutils.publish("phone", user); log.info("publisher sendes topic... "); return "success"; } }
八.测试结果
九.发布对象user实体
public class user implements serializable { /** * */ private static final long serialversionuid = 1l; private int id; private string name; private int age; private string sex; private string address; ..................... }
上一篇: Java异常总结