netty 实现长连接
程序员文章站
2022-06-21 21:09:45
...
1. Server 端代码
package com.boce.netty.longlink.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.boce.netty.longlink.common.AskMsg;
import com.boce.netty.longlink.common.BaseMsg;
import com.boce.netty.longlink.common.LoginMsg;
import com.boce.netty.longlink.common.MsgType;
import com.boce.netty.longlink.common.PingMsg;
import com.boce.netty.longlink.common.ReplyClientBody;
import com.boce.netty.longlink.common.ReplyMsg;
import com.boce.netty.longlink.common.ReplyServerBody;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ReferenceCountUtil;
/**
 * Server-side inbound handler for the long-connection demo.
 *
 * <p>For every decoded {@link BaseMsg} it first handles authentication (registering the channel in
 * {@link NettyChannelMap} on a successful login, or asking an unregistered client to log in) and
 * then dispatches on the message type (PING / ASK / REPLY).
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
    private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);

    /**
     * Removes the closed channel from the registry.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("client logout -->{}", ctx.channel().remoteAddress());
        NettyChannelMap.remove((SocketChannel) ctx.channel());
    }

    /**
     * Processes one inbound message.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, BaseMsg msg) throws Exception {
        log.info("channel read->{},clientid=>{}", msg.getType(), msg.getClientId());

        if (MsgType.LOGIN.equals(msg.getType())) {
            LoginMsg loginMsg = (LoginMsg) msg;
            if ("robin".equals(loginMsg.getUserName()) && "yao".equals(loginMsg.getPassword())) {
                // Login succeeded: remember the channel under the client's id.
                NettyChannelMap.add(loginMsg.getClientId(), (SocketChannel) ctx.channel());
                System.out.println("client" + loginMsg.getClientId() + " 登录成功");
            }
        } else if (NettyChannelMap.get(msg.getClientId()) == null) {
            // Not logged in (or the connection was dropped): ask the client to log in again.
            ctx.channel().writeAndFlush(new LoginMsg());
        }

        if (msg.getType() == null) {
            // Switching on a null enum value would throw NullPointerException.
            log.warn("message without type from clientid={}, ignored", msg.getClientId());
            return;
        }

        switch (msg.getType()) {
            case PING: {
                String clientId = msg.getClientId();
                log.info("ping clientid={}", clientId);
                log.info("clientids={}", NettyChannelMap.clientIds());
                // Exact lookup by id. A substring search in the joined id list would treat
                // id "1" as registered as soon as "11" is.
                if (NettyChannelMap.get(clientId) == null) {
                    NettyChannelMap.add(clientId, (SocketChannel) ctx.channel());
                }
                reply(clientId, new PingMsg());
                break;
            }
            case ASK: {
                // Request coming from the client.
                AskMsg askMsg = (AskMsg) msg;
                if (askMsg.getParams() != null && "authToken".equals(askMsg.getParams().getAuth())) {
                    ReplyMsg replyMsg = new ReplyMsg();
                    replyMsg.setBody(new ReplyServerBody("server info $$$$ !!!"));
                    reply(askMsg.getClientId(), replyMsg);
                }
                break;
            }
            case REPLY: {
                // Reply coming from the client.
                Object body = ((ReplyMsg) msg).getBody();
                if (body instanceof ReplyClientBody) {
                    System.out.println("receive client msg: " + ((ReplyClientBody) body).getClientInfo());
                } else {
                    log.warn("unexpected reply body from clientid={}: {}", msg.getClientId(), body);
                }
                break;
            }
            default:
                break;
        }
    }

    /**
     * Writes {@code message} to the channel registered for {@code clientId}; logs and drops the
     * message when no channel is registered, instead of failing with a NullPointerException.
     */
    private static void reply(String clientId, Object message) {
        SocketChannel target = (SocketChannel) NettyChannelMap.get(clientId);
        if (target == null) {
            log.warn("no channel registered for clientid={}, reply dropped", clientId);
            return;
        }
        target.writeAndFlush(message);
    }
}
package com.boce.netty.longlink.server;
import java.util.concurrent.TimeUnit;
import com.boce.netty.longlink.common.AskMsg;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * Starts the demo server: binds a Netty {@link ServerBootstrap} on the given port and, from
 * {@link #main(String[])}, periodically sends an {@link AskMsg} to every registered client.
 */
public class NettyServerBootstrap {
    private final int port;

    /**
     * Creates the server and binds it immediately.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }

    /** Configures the pipeline and binds the listening socket, waiting until the bind completes. */
    private void bind() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            // TCP_NODELAY applies to accepted connections, so it must be a child option;
            // the listening channel does not support it.
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new ObjectEncoder());
                    p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    p.addLast(new NettyServerHandler());
                }
            });
            ChannelFuture f = bootstrap.bind(port).sync();
            if (f.isSuccess()) {
                System.out.println("server start---------------");
            }
            started = f.isSuccess();
        } finally {
            if (!started) {
                // Bind failed or was interrupted: release the event loop threads.
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

    /**
     * Starts the server on port 9999, then every 30 seconds sends an {@link AskMsg} to each
     * client id listed by {@code NettyChannelMap.clientIds()} (split on {@code ';'}).
     */
    public static void main(String[] args) throws InterruptedException {
        new NettyServerBootstrap(9999);
        while (true) {
            for (String clientId : NettyChannelMap.clientIds().split(";")) {
                if (clientId == null || clientId.isEmpty()) {
                    continue;
                }
                SocketChannel channel = (SocketChannel) NettyChannelMap.get(clientId);
                if (channel != null) {
                    AskMsg askMsg = new AskMsg();
                    askMsg.setClientId(clientId);
                    channel.writeAndFlush(askMsg);
                }
            }
            TimeUnit.SECONDS.sleep(30);
        }
    }
}
2. 客户端编写
package com.boce.netty.longlink.client;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.boce.netty.longlink.common.AskMsg;
import com.boce.netty.longlink.common.AskParams;
import com.boce.netty.longlink.common.Constants;
import com.boce.netty.longlink.common.LoginMsg;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
/**
 * Lazily created, process-wide client connection to the server.
 *
 * <p>The first call to {@link #getInstance(int, String)} connects to the server; later calls return
 * the same instance and ignore their arguments.
 *
 * @author gjp
 */
public class NettyClientBootstrapSingle {
    private static final Lock lock = new ReentrantLock();
    // volatile: the field is read outside the lock in getInstance(), so the fully constructed
    // instance must be safely published to other threads.
    private static volatile NettyClientBootstrapSingle nettyClientBootstrapSingle = null;

    private final int port;
    private final String host;
    private SocketChannel socketChannel;

    /**
     * Returns the connected channel, or {@code null} if the connect did not succeed.
     */
    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

    /**
     * Creates a client and connects it immediately.
     *
     * @param port server port
     * @param host server host name or address
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    public NettyClientBootstrapSingle(final int port, final String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }

    /**
     * Returns the shared instance, creating and connecting it on first use.
     *
     * @param port server port, used only when the instance is created
     * @param host server host, used only when the instance is created
     * @return the shared instance, or {@code null} if the creating thread was interrupted while
     *     connecting (the interrupt flag is restored in that case)
     */
    public static NettyClientBootstrapSingle getInstance(int port, String host) {
        if (null == nettyClientBootstrapSingle) {
            lock.lock();
            try {
                if (null == nettyClientBootstrapSingle) {
                    try {
                        System.out.println("===========================new 实例");
                        nettyClientBootstrapSingle = new NettyClientBootstrapSingle(port, host);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                        System.err.println("interrupted while connecting to " + host + ":" + port);
                    }
                }
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println("==========================已经声明对象--------------------");
        }
        return nettyClientBootstrapSingle;
    }

    /** Builds the client pipeline and connects to {@code host:port}, waiting for the result. */
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        boolean connected = false;
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.group(eventLoopGroup);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new IdleStateHandler(20, 10, 0));
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    ch.pipeline().addLast(new NettyClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            if (future.isSuccess()) {
                socketChannel = (SocketChannel) future.channel();
                System.out.println("connect server 成功---------");
                connected = true;
            }
        } finally {
            if (!connected) {
                // Connect failed or was interrupted: release the event loop threads.
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
}
package com.boce.netty.longlink.client;
import java.util.concurrent.TimeUnit;
import com.boce.netty.longlink.common.AskMsg;
import com.boce.netty.longlink.common.AskParams;
import com.boce.netty.longlink.common.Constants;
import com.boce.netty.longlink.common.LoginMsg;
/**
 * Demo client task: logs in once, then sends a series of {@link AskMsg} requests over the shared
 * {@link NettyClientBootstrapSingle} connection, using the current thread's id as the client id.
 */
public class ThreadClient implements Runnable {
    private static final String SERVER_HOST = "192.168.1.201";
    private static final int SERVER_PORT = 9999;
    private static final int MESSAGE_COUNT = 2000;
    private static final long SEND_INTERVAL_MILLIS = 30;

    /**
     * Sends one login message followed by {@code MESSAGE_COUNT} ask messages, pausing between
     * sends. Stops early if no connection is available or the thread is interrupted.
     */
    @Override
    public void run() {
        // Keep the id in a local: the static holder in Constants is shared by every client
        // thread, so reading it back could yield another thread's id.
        final String clientId = String.valueOf(Thread.currentThread().getId());
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            NettyClientBootstrapSingle netty =
                    NettyClientBootstrapSingle.getInstance(SERVER_PORT, SERVER_HOST);
            if (netty == null || netty.getSocketChannel() == null) {
                System.err.println("client " + clientId + ": no connection to server, stopping");
                return;
            }
            // Still published as before; presumably read by other client code not shown here.
            Constants.setClientId(clientId);
            if (i < 1) {
                LoginMsg loginMsg = new LoginMsg();
                loginMsg.setClientId(clientId);
                loginMsg.setPassword("yao");
                loginMsg.setUserName("robin");
                netty.getSocketChannel().writeAndFlush(loginMsg);
            }
            AskParams askParams = new AskParams();
            askParams.setAuth("authToken");
            AskMsg askMsg = new AskMsg();
            askMsg.setClientId(clientId);
            askMsg.setParams(askParams);
            netty.getSocketChannel().writeAndFlush(askMsg);
            try {
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop sending rather than ignoring the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Starts two client threads that share the single connection. */
    public static void main(String[] args) {
        Thread thread = new Thread(new ThreadClient());
        thread.start();
        Thread thread2 = new Thread(new ThreadClient());
        thread2.start();
    }
}
源代码在附件中
package com.boce.netty.longlink.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.boce.netty.longlink.common.AskMsg;
import com.boce.netty.longlink.common.BaseMsg;
import com.boce.netty.longlink.common.LoginMsg;
import com.boce.netty.longlink.common.MsgType;
import com.boce.netty.longlink.common.PingMsg;
import com.boce.netty.longlink.common.ReplyClientBody;
import com.boce.netty.longlink.common.ReplyMsg;
import com.boce.netty.longlink.common.ReplyServerBody;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ReferenceCountUtil;
/**
 * Server-side inbound handler for the long-connection demo.
 *
 * <p>For every decoded {@link BaseMsg} it first handles authentication (registering the channel in
 * {@link NettyChannelMap} on a successful login, or asking an unregistered client to log in) and
 * then dispatches on the message type (PING / ASK / REPLY).
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
    private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);

    /**
     * Removes the closed channel from the registry.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("client logout -->{}", ctx.channel().remoteAddress());
        NettyChannelMap.remove((SocketChannel) ctx.channel());
    }

    /**
     * Processes one inbound message.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, BaseMsg msg) throws Exception {
        log.info("channel read->{},clientid=>{}", msg.getType(), msg.getClientId());

        if (MsgType.LOGIN.equals(msg.getType())) {
            LoginMsg loginMsg = (LoginMsg) msg;
            if ("robin".equals(loginMsg.getUserName()) && "yao".equals(loginMsg.getPassword())) {
                // Login succeeded: remember the channel under the client's id.
                NettyChannelMap.add(loginMsg.getClientId(), (SocketChannel) ctx.channel());
                System.out.println("client" + loginMsg.getClientId() + " 登录成功");
            }
        } else if (NettyChannelMap.get(msg.getClientId()) == null) {
            // Not logged in (or the connection was dropped): ask the client to log in again.
            ctx.channel().writeAndFlush(new LoginMsg());
        }

        if (msg.getType() == null) {
            // Switching on a null enum value would throw NullPointerException.
            log.warn("message without type from clientid={}, ignored", msg.getClientId());
            return;
        }

        switch (msg.getType()) {
            case PING: {
                String clientId = msg.getClientId();
                log.info("ping clientid={}", clientId);
                log.info("clientids={}", NettyChannelMap.clientIds());
                // Exact lookup by id. A substring search in the joined id list would treat
                // id "1" as registered as soon as "11" is.
                if (NettyChannelMap.get(clientId) == null) {
                    NettyChannelMap.add(clientId, (SocketChannel) ctx.channel());
                }
                reply(clientId, new PingMsg());
                break;
            }
            case ASK: {
                // Request coming from the client.
                AskMsg askMsg = (AskMsg) msg;
                if (askMsg.getParams() != null && "authToken".equals(askMsg.getParams().getAuth())) {
                    ReplyMsg replyMsg = new ReplyMsg();
                    replyMsg.setBody(new ReplyServerBody("server info $$$$ !!!"));
                    reply(askMsg.getClientId(), replyMsg);
                }
                break;
            }
            case REPLY: {
                // Reply coming from the client.
                Object body = ((ReplyMsg) msg).getBody();
                if (body instanceof ReplyClientBody) {
                    System.out.println("receive client msg: " + ((ReplyClientBody) body).getClientInfo());
                } else {
                    log.warn("unexpected reply body from clientid={}: {}", msg.getClientId(), body);
                }
                break;
            }
            default:
                break;
        }
    }

    /**
     * Writes {@code message} to the channel registered for {@code clientId}; logs and drops the
     * message when no channel is registered, instead of failing with a NullPointerException.
     */
    private static void reply(String clientId, Object message) {
        SocketChannel target = (SocketChannel) NettyChannelMap.get(clientId);
        if (target == null) {
            log.warn("no channel registered for clientid={}, reply dropped", clientId);
            return;
        }
        target.writeAndFlush(message);
    }
}
package com.boce.netty.longlink.server;
import java.util.concurrent.TimeUnit;
import com.boce.netty.longlink.common.AskMsg;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * Starts the demo server: binds a Netty {@link ServerBootstrap} on the given port and, from
 * {@link #main(String[])}, periodically sends an {@link AskMsg} to every registered client.
 */
public class NettyServerBootstrap {
    private final int port;

    /**
     * Creates the server and binds it immediately.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }

    /** Configures the pipeline and binds the listening socket, waiting until the bind completes. */
    private void bind() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            // TCP_NODELAY applies to accepted connections, so it must be a child option;
            // the listening channel does not support it.
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new ObjectEncoder());
                    p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    p.addLast(new NettyServerHandler());
                }
            });
            ChannelFuture f = bootstrap.bind(port).sync();
            if (f.isSuccess()) {
                System.out.println("server start---------------");
            }
            started = f.isSuccess();
        } finally {
            if (!started) {
                // Bind failed or was interrupted: release the event loop threads.
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

    /**
     * Starts the server on port 9999, then every 30 seconds sends an {@link AskMsg} to each
     * client id listed by {@code NettyChannelMap.clientIds()} (split on {@code ';'}).
     */
    public static void main(String[] args) throws InterruptedException {
        new NettyServerBootstrap(9999);
        while (true) {
            for (String clientId : NettyChannelMap.clientIds().split(";")) {
                if (clientId == null || clientId.isEmpty()) {
                    continue;
                }
                SocketChannel channel = (SocketChannel) NettyChannelMap.get(clientId);
                if (channel != null) {
                    AskMsg askMsg = new AskMsg();
                    askMsg.setClientId(clientId);
                    channel.writeAndFlush(askMsg);
                }
            }
            TimeUnit.SECONDS.sleep(30);
        }
    }
}
2. 客户端编写
package com.boce.netty.longlink.client;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.boce.netty.longlink.common.AskMsg;
import com.boce.netty.longlink.common.AskParams;
import com.boce.netty.longlink.common.Constants;
import com.boce.netty.longlink.common.LoginMsg;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
/**
 * Lazily created, process-wide client connection to the server.
 *
 * <p>The first call to {@link #getInstance(int, String)} connects to the server; later calls return
 * the same instance and ignore their arguments.
 *
 * @author gjp
 */
public class NettyClientBootstrapSingle {
    private static final Lock lock = new ReentrantLock();
    // volatile: the field is read outside the lock in getInstance(), so the fully constructed
    // instance must be safely published to other threads.
    private static volatile NettyClientBootstrapSingle nettyClientBootstrapSingle = null;

    private final int port;
    private final String host;
    private SocketChannel socketChannel;

    /**
     * Returns the connected channel, or {@code null} if the connect did not succeed.
     */
    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

    /**
     * Creates a client and connects it immediately.
     *
     * @param port server port
     * @param host server host name or address
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    public NettyClientBootstrapSingle(final int port, final String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }

    /**
     * Returns the shared instance, creating and connecting it on first use.
     *
     * @param port server port, used only when the instance is created
     * @param host server host, used only when the instance is created
     * @return the shared instance, or {@code null} if the creating thread was interrupted while
     *     connecting (the interrupt flag is restored in that case)
     */
    public static NettyClientBootstrapSingle getInstance(int port, String host) {
        if (null == nettyClientBootstrapSingle) {
            lock.lock();
            try {
                if (null == nettyClientBootstrapSingle) {
                    try {
                        System.out.println("===========================new 实例");
                        nettyClientBootstrapSingle = new NettyClientBootstrapSingle(port, host);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                        System.err.println("interrupted while connecting to " + host + ":" + port);
                    }
                }
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println("==========================已经声明对象--------------------");
        }
        return nettyClientBootstrapSingle;
    }

    /** Builds the client pipeline and connects to {@code host:port}, waiting for the result. */
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        boolean connected = false;
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.group(eventLoopGroup);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new IdleStateHandler(20, 10, 0));
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    ch.pipeline().addLast(new NettyClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            if (future.isSuccess()) {
                socketChannel = (SocketChannel) future.channel();
                System.out.println("connect server 成功---------");
                connected = true;
            }
        } finally {
            if (!connected) {
                // Connect failed or was interrupted: release the event loop threads.
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
}
package com.boce.netty.longlink.client;
import java.util.concurrent.TimeUnit;
import com.boce.netty.longlink.common.AskMsg;
import com.boce.netty.longlink.common.AskParams;
import com.boce.netty.longlink.common.Constants;
import com.boce.netty.longlink.common.LoginMsg;
/**
 * Demo client task: logs in once, then sends a series of {@link AskMsg} requests over the shared
 * {@link NettyClientBootstrapSingle} connection, using the current thread's id as the client id.
 */
public class ThreadClient implements Runnable {
    private static final String SERVER_HOST = "192.168.1.201";
    private static final int SERVER_PORT = 9999;
    private static final int MESSAGE_COUNT = 2000;
    private static final long SEND_INTERVAL_MILLIS = 30;

    /**
     * Sends one login message followed by {@code MESSAGE_COUNT} ask messages, pausing between
     * sends. Stops early if no connection is available or the thread is interrupted.
     */
    @Override
    public void run() {
        // Keep the id in a local: the static holder in Constants is shared by every client
        // thread, so reading it back could yield another thread's id.
        final String clientId = String.valueOf(Thread.currentThread().getId());
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            NettyClientBootstrapSingle netty =
                    NettyClientBootstrapSingle.getInstance(SERVER_PORT, SERVER_HOST);
            if (netty == null || netty.getSocketChannel() == null) {
                System.err.println("client " + clientId + ": no connection to server, stopping");
                return;
            }
            // Still published as before; presumably read by other client code not shown here.
            Constants.setClientId(clientId);
            if (i < 1) {
                LoginMsg loginMsg = new LoginMsg();
                loginMsg.setClientId(clientId);
                loginMsg.setPassword("yao");
                loginMsg.setUserName("robin");
                netty.getSocketChannel().writeAndFlush(loginMsg);
            }
            AskParams askParams = new AskParams();
            askParams.setAuth("authToken");
            AskMsg askMsg = new AskMsg();
            askMsg.setClientId(clientId);
            askMsg.setParams(askParams);
            netty.getSocketChannel().writeAndFlush(askMsg);
            try {
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop sending rather than ignoring the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Starts two client threads that share the single connection. */
    public static void main(String[] args) {
        Thread thread = new Thread(new ThreadClient());
        thread.start();
        Thread thread2 = new Thread(new ThreadClient());
        thread2.start();
    }
}
源代码在附件中
上一篇: 腊八节的风俗及寓意是什么
下一篇: maven根据不同环境打不同的war