欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring boot+redis实现消息发布与订阅

程序员文章站 2022-04-15 23:27:46
一.创建spring boot项目 org.springframework.boot spring-boot-starter-data-redis

一.创建spring boot项目

<dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-data-redis</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>com.alibaba</groupid>
            <artifactid>fastjson</artifactid>
            <version>1.2.41</version>
        </dependency>

二.编辑yml配置文件

server:
  port: 7888
# 日志配置
logging:
  config: classpath:log/logback.xml
  level:
    cn.com.dhcc: info
    org.springframework: info
    org.springframework.web: info
    com.alibaba.nacos.client.naming: error
spring:
  redis:
     host: localhost
     port: 6379
     password: *********
     database: 1
     jedis:
      pool:
        max-idle: 8
        max-active: 8
        max-wait: -1
        min-idle: 0
     timeout: 5000

三.配置redis

@configuration
public class redisconfiguration {

    /**
     * 实例化 redistemplate 对象
     *
     * @return
     */
    @bean("redistemplates")
    public redistemplate<string, object> functiondomainredistemplate(redisconnectionfactory redisconnectionfactory) {
        redistemplate<string, object> redistemplate = new redistemplate<>();
        initdomainredistemplate(redistemplate, redisconnectionfactory);
        return redistemplate;
    }

    /**
     * 设置数据存入 redis 的序列化方式,并开启事务
     * 
     * @param redistemplate
     * @param factory
     */
    private void initdomainredistemplate(@qualifier("redistemplates") redistemplate<string, object> redistemplate, redisconnectionfactory factory) {
        // 如果不配置serializer,那么存储的时候缺省使用string,如果用user类型存储,那么会提示错误user can't cast to
        // string!
        redistemplate.setkeyserializer(new stringredisserializer());
        redistemplate.sethashkeyserializer(new stringredisserializer());

        fastjsonredisserializer<object> fastjsonredisserializer = new fastjsonredisserializer<object>(object.class);
        redistemplate.sethashvalueserializer(fastjsonredisserializer);
        redistemplate.setvalueserializer(fastjsonredisserializer);
        //redistemplate.sethashvalueserializer(new genericjackson2jsonredisserializer());
        //redistemplate.setvalueserializer(new genericjackson2jsonredisserializer());
        // 开启事务
        redistemplate.setenabletransactionsupport(true);
        redistemplate.setconnectionfactory(factory);
    }

    /**
     * 注入封装redistemplate @title: redisutil @return redisutil @date
     * 
     */
    @bean(name = "redisutils")
    public redisutils redisutil(@qualifier("redistemplates") redistemplate<string, object> redistemplate) {
        redisutils redisutil = new redisutils();
        redisutil.setredistemplate(redistemplate);
        return redisutil;
    }

四.编写redisutil消息发布方法

public class redisutils {
    private static final logger log = loggerfactory.getlogger(redisutils.class);

    private redistemplate<string, object> redistemplate;

    public void setredistemplate(redistemplate<string, object> redistemplate) {
        this.redistemplate = redistemplate;
    }

    public void publish(string channal ,object obj) {
        redistemplate.convertandsend(channal,obj );
    }
}

五.配置消息监听

@configuration
public class redismessagelistener {

    /**
     * 创建连接工厂
     * @param connectionfactory
     * @param listeneradapter
     * @return
     */
    @bean
    public redismessagelistenercontainer container(redisconnectionfactory connectionfactory,
                                                   messagelisteneradapter listeneradapter,messagelisteneradapter listeneradapter2){
        redismessagelistenercontainer container = new redismessagelistenercontainer();
        container.setconnectionfactory(connectionfactory);
        //接受消息的key
        container.addmessagelistener(listeneradapter,new patterntopic("phone"));
        return container;
    }

    /**
     * 绑定消息监听者和接收监听的方法
     * @param receiver
     * @return
     */
    @bean
    public messagelisteneradapter listeneradapter(receiverredismessage  receiver){
        return new messagelisteneradapter(receiver,"receivemessage");
    }

    /**
     * 注册订阅者
     * @param latch
     * @return
     */
    @bean
    receiverredismessage receiver(countdownlatch latch) {
        return new receiverredismessage(latch);
    }

    /**
     * 计数器,用来控制线程
     * @return
     */
    @bean
    public countdownlatch latch(){
        return new countdownlatch(1);//指定了计数的次数 1
    }
}

六.消息订阅方法

public class receiverredismessage {

    private static final logger log = loggerfactory.getlogger(receiverredismessage.class);
    private countdownlatch latch;

    @autowired
    public receiverredismessage(countdownlatch latch) {
        this.latch = latch;
    }

    /**
     * 队列消息接收方法
     *
     * @param jsonmsg
     */
    public void receivemessage(string jsonmsg) {
        log.info("[开始消费redis消息队列phone数据...]");
        try {
            log.info("监听者收到消息:{}", jsonmsg);
            jsonobject exjson = jsonobject.parseobject(jsonmsg);
            user user = json.tojavaobject(exjson, user.class);
            system.out.println("转化为对象 :"+user);
            log.info("[消费redis消息队列phone数据成功.]");
        } catch (exception e) {
            log.error("[消费redis消息队列phone数据失败,失败信息:{}]", e.getmessage());
        }
        latch.countdown();
    }
}

七.定时消息发布测试

@enablescheduling
@component
public class publishercontroller {

    private static final logger log = loggerfactory.getlogger(publishercontroller.class);

    @autowired
    private redisutils redisutils;

    @scheduled(fixedrate = 5000)
    public string pubmsg() {
        user user=new user(1, "尚***", 26,"男","陕西省xxxx市xxxxxx县");
        redisutils.publish("phone", user);
        log.info("publisher sendes topic... ");
        return "success";
    }
}

八.测试结果
Spring boot+redis实现消息发布与订阅

九.发布对象user实体

public class user implements serializable {

    /**
     * 
     */
    private static final long serialversionuid = 1l;
    private int id;
    private string name;
    private int age;
    private string sex;
    private string address;
     .....................
}