Netty实现心跳检测
程序员文章站
2024-03-23 22:49:52
...
一.服务端代码
1.1 Server
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
// 设置日志
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHeartBeatHandler());
};
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
1.2 ServerHeartBeatHandler
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
/** key:ip value:auth **/
private static HashMap<String, String> AUTH_IP_MAP = new HashMap<>();
private static final String SUCCESS_KEY = "auth_success_key";
static {
AUTH_IP_MAP.put("192.168.50.250", "1234");
}
private boolean auth(ChannelHandlerContext ctx, Object msg) {
String[] ret = ((String) msg).split(",");
String auth = AUTH_IP_MAP.get(ret[0]);
if (auth != null && auth.equals(ret[1])) {
ctx.writeAndFlush(SUCCESS_KEY);
return true;
} else {
// 直接关闭 服务器端主动断开连接
ctx.writeAndFlush("auth failure!!!").addListener(ChannelFutureListener.CLOSE);
return false;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.err.println("服务端收到的消息:" + msg);
if (msg instanceof String) {
auth(ctx, msg);
} else if (msg instanceof RequestInfo) {
RequestInfo info = (RequestInfo) msg;
System.err.println("------------------------------");
System.out.println("当前主机IP为: " + info.getIp());
System.out.println("当前主机CPU:");
HashMap<String, Object> cpu = info.getCpuPercMap();
System.out.println("总使用率: " + cpu.get("combined"));
System.out.println("用户使用率: " + cpu.get("user"));
System.out.println("系统使用率: " + cpu.get("sys"));
System.out.println("等待率: " + cpu.get("wait"));
System.out.println("空闲率: " + cpu.get("idle"));
System.out.println("当前主机memory情况:");
HashMap<String, Object> memory = info.getMemoryMap();
System.out.println("内存总量: " + memory.get("total") + "M");
System.out.println("当前内存使用量: " + memory.get("used") + "M");
System.out.println("当前内存剩余量: " + memory.get("free") + "M");
ctx.writeAndFlush("info received");
} else {
ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
}
}
}
1.3 RequestInfo
public class RequestInfo implements Serializable {
private String ip;
private HashMap<String, Object> cpuPercMap;
private HashMap<String, Object> memoryMap;
public void setIp(String hostAddress) {
this.ip = hostAddress;
}
public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}
public void setMemoryMap(HashMap<String, Object> memoryMap) {
this.memoryMap = memoryMap;
}
public String getIp() {
return ip;
}
public HashMap<String, Object> getCpuPercMap() {
return cpuPercMap;
}
public HashMap<String, Object> getMemoryMap() {
return memoryMap;
}
@Override
public String toString() {
return "RequestInfo [ip=" + ip + ", cpuPercMap=" + cpuPercMap + ", memoryMap=" + memoryMap + "]";
}
}
1.4 MarshallingCodeCFactory
public final class MarshallingCodeCFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
*
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
// 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
// 创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// 根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
*
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
二 客户端代码
2.1 Client
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup workgroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workgroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClientHeartBeatHandler());
}
});
ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
cf1.channel().closeFuture().sync();
workgroup.shutdownGracefully();
}
}
2.2 ClientHeartBeatHandler
public class ClientHeartBeatHandler extends ChannelHandlerAdapter {
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private ScheduledFuture<?> heartBeat;
// 主动向服务器发送认证信息
private InetAddress addr;
private static final String SUCCESS_KEY = "auth_success_key";
public void channelActive(ChannelHandlerContext ctx) throws Exception {
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
String key = "1234";
// 证书
String auth = ip + "," + key;
ctx.writeAndFlush(auth);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (msg instanceof String) {
String ret = (String) msg;
if (SUCCESS_KEY.equals(ret)) {
// 握手成功,主动发送心跳消息
this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2,
TimeUnit.SECONDS);
System.out.println("服务端响应消息:" + msg);
}
} else {
System.out.println(msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}
class HeartBeatTask implements Runnable {
private ChannelHandlerContext ctx;
public HeartBeatTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
RequestInfo info = new RequestInfo();
// ip
info.setIp(addr.getHostAddress());
Sigar sigar = new Sigar();
CpuPerc cpuPerc = sigar.getCpuPerc();
HashMap<String, Object> cpuPercMap = new HashMap<>();
DecimalFormat df = new DecimalFormat("######0.00");
cpuPercMap.put("combined", df.format(cpuPerc.getCombined() * 100) + "%");
cpuPercMap.put("user", df.format(cpuPerc.getUser() * 100) + "%");
cpuPercMap.put("sys", df.format(cpuPerc.getSys() * 100) + "%");
cpuPercMap.put("wait", df.format(cpuPerc.getWait() * 100) + "%");
cpuPercMap.put("idle", df.format(cpuPerc.getIdle() * 100) + "%");
// memory
Mem mem = sigar.getMem();
HashMap<String, Object> memoryMap = new HashMap<>();
memoryMap.put("total", mem.getTotal() / 1024L / 1024);
memoryMap.put("used", mem.getUsed() / 1024L / 1024);
memoryMap.put("free", mem.getFree() / 1024L / 1024);
info.setCpuPercMap(cpuPercMap);
info.setMemoryMap(memoryMap);
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
String key = "1234";
// 证书
String auth = ip + "," + key;
ctx.writeAndFlush(info);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
public static void main(String[] args) throws SigarException {
Sigar sigar = new Sigar();
// CPU数量(单位:个)
int cpuLength = sigar.getCpuInfoList().length;
System.out.println(cpuLength);
// CPU的总量(单位:HZ)及CPU的相关信息
CpuInfo infos[] = sigar.getCpuInfoList();
for (int i = 0; i < infos.length; i++) {// 不管是单块CPU还是多CPU都适用
CpuInfo info = infos[i];
System.out.println("mhz=" + info.getMhz());// CPU的总量MHz
System.out.println("vendor=" + info.getVendor());// 获得CPU的卖主,如:Intel
System.out.println("model=" + info.getModel());// 获得CPU的类别,如:Celeron
System.out.println("cache size=" + info.getCacheSize());// 缓冲存储器数量
}
/** CPU的用户使用量、系统使用剩余量、总的剩余量、总的使用占用量等(单位:100%) **/
// 方式一,主要是针对一块CPU的情况
CpuPerc cpu;
try {
cpu = sigar.getCpuPerc();
System.out.println(cpu);
} catch (SigarException e) {
e.printStackTrace();
}
// 方式二,不管是单块CPU还是多CPU都适用
CpuPerc cpuList[] = null;
try {
cpuList = sigar.getCpuPercList();
} catch (SigarException e) {
e.printStackTrace();
}
for (int i = 0; i < cpuList.length; i++) {
System.out.println(cpuList[i]);
}
System.err.println("-------------------内存使用情况-------------------");
// 物理内存信息
Mem mem = sigar.getMem();
// 内存总量
System.out.println("Total = " + mem.getTotal() / 1024L / 1024 + "M av");
// 当前内存使用量
System.out.println("Used = " + mem.getUsed() / 1024L / 1024 + "M used");
// 当前内存剩余量
System.out.println("Free = " + mem.getFree() / 1024L / 1024 + "M free");
System.err.println("------------------系统页面文件交换区信息--------------------");
// 系统页面文件交换区信息
Swap swap = sigar.getSwap();
// 交换区总量
System.out.println("Total = " + swap.getTotal() / 1024L + "K av");
// 当前交换区使用量
System.out.println("Used = " + swap.getUsed() / 1024L + "K used");
// 当前交换区剩余量
System.out.println("Free = " + swap.getFree() / 1024L + "K free");
}
}
上一篇: tensorflow入门第一步hello world
下一篇: 输出一个整数的每一位
推荐阅读
-
Netty实现心跳检测
-
Netty心跳检测(1)
-
Netty简单入门:获取请求、多客户端连接与通信、心跳检测、长链接
-
nio实现Socket长连接和心跳 博客分类: SocketJ2SE
-
netty实现http服务器
-
手把手教你用yolov3模型实现目标检测教程(二) -数据标注
-
VBScript 之检测文件夹是否存在&不存在则创建【代码实现】
-
shell脚本实现:定时检测 若ssh用户访问密码输入错误次数超过上限 则强制加入ssh访问黑名单
-
Java io nio netty三种方式实现简单聊天功能 博客分类: java基础 ionionetty聊天
-
【经典算法实现 39】图的最短路径计算(优化Dijkstra算法支持负权计算 及 负环检测功能)(参考Bellman_Ford算法)