解决socket负载均衡集群方案和代码实现
有一段时间,在考虑下socket 之间集群 可以在Nginx 下可以 但是不同服务器之间怎么通讯呢 后来自己也想可不可以用什么东西或者中间件来通讯 ,后来在百度之下 发现果然就是按照我所想的 ,在网上看了一个方案,架构如下:
说一下这个方案他们之间的作用
redis : 存取用户和服务器的关系,如果A想发一条数据,是发给B ,这时A发起一个mq订阅模式推送的数据 ,然后B收到数据处理好 ,在推送给用户就ok
mq : 是了桥接A和B 的之间的通讯
这样A和B之间通讯就没有问题了
以下是搭建过程
首先配置好nginx ,可以百度下载和安装 ,我的路径是如下 打开配置nginx.conf vim nginx.conf 配置如下
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
upstream ws_name {
server 127.0.0.1:12345 weight=1; ## weight 权重越大越大获取链接机会就越大
server 127.0.0.1:12346 weight=1;
}
server {
listen 9999;
server_name localhost;
location / {
proxy_pass http://ws_name/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
### 以下配置是为了解决在nginx 下 socket 短时间的断开 ,即使你配置长链接也没用 必须有如下的配置
proxy_connect_timeout 4s;
proxy_read_timeout 600s; #这个配置连接保持多长时间 这配置十分钟 ,然后可以自己心跳来保证长链接
proxy_send_timeout 12s;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
以上是nginx的配置 然后启动就ok 启动 命令 :./nginx
查看是否启动 命令 ps -ef |grep nginx
然后 mq 我的mq是 activemq 这个可以百度看看怎么安装 ,因为这个安装比较简单所以就不说太多了
接下来是代码的实现了 :
package com.yw.socket;
import com.yw.mq.Producer;
import com.yw.redis.JedisUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import javax.jms.JMSException;
import java.util.Date;
import java.util.Iterator;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 21:29
**/
public class SocketServerHandler extends SimpleChannelInboundHandler<Object> {
private String URLSTR="ws://%s:%s/%s";
private String ws="";
private String port="";
private WebSocketServerHandshaker handshaker;
public SocketServerHandler(String url,String port,String name){
ws = String.format(URLSTR, url,port,name);
this.port=port;
}
public SocketServerHandler(String port,String name){
this("127.0.0.1",port,name);
}
public SocketServerHandler(String port){
this("127.0.0.1",port,"websocket");
}
public SocketServerHandler(){
};
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
System.out.println("handleHttpRequest FullHttpRequest");
///处理第一次连接过来的请求
handleHttpRequest(channelHandlerContext,(FullHttpRequest)msg);
} else if (msg instanceof WebSocketFrame) {
handlerWebSocketFrame(channelHandlerContext,(WebSocketFrame)msg);
}
}
/**
* 添加新的连接
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("有新的连接进来了额 " + ctx.channel().id() + " 连接时间是 " + new Date().toLocaleString());
//新的连接,保存对应服务对象和和所在服务,这里用 channel.id 当做key只是演示 真实不可,可以根据的自己业务需求去分析
JedisUtil JedisUtil=new JedisUtil();
JedisUtil.setKey(ctx.channel().id().toString(),this.port);
JedisUtil.closeResource();
MapChannerlGlobal.GROUP.add(ctx.channel());
}
/**
* 删除关闭连接
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
MapChannerlGlobal.GROUP.add(ctx.channel());
System.out.println("删除该链接" + ctx.channel().id() + "断开连接时间 " + new Date().toLocaleString());
//删除对应key
JedisUtil JedisUtil=new JedisUtil();
JedisUtil.delKeyString(ctx.channel().id().toString());
JedisUtil.closeResource();
}
/**
* 处理发过来的信息
* @param ctx
* @param frame
*/
private void handlerWebSocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){
System.out.println("handlerWebSocketFrame-->> "+frame+frame.getClass().getSimpleName());
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame);
System.out.println("关闭连接---CloseWebSocketFrame");
}
//不是二进制的数据
if (!(frame instanceof TextWebSocketFrame)){
System.out.println("暂时只有二进制数据处理");
return ;
}
String msg = ((TextWebSocketFrame) frame).text();
if(msg.indexOf(":")>=0){
String[] split = msg.split(":");
//获取该用户所在的服务器
JedisUtil JedisUtil=new JedisUtil();
String cid = JedisUtil.getkeyStr(split[0]);
JedisUtil.closeResource();//关闭redis
//如果端口号不相同的话 说明该服务器不在同一个 所以借用中间件推送
if(!cid.equals(this.port)){
try {
new Producer().sendTocp(msg);
} catch (JMSException e) {
e.printStackTrace();
}
}else{
///如果相同则直接推送
ChannelGroup group = MapChannerlGlobal.GROUP;
Iterator<Channel> iterator = group.iterator();
while(iterator.hasNext()){
if(cid.equals(iterator.next().id().toString())){
//推送数据
TextWebSocketFrame tws = new TextWebSocketFrame(split[1]);
iterator.next().writeAndFlush(tws);
}
}
}
return ;
}
//其他消息
System.out.println("客户端发过来的的数据: "+msg);
long time = new Date().getTime();
TextWebSocketFrame tws =new TextWebSocketFrame(" 这是服务器回馈的数据 时间:"+ time +" 通道的 id " +ctx.channel().id() );
ctx.channel().writeAndFlush(tws);
}
/**
* 发送响应回馈请求者
* @param ctx
* @param req
* @param resp
*/
private void sendHttpResponse(ChannelHandlerContext ctx ,FullHttpRequest req,DefaultFullHttpResponse resp){
if (resp.getStatus().code()!=200){
ByteBuf buffer = Unpooled.copiedBuffer(resp.getStatus().toString(), CharsetUtil.UTF_8);
resp.content().writeBytes(buffer);
buffer.release();
}
//响应请求
ChannelFuture f=ctx.channel().writeAndFlush(resp);
if ( resp .getStatus().code()!=200){
f.addListener(ChannelFutureListener.CLOSE);///发起关闭
}
}
/**
* 处理发过来的请求
* @param ctx
* @param req
*/
private void handleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){
if (req.getDecoderResult().isSuccess() && (!"websocket".equals(req.headers().get("Upgrade")))) {
///创建一个响应者
DefaultFullHttpResponse resp=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST,false);
sendHttpResponse(ctx,req,resp);
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(ws,null,false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory
.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}
/**
* 读取完通道流 进行刷新
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
ctx.flush();
}
}
package com.yw.socket;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 21:28
**/
public class MapChannerlGlobal {
public static ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
package com.yw.socket;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.util.Iterator;
/**
* @program: mqYw
* @description: 接受消息队列的数据
* @author: yw
* @create: 2018-07-06 21:58
**/
public class ConsumerSocket implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
ChannelGroup group = MapChannerlGlobal.GROUP;
TextWebSocketFrame tws = null;
Channel next = null;
Iterator<Channel> iterator = group.iterator();
try {
// 这是一个标志 如果是 key:msg ,key 是Channel的id msg是消息 演示用 真实中不能这杨 可以根据自己的需求去定这个key
if( textMessage.getText().toString().indexOf(":")>=0){
String[] strings = textMessage.getText().toString().split(":");
while (iterator.hasNext()) {
next = iterator.next();
///这里用Channel的id当键值是不合理 只是用来演示而已
if(next.id().toString().equals(strings[0])){
try {
//推送数据
tws = new TextWebSocketFrame(textMessage.getText());
next.writeAndFlush(tws);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package com.yw.socket;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import io.netty.channel.socket.SocketChannel;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 21:37
**/
public class ChannelInitializerHandler extends ChannelInitializer<SocketChannel> {
public ChannelInitializerHandler() {
}
//端口号
private String port="";
public ChannelInitializerHandler(String port) {
this.port=port;
}
@Override
protected void initChannel(SocketChannel cl) throws Exception {
cl.pipeline().addLast("http-codec",new HttpServerCodec())
///接受数据长度
.addLast("aggregator",new HttpObjectAggregator(1024*1024))
// 设置当有新的模块时候要恢复传输
.addLast("http-chunked",new ChunkedWriteHandler())
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
//添加自己定义handler
.addLast("handler",new SocketServerHandler(port,"ts"));
}
}
package com.yw.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-06 22:28
**/
public class JedisTools {
private static JedisPool jedisPool = null;
///连接ip
private static final String ADDRIP="127.0.0.1";
//端口号
private static final Integer PORT = 6379;
///密码
private static final String PWD="123456";
static {
if(jedisPool==null){
JedisPoolConfig config=new JedisPoolConfig();
//设置最大连接数 默认是8 , -1表示 无限
config.setMaxTotal(10);
//最多有多少个状态为idle(空闲的)的jedis实例 默认是 8
config.setMaxIdle(10);
///等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时
config.setMaxWaitMillis(300);
///在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true);
jedisPool=new JedisPool(config,ADDRIP,PORT,3000);
}
}
public synchronized static Jedis getJedis(){
if(jedisPool!=null){
Jedis j= jedisPool.getResource();
return j;
}
return null;
}
}
package com.yw.redis;
import redis.clients.jedis.Jedis;
import java.io.*;
/**
* @program: mqYw
* @description:
* @author: yw
* @create: 2018-07-07 12:46
**/
public class JedisUtil {
private Jedis jedis;
public JedisUtil(){
this.jedis = JedisTools.getJedis();
}
public void setKey(String key,Object obj){
jedis.set(key.getBytes(),objToByte(obj));
}
public Object getKeyObj(String key){
return byteToObj(jedis.get(key.getBytes()));
}
public void setKey(String key,String value){
jedis.set(key,value);
}
public String getkeyStr(String key){
return jedis.get(key);
}
public void closeResource(){
if(this.jedis!=null){
/// this.jedis.flushAll();
this.jedis.close();
}
}
public void delKeyString(String key){
this.jedis.del(key);
}
public void delKeyBytes(String key){
this.jedis.del(key.getBytes());
}
/**
* 序列化转为obj
* @param
* @return
*/
public Object byteToObj(byte[] bt){
Object obj=null;
if(bt==null) return null;
ByteArrayInputStream bis=null;
ObjectInputStream ois=null;
try {
bis=new ByteArrayInputStream(bt);
ois=new ObjectInputStream(bis);
obj=ois.readObject();
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
if(bis!=null){
bis.close();
}
if(ois!=null){
ois.close();
}
} catch (IOException e1) {
System.out.println(e1);
}
}
return obj;
}
/**
* obj 序列化
* @param obj
* @return
*/
public byte[] objToByte(Object obj){
if(obj==null) return null;
byte [] bt=null;
ByteArrayOutputStream bos=null;
ObjectOutputStream oos=null;
try {
bos=new ByteArrayOutputStream();
oos=new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.flush();
bt=bos.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
if(bos!=null){
bos.close();
}
if(oos!=null){
oos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return bt;
}
public static void main(String[] args) {
JedisUtil JedisUtil=new JedisUtil();
JedisUtil.setKey("test","123");
System.out.println(JedisUtil.getkeyStr("test"));
JedisUtil.closeResource();
}
}
public class Producer {
private static final String TOPIC ="test-topic";
//订阅模式
public void sendTocp(String msg) throws JMSException {
//TOPIC
Connection connection = ConnectionUtil.getConnection(URL);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC);
MessageProducer producer = session.createProducer(topic);
TextMessage textMessage=null;
textMessage = session.createTextMessage(msg);
producer.send(textMessage);
producer.close();
session.close();
connection.close();
}
}
package test.mq;
import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import com.yw.socket.SocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import javax.jms.*;
/**
* 启动 服务A
*
*/
public class App
{
private static final String TOPIC ="test-topic";
private static final String URL ="tcp://127.0.0.1:61616";
private static final String QUEUENAME ="test-queue";
public void msgTocp() throws JMSException {
//获取连接
Connection connection = ConnectionUtil.getConnection(URL);
///创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//订阅模式
Topic topic = session.createTopic(TOPIC);
/// 创建消费者
MessageConsumer consumer = session.createConsumer(topic);
//加入监听者
consumer.setMessageListener(new ConsumerSocket());
}
public static void main( String[] args ) throws InterruptedException, JMSException {
//启动消费者
new App().msgTocp();
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup,workGroup);
sb.channel(NioServerSocketChannel.class);
sb.option(ChannelOption.SO_BACKLOG,128)
// 有数据立即发送
.option(ChannelOption.TCP_NODELAY,true)
// 长连接 建议不用 用心跳来保持长链接 ,免得浪费资源 经常测试 在nginx负载均衡下是这个长链接不起作用的
.childOption(ChannelOption.SO_KEEPALIVE,true);
sb.childHandler(new ChannelInitializerHandler("12345"));
System.out.println("服务端开启等待客户端连接 ... ...");
sb.bind(12345).sync().channel().closeFuture().sync();
}
}
package test.mq;
import com.yw.mq.ConnectionUtil;
import com.yw.socket.ChannelInitializerHandler;
import com.yw.socket.ConsumerSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import javax.jms.*;
/**
* 启动服务 B
*
*/
public class App02
{
public static void main( String[] args ) throws InterruptedException, JMSException {
//启动消费者
new App().msgTocp();
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup,workGroup);
sb.channel(NioServerSocketChannel.class);
sb.option(ChannelOption.SO_BACKLOG,128)
// 有数据立即发送
.option(ChannelOption.TCP_NODELAY,true)
// 长连接 建议不用 用心跳来保持长链接 ,免得浪费资源 经常测试 在nginx负载均衡下是这个长链接不起作用的
.childOption(ChannelOption.SO_KEEPALIVE,true);
sb.childHandler(new ChannelInitializerHandler("12346"));
System.out.println("服务端开启等待客户端连接 ... ...");
sb.bind(12346).sync().channel().closeFuture().sync();
}
}
以上是全部代码了 。接下是效果图
这样就大功告成了 ,
但是这样架构是很危险的 提单一了 redis 和mq 如果我之间有写过mq 的集群加高可用了 请看这里,redis 有空会研究一下集群高可用才行 ,以上如果有雷同的地方,都是学习中 ,架构是来自百度的 不过自己之前也想过类似 最终是别人的博客证实了我的想法了
上一篇: Nginx搭建负载均衡解决方案
下一篇: Zabbix添加自定义Item(监控项)