欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

解决socket负载均衡集群方案和代码实现

程序员文章站 2022-07-11 11:58:46
...

有一段时间,在考虑下socket 之间集群 可以在Nginx 下可以 但是不同服务器之间怎么通讯呢 后来自己也想可不可以用什么东西或者中间件来通讯 ,后来在百度之下 发现果然就是按照我所想的 ,在网上看了一个方案,架构如下:

解决socket负载均衡集群方案和代码实现

说一下这个方案他们之间的作用

redis : 存取用户和服务器的关系,如果A想发一条数据,是发给B ,这时A发起一个mq订阅模式推送的数据 ,然后B收到数据处理好 ,在推送给用户就ok 

mq : 是了桥接A和B 的之间的通讯

这样A和B之间通讯就没有问题了

以下是搭建过程

首先配置好nginx ,可以百度下载和安装 ,我的路径是如下 打开配置nginx.conf   vim nginx.conf 配置如下

解决socket负载均衡集群方案和代码实现

解决socket负载均衡集群方案和代码实现

 map $http_upgrade $connection_upgrade {

      default upgrade;

      '' close;

    }


    upstream ws_name {

      server 127.0.0.1:12345 weight=1; ## weight 权重越大越大获取链接机会就越大

      server 127.0.0.1:12346 weight=1;

    }

    server {

        listen       9999;

        server_name  localhost;

        location / {

            proxy_pass   http://ws_name/;

            proxy_http_version 1.1;

            proxy_set_header Upgrade $http_upgrade;

            proxy_set_header Connection "Upgrade";

           ### 以下配置是为了解决在nginx 下 socket 短时间的断开 ,即使你配置长链接也没用 必须有如下的配置

            proxy_connect_timeout 4s;

            proxy_read_timeout 600s; #这个配置连接保持多长时间 这配置十分钟 ,然后可以自己心跳来保证长链接

            proxy_send_timeout 12s;



        }

        error_page   500 502 503 504  /50x.html;

        location = /50x.html {

            root   html;

        }

以上是nginx的配置 然后启动就ok 启动 命令 :./nginx 

查看是否启动 命令  ps -ef |grep nginx

 然后 mq 我的mq是 activemq  这个可以百度看看怎么安装 ,因为这个安装比较简单所以就不说太多了 

接下来是代码的实现了 :

package com.yw.socket;

import com.yw.mq.Producer;
import com.yw.redis.JedisUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import javax.jms.JMSException;
import java.util.Date;
import java.util.Iterator;


/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 21:29
 **/
public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private String URLSTR="ws://%s:%s/%s";

    private String ws="";

    private String port="";



    private WebSocketServerHandshaker handshaker;

    public SocketServerHandler(String url,String port,String name){
        ws = String.format(URLSTR, url,port,name);
        this.port=port;

    }

    public SocketServerHandler(String port,String name){
        this("127.0.0.1",port,name);
    }

    public  SocketServerHandler(String port){
        this("127.0.0.1",port,"websocket");
    }

    public  SocketServerHandler(){

    };
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {


        if (msg instanceof FullHttpRequest) {
            System.out.println("handleHttpRequest  FullHttpRequest");
            ///处理第一次连接过来的请求
            handleHttpRequest(channelHandlerContext,(FullHttpRequest)msg);
        }  else if (msg instanceof WebSocketFrame) {

            handlerWebSocketFrame(channelHandlerContext,(WebSocketFrame)msg);
        }

    }



    /**
     * 添加新的连接
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
           System.out.println("有新的连接进来了额  " + ctx.channel().id() + " 连接时间是 " + new Date().toLocaleString());
          //新的连接,保存对应服务对象和和所在服务,这里用 channel.id 当做key只是演示 真实不可,可以根据的自己业务需求去分析
           JedisUtil JedisUtil=new JedisUtil();
           JedisUtil.setKey(ctx.channel().id().toString(),this.port);
           JedisUtil.closeResource();

           MapChannerlGlobal.GROUP.add(ctx.channel());
    }

    /**
     * 删除关闭连接
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        MapChannerlGlobal.GROUP.add(ctx.channel());
        System.out.println("删除该链接" + ctx.channel().id() +  "断开连接时间 " + new Date().toLocaleString());
        //删除对应key
        JedisUtil JedisUtil=new JedisUtil();
        JedisUtil.delKeyString(ctx.channel().id().toString());
        JedisUtil.closeResource();

    }


    /**
     *  处理发过来的信息
     * @param ctx
     * @param frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){

        System.out.println("handlerWebSocketFrame-->> "+frame+frame.getClass().getSimpleName());

        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame);
            System.out.println("关闭连接---CloseWebSocketFrame");
        }
        //不是二进制的数据
        if (!(frame instanceof TextWebSocketFrame)){
            System.out.println("暂时只有二进制数据处理");
            return ;
        }

        String msg = ((TextWebSocketFrame) frame).text();
        if(msg.indexOf(":")>=0){
            String[] split = msg.split(":");
            //获取该用户所在的服务器
            JedisUtil JedisUtil=new JedisUtil();
            String cid = JedisUtil.getkeyStr(split[0]);
            JedisUtil.closeResource();//关闭redis
            //如果端口号不相同的话 说明该服务器不在同一个  所以借用中间件推送
            if(!cid.equals(this.port)){
                try {
                    new Producer().sendTocp(msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else{
                ///如果相同则直接推送
                ChannelGroup group = MapChannerlGlobal.GROUP;
                Iterator<Channel> iterator = group.iterator();
                while(iterator.hasNext()){
                    if(cid.equals(iterator.next().id().toString())){
                        //推送数据
                        TextWebSocketFrame  tws = new TextWebSocketFrame(split[1]);
                        iterator.next().writeAndFlush(tws);
                    }
                }


            }
            return  ;
        }

        //其他消息

        System.out.println("客户端发过来的的数据: "+msg);
        long time = new Date().getTime();
        TextWebSocketFrame tws =new TextWebSocketFrame(" 这是服务器回馈的数据 时间:"+ time +" 通道的 id " +ctx.channel().id() );
        ctx.channel().writeAndFlush(tws);

    }



    /**
     * 发送响应回馈请求者
     * @param ctx
     * @param req
     * @param resp
     */
    private void sendHttpResponse(ChannelHandlerContext ctx ,FullHttpRequest req,DefaultFullHttpResponse resp){

        if (resp.getStatus().code()!=200){

            ByteBuf buffer = Unpooled.copiedBuffer(resp.getStatus().toString(), CharsetUtil.UTF_8);
            resp.content().writeBytes(buffer);
            buffer.release();
        }

        //响应请求
        ChannelFuture f=ctx.channel().writeAndFlush(resp);

        if ( resp .getStatus().code()!=200){
            f.addListener(ChannelFutureListener.CLOSE);///发起关闭
        }
    }


    /**
     * 处理发过来的请求
     * @param ctx
     * @param req
     */
    private  void  handleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){

        if (req.getDecoderResult().isSuccess() && (!"websocket".equals(req.headers().get("Upgrade")))) {
            ///创建一个响应者
            DefaultFullHttpResponse resp=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST,false);

           sendHttpResponse(ctx,req,resp);
        }

        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(ws,null,false);

        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory
                    .sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }

    }


    /**
     * 读取完通道流 进行刷新
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        ctx.flush();
    }









}


package com.yw.socket;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 21:28
 **/
public class MapChannerlGlobal {

    public static ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


}
package com.yw.socket;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.util.Iterator;

/**
 * @program: mqYw
 * @description: 接受消息队列的数据
 * @author: yw
 * @create: 2018-07-06 21:58
 **/
public class ConsumerSocket implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        ChannelGroup group = MapChannerlGlobal.GROUP;
        TextWebSocketFrame tws = null;
        Channel next = null;
        Iterator<Channel> iterator = group.iterator();
        try {
            // 这是一个标志  如果是 key:msg  ,key 是Channel的id  msg是消息 演示用 真实中不能这杨 可以根据自己的需求去定这个key
            if( textMessage.getText().toString().indexOf(":")>=0){
                String[] strings = textMessage.getText().toString().split(":");
                while (iterator.hasNext()) {
                    next = iterator.next();
                    ///这里用Channel的id当键值是不合理 只是用来演示而已
                    if(next.id().toString().equals(strings[0])){
                        try {
                            //推送数据
                            tws = new TextWebSocketFrame(textMessage.getText());
                            next.writeAndFlush(tws);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }



    }


}

package com.yw.socket;

import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;

import io.netty.channel.socket.SocketChannel;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 21:37
 **/
public class ChannelInitializerHandler extends ChannelInitializer<SocketChannel> {

    public ChannelInitializerHandler() {

    }
    //端口号
    private  String  port="";
    public ChannelInitializerHandler(String port) {
        this.port=port;
    }


    @Override
    protected void initChannel(SocketChannel cl) throws Exception {
             cl.pipeline().addLast("http-codec",new HttpServerCodec())
                ///接受数据长度
                .addLast("aggregator",new HttpObjectAggregator(1024*1024))
                // 设置当有新的模块时候要恢复传输
                .addLast("http-chunked",new ChunkedWriteHandler())

                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                //添加自己定义handler
                .addLast("handler",new SocketServerHandler(port,"ts"));
    }
}

package com.yw.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-06 22:28
 **/
public class JedisTools {


    private static JedisPool jedisPool = null;
   ///连接ip
    private  static final String ADDRIP="127.0.0.1";
    //端口号
    private static final  Integer PORT = 6379;
    ///密码
    private  static  final  String PWD="123456";


    static {
        if(jedisPool==null){
            JedisPoolConfig config=new JedisPoolConfig();
            //设置最大连接数 默认是8   , -1表示 无限
            config.setMaxTotal(10);
            //最多有多少个状态为idle(空闲的)的jedis实例  默认是 8
            config.setMaxIdle(10);
            ///等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时
            config.setMaxWaitMillis(300);
            ///在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
            config.setTestOnBorrow(true);
            jedisPool=new JedisPool(config,ADDRIP,PORT,3000);

        }

    }


    public synchronized static Jedis getJedis(){

        if(jedisPool!=null){
            Jedis j= jedisPool.getResource();
            return j;
        }

        return null;

    }




}

package com.yw.redis;

import redis.clients.jedis.Jedis;

import java.io.*;

/**
 * @program: mqYw
 * @description:
 * @author: yw
 * @create: 2018-07-07 12:46
 **/
public class JedisUtil {


    private Jedis jedis;

    public JedisUtil(){
        this.jedis = JedisTools.getJedis();
    }


    public void setKey(String key,Object obj){

        jedis.set(key.getBytes(),objToByte(obj));

    }


    public Object getKeyObj(String key){
        return byteToObj(jedis.get(key.getBytes()));
    }



    public void  setKey(String key,String value){
        jedis.set(key,value);
    }


    public String getkeyStr(String key){
        return jedis.get(key);
    }


    public void closeResource(){

        if(this.jedis!=null){
        ///    this.jedis.flushAll();
            this.jedis.close();
        }

    }


    public void delKeyString(String key){
        this.jedis.del(key);
    }


    public void delKeyBytes(String key){
        this.jedis.del(key.getBytes());

    }
    /**
     * 序列化转为obj
     * @param
     * @return
     */
    public Object byteToObj(byte[] bt){
        Object obj=null;
        if(bt==null) return null;
        ByteArrayInputStream bis=null;
        ObjectInputStream ois=null;
        try {
            bis=new ByteArrayInputStream(bt);
            ois=new ObjectInputStream(bis);

            obj=ois.readObject();

        } catch (Exception e) {
         e.printStackTrace();
        }finally{
            try {
                if(bis!=null){
                    bis.close();
                }
                if(ois!=null){
                    ois.close();
                }
            } catch (IOException e1) {
                System.out.println(e1);
            }
        }
        return obj;
    }


    /**
     * obj 序列化
     * @param obj
     * @return
     */
    public byte[] objToByte(Object obj){
        if(obj==null) return null;
        byte [] bt=null;
        ByteArrayOutputStream bos=null;
        ObjectOutputStream oos=null;
        try {
            bos=new ByteArrayOutputStream();
            oos=new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush();
            bt=bos.toByteArray();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                if(bos!=null){
                    bos.close();
                }
                if(oos!=null){
                    oos.close();
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return bt;
    }


    public static void main(String[] args) {
        JedisUtil JedisUtil=new JedisUtil();

        JedisUtil.setKey("test","123");
        System.out.println(JedisUtil.getkeyStr("test"));
        JedisUtil.closeResource();

    }
}

public class Producer {
    private static final   String TOPIC ="test-topic";

    //订阅模式
    public void sendTocp(String msg) throws JMSException {
//TOPIC
        Connection connection = ConnectionUtil.getConnection(URL);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(TOPIC);
        MessageProducer producer = session.createProducer(topic);

        TextMessage textMessage=null;

        textMessage = session.createTextMessage(msg);
        producer.send(textMessage);

        producer.close();
        session.close();
        connection.close();
    }
}
package test.mq;

import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import com.yw.socket.SocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import javax.jms.*;

/**
 * 启动 服务A 
 *
 */
public class App

{

    private static final   String TOPIC ="test-topic";
    private static final   String URL ="tcp://127.0.0.1:61616";
    private static final   String QUEUENAME ="test-queue";

    public void msgTocp() throws JMSException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection(URL);
        ///创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //订阅模式
        Topic topic = session.createTopic(TOPIC);
        /// 创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //加入监听者
        consumer.setMessageListener(new ConsumerSocket());

    }

    public static void main( String[] args ) throws InterruptedException, JMSException {
        //启动消费者
        new App().msgTocp();

        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();

        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup,workGroup);
        sb.channel(NioServerSocketChannel.class);
        sb.option(ChannelOption.SO_BACKLOG,128)
                // 有数据立即发送
                .option(ChannelOption.TCP_NODELAY,true)
                // 长连接 建议不用 用心跳来保持长链接 ,免得浪费资源  经常测试 在nginx负载均衡下是这个长链接不起作用的
                .childOption(ChannelOption.SO_KEEPALIVE,true);
        sb.childHandler(new ChannelInitializerHandler("12345"));
        System.out.println("服务端开启等待客户端连接 ... ...");
        sb.bind(12345).sync().channel().closeFuture().sync();
    }
}

package test.mq;

import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import javax.jms.*;

/**
 * 启动服务 B
 *
 */
public class App02

{


    public static void main( String[] args ) throws InterruptedException, JMSException {
        //启动消费者
        new App().msgTocp();

        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();

        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup,workGroup);
        sb.channel(NioServerSocketChannel.class);
        sb.option(ChannelOption.SO_BACKLOG,128)
                // 有数据立即发送
                .option(ChannelOption.TCP_NODELAY,true)
                // 长连接 建议不用 用心跳来保持长链接 ,免得浪费资源  经常测试 在nginx负载均衡下是这个长链接不起作用的
                .childOption(ChannelOption.SO_KEEPALIVE,true);
        sb.childHandler(new ChannelInitializerHandler("12346"));
        System.out.println("服务端开启等待客户端连接 ... ...");
        sb.bind(12346).sync().channel().closeFuture().sync();
    }
}

以上是全部代码了 。接下是效果图

解决socket负载均衡集群方案和代码实现



这样就大功告成了 ,

但是这样架构是很危险的 提单一了 redis 和mq 如果我之间有写过mq 的集群加高可用了 请看这里,redis 有空会研究一下集群高可用才行 ,以上如果有雷同的地方,都是学习中 ,架构是来自百度的 不过自己之前也想过类似 最终是别人的博客证实了我的想法了