欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty实现心跳检测

程序员文章站 2024-03-23 22:49:52
...

一.服务端代码

1.1 Server


public class Server {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                // 设置日志
                .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ServerHeartBeatHandler());
                    };
                });
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

1.2 ServerHeartBeatHandler


public class ServerHeartBeatHandler extends ChannelHandlerAdapter {

    /** key:ip value:auth **/
    private static HashMap<String, String> AUTH_IP_MAP = new HashMap<>();
    private static final String SUCCESS_KEY = "auth_success_key";
    static {
        AUTH_IP_MAP.put("192.168.50.250", "1234");
    }

    private boolean auth(ChannelHandlerContext ctx, Object msg) {
        String[] ret = ((String) msg).split(",");
        String auth = AUTH_IP_MAP.get(ret[0]);
        if (auth != null && auth.equals(ret[1])) {
            ctx.writeAndFlush(SUCCESS_KEY);
            return true;
        } else {
            // 直接关闭 服务器端主动断开连接
            ctx.writeAndFlush("auth failure!!!").addListener(ChannelFutureListener.CLOSE);
            return false;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.err.println("服务端收到的消息:" + msg);
        if (msg instanceof String) {
            auth(ctx, msg);
        } else if (msg instanceof RequestInfo) {
            RequestInfo info = (RequestInfo) msg;
            System.err.println("------------------------------");
            System.out.println("当前主机IP为:  " + info.getIp());
            System.out.println("当前主机CPU:");
            HashMap<String, Object> cpu = info.getCpuPercMap();

            System.out.println("总使用率:  " + cpu.get("combined"));
            System.out.println("用户使用率:  " + cpu.get("user"));
            System.out.println("系统使用率:  " + cpu.get("sys"));
            System.out.println("等待率:  " + cpu.get("wait"));
            System.out.println("空闲率:  " + cpu.get("idle"));

            System.out.println("当前主机memory情况:");
            HashMap<String, Object> memory = info.getMemoryMap();
            System.out.println("内存总量:  " + memory.get("total") + "M");
            System.out.println("当前内存使用量:  " + memory.get("used") + "M");
            System.out.println("当前内存剩余量:  " + memory.get("free") + "M");
            ctx.writeAndFlush("info received");
        } else {
            ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
        }
    }
}

1.3 RequestInfo


public class RequestInfo implements Serializable {
    private String ip;
    private HashMap<String, Object> cpuPercMap;
    private HashMap<String, Object> memoryMap;

    public void setIp(String hostAddress) {
        this.ip = hostAddress;

    }

    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;

    }

    public void setMemoryMap(HashMap<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }

    public String getIp() {
        return ip;
    }

    public HashMap<String, Object> getCpuPercMap() {
        return cpuPercMap;
    }

    public HashMap<String, Object> getMemoryMap() {
        return memoryMap;
    }

    @Override
    public String toString() {
        return "RequestInfo [ip=" + ip + ", cpuPercMap=" + cpuPercMap + ", memoryMap=" + memoryMap + "]";
    }

}

1.4 MarshallingCodeCFactory

public final class MarshallingCodeCFactory {

    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     * 
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // 创建了MarshallingConfiguration对象,配置了版本号为5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // 根据marshallerFactory和configuration创建provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * 
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

二 客户端代码

2.1 Client


public class Client {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workgroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(workgroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ClientHeartBeatHandler());
            }
        });
        ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
        cf1.channel().closeFuture().sync();
        workgroup.shutdownGracefully();
    }

}

2.2 ClientHeartBeatHandler


public class ClientHeartBeatHandler extends ChannelHandlerAdapter {
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private ScheduledFuture<?> heartBeat;

    // 主动向服务器发送认证信息
    private InetAddress addr;

    private static final String SUCCESS_KEY = "auth_success_key";

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        addr = InetAddress.getLocalHost();
        String ip = addr.getHostAddress();
        String key = "1234";
        // 证书
        String auth = ip + "," + key;
        ctx.writeAndFlush(auth);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (msg instanceof String) {
                String ret = (String) msg;
                if (SUCCESS_KEY.equals(ret)) {
                    // 握手成功,主动发送心跳消息
                    this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2,
                            TimeUnit.SECONDS);
                    System.out.println("服务端响应消息:" + msg);
                }
            } else {
                System.out.println(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    class HeartBeatTask implements Runnable {
        private ChannelHandlerContext ctx;

        public HeartBeatTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            try {
                RequestInfo info = new RequestInfo();
                // ip
                info.setIp(addr.getHostAddress());
                Sigar sigar = new Sigar();

                CpuPerc cpuPerc = sigar.getCpuPerc();
                HashMap<String, Object> cpuPercMap = new HashMap<>();
                DecimalFormat df = new DecimalFormat("######0.00");
                cpuPercMap.put("combined", df.format(cpuPerc.getCombined() * 100) + "%");
                cpuPercMap.put("user", df.format(cpuPerc.getUser() * 100) + "%");
                cpuPercMap.put("sys", df.format(cpuPerc.getSys() * 100) + "%");
                cpuPercMap.put("wait", df.format(cpuPerc.getWait() * 100) + "%");
                cpuPercMap.put("idle", df.format(cpuPerc.getIdle() * 100) + "%");

                // memory
                Mem mem = sigar.getMem();

                HashMap<String, Object> memoryMap = new HashMap<>();
                memoryMap.put("total", mem.getTotal() / 1024L / 1024);
                memoryMap.put("used", mem.getUsed() / 1024L / 1024);
                memoryMap.put("free", mem.getFree() / 1024L / 1024);

                info.setCpuPercMap(cpuPercMap);
                info.setMemoryMap(memoryMap);
                addr = InetAddress.getLocalHost();
                String ip = addr.getHostAddress();
                String key = "1234";
                // 证书
                String auth = ip + "," + key;
                ctx.writeAndFlush(info);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }

    public static void main(String[] args) throws SigarException {
        Sigar sigar = new Sigar();
        // CPU数量(单位:个)
        int cpuLength = sigar.getCpuInfoList().length;
        System.out.println(cpuLength);

        // CPU的总量(单位:HZ)及CPU的相关信息
        CpuInfo infos[] = sigar.getCpuInfoList();
        for (int i = 0; i < infos.length; i++) {// 不管是单块CPU还是多CPU都适用
            CpuInfo info = infos[i];
            System.out.println("mhz=" + info.getMhz());// CPU的总量MHz
            System.out.println("vendor=" + info.getVendor());// 获得CPU的卖主,如:Intel
            System.out.println("model=" + info.getModel());// 获得CPU的类别,如:Celeron
            System.out.println("cache size=" + info.getCacheSize());// 缓冲存储器数量
        }
        /** CPU的用户使用量、系统使用剩余量、总的剩余量、总的使用占用量等(单位:100%) **/
        // 方式一,主要是针对一块CPU的情况
        CpuPerc cpu;
        try {
            cpu = sigar.getCpuPerc();
            System.out.println(cpu);
        } catch (SigarException e) {
            e.printStackTrace();
        }
        // 方式二,不管是单块CPU还是多CPU都适用
        CpuPerc cpuList[] = null;
        try {
            cpuList = sigar.getCpuPercList();
        } catch (SigarException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < cpuList.length; i++) {
            System.out.println(cpuList[i]);
        }
        System.err.println("-------------------内存使用情况-------------------");
        // 物理内存信息
        Mem mem = sigar.getMem();
        // 内存总量
        System.out.println("Total = " + mem.getTotal() / 1024L / 1024 + "M av");
        // 当前内存使用量
        System.out.println("Used = " + mem.getUsed() / 1024L / 1024 + "M used");
        // 当前内存剩余量
        System.out.println("Free = " + mem.getFree() / 1024L / 1024 + "M free");
        System.err.println("------------------系统页面文件交换区信息--------------------");
        // 系统页面文件交换区信息
        Swap swap = sigar.getSwap();
        // 交换区总量
        System.out.println("Total = " + swap.getTotal() / 1024L + "K av");
        // 当前交换区使用量
        System.out.println("Used = " + swap.getUsed() / 1024L + "K used");
        // 当前交换区剩余量
        System.out.println("Free = " + swap.getFree() / 1024L + "K free");
    }
}
相关标签: 心跳检测