Netty多端口绑定服务端
程序员文章站
2022-06-04 19:47:31
...
package vua.system.netty;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import vua.system.pojo.Network;
import vua.system.service.NetworkService;
import java.util.ArrayList;
import java.util.List;
/**
* Created by pxx on 2020/8/20 10:50
* netty 启动类
*/
@Component
public class ServerStart implements CommandLineRunner {
@Autowired
private NetworkService networkService;
//获取网络端口
private List<Integer> ports = new ArrayList<>();
@Override
public void run(String... args) {
try {
List<Network> works = networkService.getAllNetwork();
if (!CollectionUtils.isEmpty(works)) {
for (Network network : works) {
ports.add(network.getNetworkPort());
}
//启动
new BootNettyServer(ports).start();
}
} catch (Exception e) {
System.out.println("启动失败!");
}
}
}
package vua.system.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.util.CollectionUtils;
import javax.annotation.PreDestroy;
import java.util.List;
/**
* 测试多端口监听服务端
* Created by pxx on 2020/8/17 13:43
*/
public class BootNettyServer {
private ChannelFuture[] ChannelFutures = null;
List<Integer> ports = null;
BootNettyServer(List<Integer> ports) {
this.ports = ports;
}
ServerBootstrap serverBootstrap = new ServerBootstrap();
private EventLoopGroup boss;
private EventLoopGroup worker;
private ChannelFuture futureTcp;
// linux平台才支持epoll
boolean epoll = false;
@PreDestroy
public void stop() {
if (futureTcp != null) {
futureTcp.channel().close().addListener(ChannelFutureListener.CLOSE);
futureTcp.awaitUninterruptibly();
boss.shutdownGracefully();
worker.shutdownGracefully();
futureTcp = null;
}
}
public void start() throws InterruptedException {
if (epoll) {
boss = new EpollEventLoopGroup();
worker = new EpollEventLoopGroup();
} else {
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
}
//设置监听组,线程组,初始化器
serverBootstrap.group(boss, worker)
//这个参数表示允许重复使用本地地址和端口
.childOption(ChannelOption.SO_REUSEADDR, true)
//保持连接 如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
.childOption(ChannelOption.SO_KEEPALIVE, true)
//设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
.childHandler(new BootNettyChannelInitializer());
if (epoll) {
serverBootstrap.channel(EpollServerSocketChannel.class);
} else {
serverBootstrap.channel(NioServerSocketChannel.class);
}
try {
if (!CollectionUtils.isEmpty(ports)) {
if (ChannelFutures == null) {
ChannelFutures = new ChannelFuture[ports.size()];
}
int i = 0;
for (Integer port : ports) {
//绑定端口,同步等待成功 绑定的服务器
futureTcp = serverBootstrap.bind(port).sync();
ChannelFutures[i++] = futureTcp;
futureTcp.addListener(future -> {
if (future.isSuccess()) {
System.out.println("netty server 启动成功!" + port);
} else {
System.out.println("netty server 启动失败!" + port);
}
});
}
}
if (ChannelFutures.length == ports.size()) {
for (ChannelFuture futureTcp : ChannelFutures) {
futureTcp.channel().closeFuture().sync().channel();
}
}
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getMessage());
} finally {
if (ChannelFutures.length == ports.size()) {
worker.shutdownGracefully();
boss.shutdownGracefully();
stop();
}
}
}
}
package vua.system.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.util.CollectionUtils;
import vua.system.service.ClientImelService;
import vua.system.service.MessageService;
import vua.system.service.TerminalDataService;
import vua.system.service.impl.SendMsgServiceImpl;
import vua.system.utils.PackageUtils;
import vua.system.utils.SpringContextUtil;
import vua.system.webscoket.WebsocketController;
import java.io.IOException;
import java.util.*;
/**
* Created by pxx on 2020/8/17 13:46
*/
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
private WebsocketController websocketController = SpringContextUtil.getApplicationContext().getBean(WebsocketController.class);
private SendMsgServiceImpl sendMsgServiceImpl = SpringContextUtil.getApplicationContext().getBean(SendMsgServiceImpl.class);
private ClientImelService clientImelService = SpringContextUtil.getApplicationContext().getBean(ClientImelService.class);
private TerminalDataService terminalDataService = SpringContextUtil.getApplicationContext().getBean(TerminalDataService.class);
private MessageService messageService = SpringContextUtil.getApplicationContext().getBean(MessageService.class);
private BusinessThread businessThread;
private BusinessBy4GThread businessBy4GThread;
private Thread thread;
private ChannelHandlerContext channelHandlerContext;
private Set<String> imels = new HashSet<>();
private Integer port;
//imel号与端口绑定 用来设置无人机页面显示开关
private Map<String, Integer> bindPort = new HashMap<>();
//向前端发送的开关
public Map<Map<String, Integer>, String> onOff = new HashMap<>();
BootNettyChannelInboundHandlerAdapter(Integer port) {
this.port = port;
}
/**
* 从客户端收到新的数据时,这个方法会在收到消息时被调用
*
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
//把每个客户端存进通道
String data = msg.toString().trim();
//处理粘包
List<String> datas = PackageUtils.stickingSplit(data);
if (!CollectionUtils.isEmpty(datas)) {
String imel;
for (String meg : datas) {
//处理客户端数据
if (meg.length() == 20) {
//维护webScoket推送map
sendMsgServiceImpl.addNettyClient(ctx, data);
//心跳数据
clientImelService.insert(meg);
//imel号与端口绑定 用来设置无人机页面显示开关
imel = meg.substring(8, 12);
bindPort.put(imel, port);
} else if (meg.startsWith("$$,") && meg.endsWith(",*ff")) {
//获取客户端代号
imel = meg.substring(3, 7);
imels.add(imel);
bindPort.put(imel, port);
//处理数据协议
terminalDataService.insert(meg);
} else {
//乱码
messageService.insert(meg);
}
//不同端口不同的处理客户端断开策略
if (port == 6666) businessThread.count = 0;//清零计数器
if (port == 7777) businessBy4GThread.count = 0;
}
//发送前端无人机显示开关
if (!CollectionUtils.isEmpty(bindPort)) {
onOff.put(bindPort, "open");
websocketController.sendTopicmassTmOpen(onOff);
}
}
}
/**
* 从客户端收到新的数据、读取完成时调用
* 冲刷所有待审消息到远程节点,关闭通道后,操作完成
*
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.flush();
}
/**
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时,
* 后台出错以及Channel异常都会触发此方法
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
//发送前端无人机显示开关
if (!CollectionUtils.isEmpty(bindPort)) {
onOff.put(bindPort, "close");
websocketController.sendTopicmassTmOpen(onOff);
System.out.println(onOff+"----------关....");
}
System.out.println("错误或者处理器在处理事件时抛出异常....");
cause.printStackTrace();
ctx.close();//抛出异常,断开与客户端的连接
} catch (Exception e) {
System.out.println("后台处理事件时抛出异常....");
}
}
/**
* 客户端与服务端第一次建立连接时 执行
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channelHandlerContext = ctx;
//不同端口不同的处理客户端断开策略
if (port == 6666) {
channelHandlerContext = ctx;
//开启线程监事链接心跳
businessThread = new BusinessThread(ctx, bindPort);
thread = new Thread(businessThread);
thread.start();
}
if (port == 7777) {
channelHandlerContext = ctx;
//开启线程监事链接心跳
businessBy4GThread = new BusinessBy4GThread(ctx, bindPort);
thread = new Thread(businessBy4GThread);
thread.start();
}
}
/**
* 客户端与服务端 断连时 执行
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception{
super.channelInactive(ctx);
//与客户端断开方式
if (port == 6666) businessThread.exit = true;
if (port == 7777) businessBy4GThread.exit = true;
//维护webScoket推送map
sendMsgServiceImpl.deleteNettyClient(ctx);
ctx.channel().writeAndFlush("客户端与服务端断开连接...");
//发送前端无人机显示开关
if (!CollectionUtils.isEmpty(bindPort)) {
onOff.put(bindPort, "close");
websocketController.sendTopicmassTmOpen(onOff);
System.out.println(onOff+"----------关....");
}
//断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
ctx.close();
System.out.println("客户端与服务端 断连...:");
}
/**
* 服务端当read超时, 会调用这个方法
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
try {
if (businessThread != null || businessBy4GThread != null) {
businessThread.exit = true;
businessBy4GThread.exit = true;
}
ctx.channel().writeAndFlush("超时......");
ctx.channel().close();
} catch (Exception e) {
System.out.println("线程为空.........");
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx){
System.out.println("注册..channel");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx){
System.out.println("未注册..channel");
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx){
System.out.println("channelWritabilityChanged..channel");
}
}
package vua.system.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
/**
* Created by pxx on 2020/8/17 13:45
* 添加编解码处理器ChannelPipeline
* 当有一个新的连接被接受,一个新的子Channel将被创建,ChannelInitializer添加EchoServerHandler到Channel的ChannelPipeline
*/
public class BootNettyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
try {
ChannelPipeline pipeline = socketChannel.pipeline();
// 属于ChannelInboundHandler,依照顺序执行
pipeline.addLast("decoder", new StringDecoder(Charset.forName("utf-8")));
// ChannelOutboundHandler,依照逆序执行 添加字符串解码器
pipeline.addLast("encoder", new StringEncoder(Charset.forName("utf-8")));
//自定义处理器
pipeline.addLast("handler", new BootNettyChannelInboundHandlerAdapter(socketChannel.localAddress().getPort()));
} catch (Exception e) {
System.out.println("加编解码处理器ChannelPipeline异常...");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
}
package vua.system.netty;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import vua.system.service.TerminalDataService;
import vua.system.utils.SpringContextUtil;
import vua.system.webscoket.WebsocketController;
import java.util.HashMap;
import java.util.Map;
/**
* Created by pxx on 2020/8/22 17:27
* <p>
* 3.每个客户端接入后,发送:客户端IMEI信息查询协议 并解析相应信息。如客户端已存在则进行覆盖。
* Thuraya卫星通信每隔60分钟发送查询协议,若6次始终没有应答,则认为客户端已断开连接,并删除客户端
*/
public class BusinessThread implements Runnable {
private WebsocketController websocketController = SpringContextUtil.getApplicationContext().getBean(WebsocketController.class);
@Autowired
private TerminalDataService terminalDataService;
private ChannelHandlerContext channelHandlerContext = null;
public boolean exit = false;
//轮询间隔
private int timeGap = 600;
//超时断开
public int count = 0;
//无人页面显示前端开关
private Map<Map<String, Integer>, String> onOff = new HashMap<>();
//imel号与端口绑定 用来设置无人机页面显示开关
private Map<String, Integer> bindPort;
BusinessThread(ChannelHandlerContext ctx, Map<String, Integer> bindPort) {
this.channelHandlerContext = ctx;
this.bindPort = bindPort;
System.out.println("线程构造");
}
@Override
public void run() {
count = 0;
while (!exit) {
try {
//心跳逻辑在此
if (count >= 6) {
//关闭线程和链接
System.out.println("由于无应答,线程退出");
channelHandlerContext.close();
return;
}
channelHandlerContext.write("$$,REQUEST_IMEI*ff");
channelHandlerContext.flush();
count++;
//发送前端无人机显示开关
if (!CollectionUtils.isEmpty(bindPort)) {
onOff.put(bindPort, "open");
websocketController.sendTopicmassTmOpen(onOff);
//System.out.println(onOff+"-------开....");
}
Thread.sleep(1000 * timeGap);
} catch (Exception e) {
if (!CollectionUtils.isEmpty(bindPort)) {
onOff.put(bindPort, "close");
websocketController.sendTopicmassTmOpen(onOff);
System.out.println(onOff+"-------2---关....");
}
System.out.println("线程异常退出");
}
}
System.out.println("由于exit被调用者置为true,线程退出");
if (!CollectionUtils.isEmpty(bindPort)) {
onOff.put(bindPort, "close");
websocketController.sendTopicmassTmOpen(onOff);
System.out.println(onOff+"-------3---关....");
}
channelHandlerContext.close();
}
}
上一篇: Netty服务端口的绑定
下一篇: 手机使用代理IP访问电脑服务器所遇到的坑