欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Netty多端口绑定服务端

程序员文章站 2022-06-04 19:47:31
...
package vua.system.netty;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import vua.system.pojo.Network;
import vua.system.service.NetworkService;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by pxx  on 2020/8/20 10:50
 * <p>
 * Netty bootstrap hook: once the Spring context is ready, reads the configured
 * networks from {@link NetworkService} and starts a {@link BootNettyServer}
 * listening on every configured port.
 */

@Component
public class ServerStart implements CommandLineRunner {

    @Autowired
    private NetworkService networkService;

    /**
     * Collects the port of every configured network and starts the Netty server.
     * <p>
     * {@code BootNettyServer.start()} blocks until all listening channels are closed,
     * so it is run on a dedicated thread; calling it inline would keep this runner
     * (and therefore the application start-up sequence) from ever returning.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String... args) {
        // Local list: a field would accumulate duplicates if run() were invoked again.
        List<Integer> ports = new ArrayList<>();
        try {
            List<Network> works = networkService.getAllNetwork();
            if (CollectionUtils.isEmpty(works)) {
                // Nothing configured, nothing to start.
                return;
            }
            for (Network network : works) {
                ports.add(network.getNetworkPort());
            }
        } catch (Exception e) {
            System.out.println("启动失败!");
            // Keep the cause visible instead of silently dropping it.
            e.printStackTrace();
            return;
        }

        Thread serverThread = new Thread(() -> {
            try {
                new BootNettyServer(ports).start();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("启动失败!");
                e.printStackTrace();
            } catch (Exception e) {
                System.out.println("启动失败!");
                e.printStackTrace();
            }
        }, "netty-server");
        serverThread.start();
    }
}
package vua.system.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.util.CollectionUtils;

import javax.annotation.PreDestroy;
import java.util.List;


/**
 * Netty TCP server that listens on several ports at once.
 * <p>
 * One {@link ServerBootstrap} (and one pair of event loop groups) is shared by all
 * ports; every port gets its own bound server channel.
 * Created by pxx  on 2020/8/17 13:43
 */

public class BootNettyServer {

    /** Bind futures of the listening channels, one slot per configured port. */
    private ChannelFuture[] channelFutures = null;
    List<Integer> ports = null;

    BootNettyServer(List<Integer> ports) {
        this.ports = ports;
    }

    ServerBootstrap serverBootstrap = new ServerBootstrap();

    private EventLoopGroup boss;
    private EventLoopGroup worker;
    /** Bind future of the most recently bound port. */
    private ChannelFuture futureTcp;

    // epoll is only supported on Linux
    boolean epoll = false;

    /**
     * Closes every listening channel that was bound and shuts both event loop groups down.
     * Safe to call more than once and before {@link #start()}.
     */
    @PreDestroy
    public void stop() {
        ChannelFuture[] bound = channelFutures;
        channelFutures = null;
        futureTcp = null;
        if (bound != null) {
            for (ChannelFuture future : bound) {
                // Slots stay null for ports that were never bound successfully.
                if (future != null) {
                    future.channel().close().awaitUninterruptibly();
                }
            }
        }
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (worker != null) {
            worker.shutdownGracefully();
        }
    }


    /**
     * Binds every configured port and then blocks until all listening channels are closed.
     * On return (normal or after a failure) all channels and event loop groups are released.
     * Does nothing when no ports are configured.
     *
     * @throws InterruptedException declared for compatibility; an interruption is reported,
     *                              the interrupt flag is restored and the server is stopped
     */
    public void start() throws InterruptedException {

        if (CollectionUtils.isEmpty(ports)) {
            // Bail out before any event loop group is created so nothing leaks.
            System.out.println("netty server 启动失败!未配置端口");
            return;
        }

        if (epoll) {
            boss = new EpollEventLoopGroup();
            worker = new EpollEventLoopGroup();
        } else {
            boss = new NioEventLoopGroup();
            worker = new NioEventLoopGroup();
        }
        // Configure the acceptor/worker groups, socket options and the channel initializer
        serverBootstrap.group(boss, worker)
                // Allow the local address/port to be reused. This concerns the listening
                // socket, so it is a server-channel option rather than a child option.
                .option(ChannelOption.SO_REUSEADDR, true)
                // Keep-alive: TCP sends a probe when a connection has been idle (two hours by default)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // Per-connection pipeline: decoding, encoding and business handling
                .childHandler(new BootNettyChannelInitializer());

        if (epoll) {
            serverBootstrap.channel(EpollServerSocketChannel.class);
        } else {
            serverBootstrap.channel(NioServerSocketChannel.class);
        }

        ChannelFuture[] bound = new ChannelFuture[ports.size()];
        channelFutures = bound;
        try {
            int i = 0;
            for (Integer port : ports) {
                ChannelFuture bindFuture = serverBootstrap.bind(port);
                // Register the listener before waiting: sync() rethrows on failure,
                // so a listener added afterwards could never report a failed bind.
                bindFuture.addListener(future -> {
                    if (future.isSuccess()) {
                        System.out.println("netty server 启动成功!" + port);
                    } else {
                        System.out.println("netty server 启动失败!" + port);
                    }
                });
                // Bind the port and wait synchronously for the result
                futureTcp = bindFuture.sync();
                bound[i++] = futureTcp;
            }
            // All ports are bound: block until every listening channel has been closed.
            for (ChannelFuture future : bound) {
                future.channel().closeFuture().sync();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            System.out.println(e.getMessage());
        } finally {
            stop();
        }
    }

}
package vua.system.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.util.CollectionUtils;
import vua.system.service.ClientImelService;
import vua.system.service.MessageService;
import vua.system.service.TerminalDataService;
import vua.system.service.impl.SendMsgServiceImpl;
import vua.system.utils.PackageUtils;
import vua.system.utils.SpringContextUtil;
import vua.system.webscoket.WebsocketController;

import java.io.IOException;
import java.util.*;


/**
 * Per-connection business handler of the Netty server.
 * <p>
 * Splits incoming text into frames, stores them through the project services,
 * keeps the client-code-to-port binding up to date and pushes an "open"/"close"
 * display switch to the front end through {@link WebsocketController}.
 * One instance is created per accepted channel.
 * Created by pxx  on 2020/8/17 13:46
 */

public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {

    /** Local port whose connections are supervised by {@link BusinessThread}. */
    private static final int BUSINESS_PORT = 6666;

    /** Local port whose connections are supervised by {@code BusinessBy4GThread}. */
    private static final int BUSINESS_4G_PORT = 7777;

    private WebsocketController websocketController = SpringContextUtil.getApplicationContext().getBean(WebsocketController.class);

    private SendMsgServiceImpl sendMsgServiceImpl = SpringContextUtil.getApplicationContext().getBean(SendMsgServiceImpl.class);

    private ClientImelService clientImelService = SpringContextUtil.getApplicationContext().getBean(ClientImelService.class);

    private TerminalDataService terminalDataService = SpringContextUtil.getApplicationContext().getBean(TerminalDataService.class);

    private MessageService messageService = SpringContextUtil.getApplicationContext().getBean(MessageService.class);

    private BusinessThread businessThread;

    private BusinessBy4GThread businessBy4GThread;

    /** Thread running whichever watchdog was created in {@link #channelActive}. */
    private Thread thread;

    private ChannelHandlerContext channelHandlerContext;

    private Set<String> imels = new HashSet<>();

    private final Integer port;

    // Client code -> local port binding, used to drive the drone page display switch.
    // Shared with the watchdog thread, hence the synchronized wrapper.
    private final Map<String, Integer> bindPort = Collections.synchronizedMap(new HashMap<>());

    // Last switch message pushed to the front end
    public Map<Map<String, Integer>, String> onOff = new HashMap<>();


    BootNettyChannelInboundHandlerAdapter(Integer port) {
        this.port = port;
    }

    /**
     * Called whenever data is received from the client.
     * <p>
     * Each frame returned by {@code PackageUtils.stickingSplit} is handled as:
     * 20 characters long - heartbeat; starts with {@code "$$,"} and ends with
     * {@code ",*ff"} - terminal data; anything else - stored as a garbled message.
     *
     * @param ctx channel context
     * @param msg decoded message
     * @throws Exception if forwarding the message or one of the services fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);

        String data = msg.toString().trim();

        // Split frames that were merged into one read (sticky packets)
        List<String> datas = PackageUtils.stickingSplit(data);
        if (CollectionUtils.isEmpty(datas)) {
            return;
        }

        for (String meg : datas) {

            if (meg.length() == 20) {

                // Maintain the websocket push map
                // NOTE(review): this passes the whole payload, not the single frame - confirm intended.
                sendMsgServiceImpl.addNettyClient(ctx, data);

                // Heartbeat frame
                clientImelService.insert(meg);

                // Bind the client code to the port
                bindPort.put(meg.substring(8, 12), port);

            } else if (meg.length() >= 7 && meg.startsWith("$$,") && meg.endsWith(",*ff")) {
                // The length guard keeps substring(3, 7) in range: a frame such as "$$,*ff"
                // satisfies both the prefix and the suffix test but is only 6 characters long.

                // Client code
                String imel = meg.substring(3, 7);
                imels.add(imel);
                bindPort.put(imel, port);

                // Data protocol frame
                terminalDataService.insert(meg);
            } else {
                // Garbled frame
                messageService.insert(meg);
            }

            // Any received frame counts as a sign of life: reset the watchdog counter
            resetHeartbeatCounter();
        }

        // Tell the front end to show the drone
        publishSwitch("open");
    }

    /**
     * Called when the current batch of reads is complete; flushes pending writes.
     *
     * @param ctx channel context
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        ctx.flush();
    }

    /**
     * Called when an I/O error occurs or a handler throws while processing an event.
     * Pushes the "close" switch to the front end and always closes the connection,
     * even if that push itself fails.
     *
     * @param ctx   channel context
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        try {
            if (publishSwitch("close")) {
                System.out.println(onOff+"----------关....");
            }
            System.out.println("错误或者处理器在处理事件时抛出异常....");

            cause.printStackTrace();

        } catch (Exception e) {
            System.out.println("后台处理事件时抛出异常....");
            e.printStackTrace();
        } finally {
            // Drop the connection regardless of whether the notification succeeded
            ctx.close();
        }
    }

    /**
     * Called when the connection to the client has been established.
     * Starts the watchdog thread that matches the local port, if any.
     *
     * @param ctx channel context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        channelHandlerContext = ctx;

        // Each port has its own client-disconnect policy
        if (port == BUSINESS_PORT) {
            businessThread = new BusinessThread(ctx, bindPort);
            thread = new Thread(businessThread);
            thread.start();
        } else if (port == BUSINESS_4G_PORT) {
            businessBy4GThread = new BusinessBy4GThread(ctx, bindPort);
            thread = new Thread(businessBy4GThread);
            thread.start();
        }

    }

    /**
     * Called when the connection to the client is lost.
     * Stops the watchdog, unregisters the client and pushes the "close" switch.
     *
     * @param ctx channel context
     * @throws Exception if forwarding the event or one of the services fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception{

        super.channelInactive(ctx);

        stopHeartbeat();

        // Maintain the websocket push map
        sendMsgServiceImpl.deleteNettyClient(ctx);

        // No farewell message is written here: the channel is already closed,
        // so such a write could only fail.
        if (publishSwitch("close")) {
            System.out.println(onOff+"----------关....");
        }

        // Release the channel resources
        ctx.close();

        System.out.println("客户端与服务端 断连...:");
    }

    /**
     * Called for user events fired through the pipeline (the original intent is a read
     * timeout). Stops the watchdog, sends a timeout notice and closes the channel.
     *
     * @param ctx channel context
     * @param evt the event; not inspected
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        // Only one of the two watchdogs exists per connection, so each is checked on its own.
        stopHeartbeat();

        ctx.channel().writeAndFlush("超时......");
        ctx.channel().close();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx){
        System.out.println("注册..channel");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx){
        System.out.println("未注册..channel");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx){
        System.out.println("channelWritabilityChanged..channel");
    }

    /**
     * Pushes the given switch state for the current bindings to the front end.
     * A snapshot of the bindings is used as the map key, because a map that is
     * modified after being used as a hash key can no longer be found or replaced.
     *
     * @param state switch value, "open" or "close"
     * @return true if a message was sent, false if there are no bindings yet
     */
    private boolean publishSwitch(String state) {
        Map<String, Integer> snapshot;
        synchronized (bindPort) {
            snapshot = new HashMap<>(bindPort);
        }
        if (snapshot.isEmpty()) {
            return false;
        }
        Map<Map<String, Integer>, String> message = new HashMap<>();
        message.put(snapshot, state);
        onOff = message;
        websocketController.sendTopicmassTmOpen(message);
        return true;
    }

    /** Resets the unanswered-request counter of the watchdog that belongs to this port. */
    private void resetHeartbeatCounter() {
        if (port == BUSINESS_PORT && businessThread != null) {
            businessThread.count = 0;
        }
        if (port == BUSINESS_4G_PORT && businessBy4GThread != null) {
            businessBy4GThread.count = 0;
        }
    }

    /**
     * Asks the watchdog of this connection to stop and interrupts its thread so that
     * it does not stay asleep until its next polling interval.
     */
    private void stopHeartbeat() {
        if (businessThread != null) {
            businessThread.exit = true;
        }
        if (businessBy4GThread != null) {
            businessBy4GThread.exit = true;
        }
        if (thread != null) {
            thread.interrupt();
        }
    }

}
package vua.system.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.Charset;


/**
 * Created by pxx  on 2020/8/17 13:45
 * <p>
 * Builds the {@link ChannelPipeline} of every accepted connection: a string decoder,
 * a string encoder and the business handler, which receives the local port the
 * client connected to.
 */
public class BootNettyChannelInitializer extends ChannelInitializer<SocketChannel> {

    /** Charset shared by the decoder and the encoder. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    /**
     * Installs the codec and business handlers on a newly accepted channel.
     * If anything fails, the cause is printed and the channel is closed, so a
     * connection is never left open with an incomplete pipeline.
     *
     * @param socketChannel the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        try {
            ChannelPipeline pipeline = socketChannel.pipeline();
            // Inbound handler (runs in insertion order): bytes -> String
            pipeline.addLast("decoder", new StringDecoder(UTF_8));
            // Outbound handler (runs in reverse order): String -> bytes
            pipeline.addLast("encoder", new StringEncoder(UTF_8));
            // Business handler
            pipeline.addLast("handler", new BootNettyChannelInboundHandlerAdapter(socketChannel.localAddress().getPort()));

        } catch (Exception e) {
            System.out.println("加编解码处理器ChannelPipeline异常...");
            e.printStackTrace();
            // Without the business handler the connection is useless; drop it.
            socketChannel.close();
        }
    }

    /**
     * Prints the stack trace and then delegates to the default handling.
     *
     * @param ctx   channel context
     * @param cause the error that was raised
     * @throws Exception propagated from the default implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        super.exceptionCaught(ctx, cause);
    }
}
package vua.system.netty;

import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import vua.system.service.TerminalDataService;
import vua.system.utils.SpringContextUtil;
import vua.system.webscoket.WebsocketController;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by pxx  on 2020/8/22 17:27
 * <p>
 * Watchdog for one client connection. While running it periodically sends the
 * IMEI query {@code $$,REQUEST_IMEI*ff} to the client (every {@code timeGap} = 600
 * seconds as coded here) and counts the queries. The connection handler resets
 * {@link #count} whenever data arrives; once 6 queries have gone out without such a
 * reset the client is considered gone and the channel is closed.
 */
public class BusinessThread implements Runnable {

    /** Number of unanswered queries after which the connection is dropped. */
    private static final int MAX_UNANSWERED_REQUESTS = 6;

    private WebsocketController websocketController = SpringContextUtil.getApplicationContext().getBean(WebsocketController.class);

    private ChannelHandlerContext channelHandlerContext = null;

    // Set to true by the connection handler to stop this watchdog.
    // volatile: written by the handler's thread, read by the watchdog thread.
    public volatile boolean exit = false;

    // Polling interval in seconds
    private int timeGap = 600;

    // Queries sent since the last reset; set back to 0 by the connection handler.
    // volatile for the same cross-thread reason as exit.
    public volatile int count = 0;

    // Last switch message pushed to the front end
    private Map<Map<String, Integer>, String> onOff = new HashMap<>();

    // Client code -> port binding, owned and updated by the connection handler
    private Map<String, Integer> bindPort;

    BusinessThread(ChannelHandlerContext ctx, Map<String, Integer> bindPort) {

        this.channelHandlerContext = ctx;
        this.bindPort = bindPort;

        System.out.println("线程构造");
    }

    /**
     * Polling loop. Ends when {@link #exit} is set, when too many queries went
     * unanswered, when the thread is interrupted or when sending fails; the channel
     * is closed in every case.
     */
    @Override
    public void run() {
        count = 0;
        boolean interrupted = false;

        try {
            while (!exit) {

                if (count >= MAX_UNANSWERED_REQUESTS) {
                    // The channel is closed by the finally block below
                    System.out.println("由于无应答,线程退出");
                    return;
                }

                channelHandlerContext.write("$$,REQUEST_IMEI*ff");
                channelHandlerContext.flush();
                count = count + 1;

                // Tell the front end to show the drone
                publishSwitch("open");

                Thread.sleep(1000L * timeGap);
            }
            reportRequestedExit();
        } catch (InterruptedException e) {
            interrupted = true;
            if (exit) {
                // Woken up early by whoever asked this watchdog to stop
                reportRequestedExit();
            } else {
                reportAbnormalExit();
            }
        } catch (Exception e) {
            // Leave the loop instead of retrying immediately with no pause
            e.printStackTrace();
            reportAbnormalExit();
        } finally {
            channelHandlerContext.close();
            if (interrupted) {
                // Restore the flag for code further up the stack
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Reports a stop requested through {@link #exit} and pushes the "close" switch. */
    private void reportRequestedExit() {
        System.out.println("由于exit被调用者置为true,线程退出");
        if (publishSwitch("close")) {
            System.out.println(onOff+"-------3---关....");
        }
    }

    /** Pushes the "close" switch and reports that the watchdog is terminating after a failure. */
    private void reportAbnormalExit() {
        if (publishSwitch("close")) {
            System.out.println(onOff+"-------2---关....");
        }
        System.out.println("线程异常退出");
    }

    /**
     * Pushes the given switch state for the current bindings to the front end.
     * A snapshot of the bindings is used as the map key, because a map that is
     * modified after being used as a hash key can no longer be found or replaced.
     *
     * @param state switch value, "open" or "close"
     * @return true if a message was sent, false if there are no bindings
     */
    private boolean publishSwitch(String state) {
        if (bindPort == null) {
            return false;
        }
        Map<String, Integer> snapshot;
        synchronized (bindPort) {
            snapshot = new HashMap<>(bindPort);
        }
        if (snapshot.isEmpty()) {
            return false;
        }
        Map<Map<String, Integer>, String> message = new HashMap<>();
        message.put(snapshot, state);
        onOff = message;
        websocketController.sendTopicmassTmOpen(message);
        return true;
    }

}

 

相关标签: java netty