欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

websocket+rabbitmq实战

程序员文章站 2022-10-06 16:35:07
1. websocket+rabbitmq实战 1.1. 前言   接到的需求是后台定向给指定web登录用户推送消息,且可能同一账号会登录多个客户端都要接收到消息 1.2. 遇坑 1. 基于springboot环境搭建的websocket+rabbitmq,搭建完成后发现webs ......

1. websocket+rabbitmq实战

1.1. 前言

  接到的需求是后台定向给指定web登录用户推送消息,且可能同一账号会登录多个客户端都要接收到消息

1.2. 遇坑

  1. 基于springboot环境搭建的websocket+rabbitmq,搭建完成后发现websocket每隔一段时间会断开,看网上有人因为nginx的连接超时机制断开,而我这似乎是因为长连接空闲时间太长而断开
  2. 经过测试,如果一直保持每隔段时间发送消息,那么连接不会断开,所以我采用了断开重连机制,分三种情况
    1. 服务器正常,客户端正常且空闲时间不超过1分钟,则情况正常,超过一分钟会断线,前端发起请求重连
    2. 服务器正常,客户端关闭或注销,服务器正常收到通知,去除对应客户端session
    3. 服务器异常,客户端正常,客户端发现连不上服务器会尝试重连3次,3次都连不上放弃重连
  3. rabbitmq定向推送,按需求需要一台机器对应一批用户,所以定制化需要服务启动的时候定向订阅该ip对应的队列名,简单说就是动态队列名的设定,所以又复杂了点,不能直接在注解写死。同时因为使用的apollo配置中心,同一集群应该相同的配置,所以也不能通过提取配置的方式设定值,为了这个点设置apollo的集群方式有点小题大做,所以采用动态读取数据库对应的ip取出对应的队列名。
  4. 部署线上tomcat的话,不需要加上一块代码
/**
 * 使用tomcat启动无需配置
 */
//@configuration
//@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class websocketconfig {
    @bean
    public serverendpointexporter serverendpointexporter() {
        return new serverendpointexporter();
    }
}

1.3. 正式代码

1.3.1. rabbimq部分

  1. application.properties配置
spring.rabbitmq.addresses = i.tzxylao.com:5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = 123456
spring.rabbitmq.virtual-host = /
spring.rabbitmq.connection-timeout = 15000
  1. 交换机和队列配置
/**
 * @author laoliangliang
 * @date 2019/3/29 11:41
 */
@configuration
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class rabbitmqconfig {

    final public static string exchangename = "websocketexchange";

    /**
     * 创建交换器
     */
    @bean
    fanoutexchange exchange() {
        return new fanoutexchange(exchangename);
    }

    @bean
    public queue queue(){
        return new queue(orderqueuename());
    }

    @bean
    binding bindingexchangemessage(queue queue,fanoutexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange);
    }

    @bean
    public simplemessagelistenercontainer messagelistenercontainer(orderreceiver orderreceiver, @qualifier("rabbitconnectionfactory") cachingconnectionfactory cachingconnectionfactory){
        simplemessagelistenercontainer container = new simplemessagelistenercontainer(cachingconnectionfactory);
        // 监听队列的名称
        container.setqueuenames(orderqueuename());
        container.setexposelistenerchannel(true);
        // 设置每个消费者获取的最大消息数量
        container.setprefetchcount(100);
        // 消费者的个数
        container.setconcurrentconsumers(1);
        // 设置确认模式为自动确认
        container.setacknowledgemode(acknowledgemode.auto);
        container.setmessagelistener(orderreceiver);
        return container;
    }


    /**
     * 在这里写获取订单队列名的具体过程
     * @return
     */
    public string orderqueuename(){
        return "orderchannel";
    }
}
  1. 消息监听类
/**
 * @author laoliangliang
 * @date 2019/3/29 11:38
 */
@component
@slf4j
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class orderreceiver implements channelawaremessagelistener {

    @autowired
    private mywebsocket mywebsocket;

    @override
    public void onmessage(message message, channel channel) throws exception {
        byte[] body = message.getbody();
        log.info("接收到消息:" + new string(body));
        try {
            mywebsocket.sendmessage(new string(body));
        } catch (ioexception e) {
            log.error("send rabbitmq message error", e);
        }
    }
}

1.3.2. websocket部分

  1. 配置服务端点
@configuration
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class websocketconfig {
    @bean
    public serverendpointexporter serverendpointexporter() {
        return new serverendpointexporter();
    }
}
  1. 核心代码
/**
 * @author laoliangliang
 * @date 2019/3/28 14:40
 */
public abstract class abstractwebsocket {

    protected static map<string, copyonwritearrayset<session>> sessionstore = new hashmap<>();

    public void sendmessage(string message) throws ioexception {
        list<string> usercodes = beforesendmessage();
        for (string usercode : usercodes) {
            copyonwritearrayset<session> sessions = sessionstore.get(usercode);
            //阻塞式的(同步的)
            if (sessions !=null && sessions.size() != 0) {
                for (session s : sessions) {
                    if (s != null) {
                        s.getbasicremote().sendtext(message);
                    }
                }
            }
        }
    }

    /**
     * 删选给谁发消息
     * @return
     */
    protected abstract list<string> beforesendmessage();

    protected void clearsession(session session) {
        collection<copyonwritearrayset<session>> values = sessionstore.values();
        for (copyonwritearrayset<session> sessions : values) {
            for (session session1 : sessions) {
                if (session.equals(session1)) {
                    sessions.remove(session);
                }
            }
        }
    }
}
@serverendpoint(value = "/websocket")
@component
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class mywebsocket extends abstractwebsocket {


    private static logger log = logmanager.getlogger(mywebsocket.class);

    @autowired
    private amqptemplate amqptemplate;

    @postconstruct
    public void init() {
        /*scheduledexecutorservice executorservice = executors.newscheduledthreadpool(1);
        executorservice.scheduleatfixedrate(new runnable() {

            int i = 0;

            @override
            public void run() {
                amqptemplate.convertandsend(rabbitfanout.exchangename, "",("msg num : " + i).getbytes());
                i++;
            }
        }, 50, 1, timeunit.seconds);*/
    }

    /**
     * 连接建立成功调用的方法
     *
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @onopen
    public void onopen(session session) throws timeoutexception {
        log.info("websocket connect");
        //10m
        session.setmaxtextmessagebuffersize(10485760);
    }

    /**
     * 连接关闭调用的方法
     */
    @onclose
    public void onclose(session session) {
        clearsession(session);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @onmessage
    public void onmessage(string message, session session) {
        log.info("from client request:" + message);
        copyonwritearrayset<session> sessions = sessionstore.get(message);
        if (sessions == null) {
            sessions = new copyonwritearrayset<>();
        }
        sessions.add(session);
        sessionstore.put(message, sessions);
    }

    /**
     * 发生错误时调用
     *
     * @param session
     * @param error
     */
    @onerror
    public void onerror(session session, throwable error) {
        clearsession(session);
    }

    /**
     * 这里返回需要给哪些用户发送消息
     * @return
     */
    @override
    protected list<string> beforesendmessage() {
        //todo 给哪些用户发送消息
        return lists.newarraylist("6");
    }
}

1.3.3. 前端代码

var websocket = null;
var reconnectcount = 0;
function connectsocket(){
    var data = basicconfig();
    if(data.websocketenable !== "true"){
        return;
    }
    //判断当前浏览器是否支持websocket
    if ('websocket' in window) {
        if(data.localip && data.localip !== "" && data.serverport && data.serverport !== ""){
            websocket = new websocket("ws://"+data.localip+":"+data.serverport+data.servercontextpath+"/websocket");
        }else{
            return;
        }
    }else {
        alert('当前浏览器 不支持websocket')
    }

    //连接发生错误的回调方法
    websocket.onerror = function () {
        console.log("连接发生错误");
    };

    //连接成功建立的回调方法
    websocket.onopen = function () {
        reconnectcount = 0;
        console.log("连接成功");
    };

    //接收到消息的回调方法,此处添加处理接收消息方法,当前是将接收到的信息显示在网页上
    websocket.onmessage = function (event) {
        console.log("receive message:" + event.data);
    };

    //连接关闭的回调方法
    websocket.onclose = function () {
        console.log("连接关闭,如需登录请刷新页面。");
        if(reconnectcount === 3) {
            reconnectcount = 0;
            return;
        }
        connectsocket();
        basicconfig();
        reconnectcount++;
    };

    //添加事件监听
    websocket.addeventlistener('open', function () {
        websocket.send(data.usercode);
    });


    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function () {
        console.log("closewebsocket");
    };


}

connectsocket();

function basicconfig(){
        var result = {};
        $.ajax({
            type: "post",
            async: false,
            url: "${request.contextpath}/basicconfig",
            data: {},
            success: function (data) {
                result = data;
            }
        });
        return result;
    }

1.3.4. 后端提供接口

    @apolloconfig
    private config config;

    @requestmapping(value = {"/basicconfig"})
    @responsebody
    public map<string, object> getusercode(httpsession session) {
        map<string, object> map = new hashmap<>(2);
        map.put("usercode",string.valueof(session.getattribute("usercode")));
        string websocketenable = config.getproperty("websocket.enabled", "false");
        string servercontextpath  = config.getproperty("server.context-path", "");
        map.put("websocketenable", websocketenable);
        map.put("servercontextpath", servercontextpath);

        string localip = config.getproperty("local.ip", "");
        string serverport = config.getproperty("server.port", "80");

        map.put("localip", localip);
        map.put("serverport", serverport);
        return map;
    }