netty心跳检查之UDP篇
程序员文章站
2022-05-06 20:08:21
...
部分UDP通信场景中,需要客户端定期发送心跳信息,以获取终端的状态,并获取终端IP,以便服务器主动发送控制命令。如移动通信,内网穿越等。
使用TCP方式通信,心跳是比较容易实现的,使用IdleStateHandler监控channel,然后在自定义的Handler中处理几个对应的事件就可以了。但是对于UDP,就不灵了。
学习研究netty,做了一个简单而完善的例子:通过UDP通信,客户端上线,发送一条信息,服务器响应(不在Handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
程序逻辑比较简单,不多解释,请看注释。
一、辅助类
二、客户端代码
三、服务器端代码
使用TCP方式通信,心跳是比较容易实现的,使用IdleStateHandler监控channel,然后在自定义的Handler中处理几个对应的事件就可以了。但是对于UDP,就不灵了。
学习研究netty,做了一个简单而完善的例子:通过UDP通信,客户端上线,发送一条信息,服务器响应(不在Handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
程序逻辑比较简单,不多解释,请看注释。
一、辅助类
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import io.netty.util.AttributeKey; /** * 记录一些常量。真正的应用要从配置文件中读取。 * * <br> * <br>时间:2019年9月14日 下午11:41:26,作者:wallimn */ public class Config { private Config(){}; public static final AttributeKey<InetSocketAddress> SERVER_ADDR_ATTR=AttributeKey.newInstance("SERVER_ADDR_ATTR"); //原来打算将客户端的ID记录在Channel的属性中,后来发现对于UDP不适用。 //public static final AttributeKey<String> CLIENT_ID=AttributeKey.newInstance("CLIENT_ID"); public static final int IDLE_TIME=5;//允许的发呆时间 public static final int SERVER_PORT=8585; public static final String SERVER_IP="localhost"; public static final long CLIENT_VALID_THRESHOLD=5000;//客户端地址有效的时间阀值。单位为毫秒。 }
package com.wallimn.iteye.netty.heart; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; /** * 用来模拟持有数据 * * <br> * <br>时间:2019年9月14日 下午7:58:40,作者:wallimn */ public class DataHolder { private DataHolder(){} /** * 记录客户端的消息 */ public static ConcurrentLinkedQueue<ClientMessage> clientMessageQueue = new ConcurrentLinkedQueue<ClientMessage>(); /** * 记录由心跳获取的客户端地址,用于服务器主动给客户端发消息 */ public static ConcurrentMap<String, ClientInformation> clientInformationMap = new ConcurrentHashMap<String, ClientInformation>(); }
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import java.util.Date; import lombok.Data; /** * 客户端信息 * * <br> * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn */ @Data public class ClientInformation { /** * 客户端唯一标识 */ private String id; /** * 收到时间 */ private Date recordTime; /** * 客户端地址, */ private InetSocketAddress address; }
package com.wallimn.iteye.netty.heart; import lombok.Data; /** * 客户端发来的消息 * * <br> * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn */ @Data public class ClientMessage { /** * 消息 */ private String message; /** * 客户端信息 */ private ClientInformation client; }
二、客户端代码
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import java.util.UUID; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; /** * 客户端处理器(handler),并无太多逻辑。仅发了一条访问时间的信息(time)、读取服务器信息并显示。 * * <br> * <br>时间:2019年9月14日 下午9:09:47,作者:wallimn */ public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { String msg = packet.content().toString(CharsetUtil.UTF_8); System.out.println(msg); //如果收到exit信息,关闭channel if("exit".equals(msg)){ ctx.close(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress addr = ctx.channel().attr(Config.SERVER_ADDR_ATTR).get(); String clientId; String message; //发送1条心跳 clientId = UUID.randomUUID().toString().toUpperCase().replace("-", ""); // message = clientId+";"+"heart";//发送的信息 // ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr)); // System.out.println("发送一条心跳");//不用专门发心跳信息,任何发到服务器的信息都可以用于服务器更新心跳记录 message = clientId+";"+"time";//发送的信息 ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr)); System.out.println("发送对时信息"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; /** * 客户端程序 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ClientApp * * <br> * <br>时间:2019年9月14日 上午8:58:13,作者:wallimn */ public class ClientApp { public static void main(String[] args) { int port = Config.SERVER_PORT; new ClientApp().run(port); } public void run(int port){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioDatagramChannel.class) //.option(ChannelOption.SO_BROADCAST, true) .handler(new ClientHandler()); InetSocketAddress addr = new InetSocketAddress(Config.SERVER_IP,port); b.attr(Config.SERVER_ADDR_ATTR, addr); try { Channel ch = b.bind(0).sync().channel();//使用一个随机端口 //最长运行30秒 if(!ch.closeFuture().await(30000)){ System.out.println("操作超时"); } System.out.println("退出"); } catch (InterruptedException e) { e.printStackTrace(); } finally{ group.shutdownGracefully(); } } }
三、服务器端代码
package com.wallimn.iteye.netty.heart; import java.util.Date; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; /** * 服务器处理器(handler) * * <br> * <br>时间:2019年9月15日 上午8:37:15,作者:wallimn */ public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { System.out.println("读取信息,channelShortId="+ctx.channel().id().asShortText()); String msg = packet.content().toString(CharsetUtil.UTF_8); System.out.println("host: " + packet.sender().getHostString()); System.out.println("port: " + packet.sender().getPort()); System.out.println("content: " + msg); String[] fields = msg.split(";"); if (fields.length != 2) { return; } ClientInformation client = new ClientInformation(); client.setId(fields[0]); client.setRecordTime(new Date()); client.setAddress(packet.sender()); ClientMessage message = new ClientMessage(); message.setClient(client); message.setMessage(fields[1]); System.out.println("加入待处理数据队列"); //标注客户端的ID // 不对消息进行处理,只是加入队列,由其他线程进行处理 if(!"heart".equals(message.getMessage())){//如果不是心跳消息 DataHolder.clientMessageQueue.add(message); } //不管什么消息,更新客户端的信息 DataHolder.clientInformationMap.put(client.getId(), client); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } //这个方法对于UDP没有什么意义 // @Override // public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // if (evt instanceof IdleStateEvent) { // IdleStateEvent e = (IdleStateEvent) evt; // switch (e.state()) { // case READER_IDLE: // System.out.println("READER_IDLE"); // break; // case WRITER_IDLE: // System.out.println("WRITER_IDLE"); // break; // case ALL_IDLE: // System.out.println("ALL_IDLE"); // break; // default: // break; // } // } // } }
package com.wallimn.iteye.netty.heart; import java.util.Date; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.util.CharsetUtil; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; /** * 服务器应用 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ServerApp * <br> * <br> * 时间:2019年9月14日 上午9:47:29,作者:wallimn */ public class ServerApp { //public static ConcurrentMap<String, ClientMessage> clientMessageMap = new ConcurrentHashMap<String, ClientMessage>(); // 内部放置多个Task, public static HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 1, // tick一下的时间 TimeUnit.SECONDS, 3);// 放置Timer的数量 public static Channel channel = null; public static final long RESPONSE_TIMEER_DELAY = 1L; public static final long CHECK_TIMEER_DELAY = 5L; public static final long HELLO_TIMEER_DELAY = 2L; /** * 处理客户端发来的消息 */ public static TimerTask responseTasker = new TimerTask() { public void run(Timeout timeout) throws Exception { timer.newTimeout(this, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS); if (channel == null){ System.out.println("channel is null"); return; } if (channel.isActive() == false) { System.out.println("channel is inactive"); timeout.cancel(); return; } //System.out.println("responseTasker run, size is "+DataHolder.clientMessageQueue.size()); ClientMessage message; for (Iterator<ClientMessage> iterator=DataHolder.clientMessageQueue.iterator();iterator.hasNext();) { message = iterator.next(); System.out.println(message.getMessage()); if("time".equals(message.getMessage())){ channel.writeAndFlush( new DatagramPacket(Unpooled.copiedBuffer("18:18", CharsetUtil.UTF_8), message.getClient().getAddress())); } else{ ; } //处理完后清除 iterator.remove(); } } }; /** * 用于检查客户端是否有效 */ public static TimerTask checkTasker = new TimerTask() { public void run(Timeout timeout) throws Exception { timer.newTimeout(this, CHECK_TIMEER_DELAY, TimeUnit.SECONDS); ClientInformation client = null; long now = new Date().getTime(); for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) { client = entry.getValue(); if (now - client.getRecordTime().getTime() > Config.CLIENT_VALID_THRESHOLD) { System.out.println("client kick : " + client.getId()); DataHolder.clientInformationMap.remove(entry.getKey()); } } } }; /** * 用于模拟主动向客户端发送消息 */ public static TimerTask helloTimer = new TimerTask(){ public void run(Timeout timeout) throws Exception { if (channel == null){ System.out.println("channel is null"); return; } if (channel.isActive() == false) { System.out.println("channel is inactive"); timeout.cancel(); return; } timer.newTimeout(this, HELLO_TIMEER_DELAY, TimeUnit.SECONDS); ClientInformation client = null; for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) { client = entry.getValue(); System.out.println("helloTimer run. send to "+client.getId()); channel.writeAndFlush( new DatagramPacket(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8), client.getAddress())); } } }; public static void main(String[] args) throws Exception { int port = Config.SERVER_PORT; timer.newTimeout(responseTasker, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS); timer.newTimeout(checkTasker, CHECK_TIMEER_DELAY, TimeUnit.SECONDS); timer.newTimeout(helloTimer, HELLO_TIMEER_DELAY, TimeUnit.SECONDS); new ServerApp().run(port); } public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class) // .option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInitializer<Channel>(){ @Override protected void initChannel(Channel ch) throws Exception { //ch.pipeline().addLast(new IdleStateHandler(5, 0, 0)); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture future = b.bind(port).sync(); channel = future.channel(); System.out.println("服务器准备就绪,channelShortId="+channel.id().asShortText()); channel.closeFuture().await(); group.shutdownGracefully(); } }