欢迎您访问程序员文章站!本站旨在为大家分享程序员计算机编程知识。
您现在的位置是: 首页

netty心跳检查之UDP篇

程序员文章站 2022-05-06 20:08:21
...
  部分UDP通信场景中,需要客户端定期向服务器发送心跳信息,服务器据此获知终端的状态,并记录终端的IP地址和端口,以便服务器主动向终端发送控制命令。例如移动通信、内网穿透等场景。
  使用TCP方式通信,心跳是比较容易实现的:使用IdleStateHandler监控channel,然后在自定义的Handler中处理几个对应的事件就可以了。但是对于UDP,这种方式就不灵了:UDP是无连接的,服务器端只有一个channel,所有客户端的数据报都经由它收发,IdleStateHandler无法区分是哪个客户端没有心跳。

  学习研究netty,做了一个简单而完善的例子:通过UDP通信,客户端上线,发送一条信息,服务器响应(不在Handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
  程序逻辑比较简单,不多解释,请看注释。

一、辅助类
package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;

import io.netty.util.AttributeKey;

/**
 * Constants shared by the client and the server of the heartbeat demo.
 * A real application would read these values from a configuration file.
 *
 * <br>
 * <br>Time: 2019-09-14 23:41:26, author: wallimn
 */
public class Config {

	/** Channel attribute key under which the server address is stored on the client channel. */
	public static final AttributeKey<InetSocketAddress> SERVER_ADDR_ATTR =
			AttributeKey.newInstance("SERVER_ADDR_ATTR");

	// The client ID was originally going to be kept in a channel attribute as well,
	// but that turned out not to be suitable for UDP.
	// public static final AttributeKey<String> CLIENT_ID = AttributeKey.newInstance("CLIENT_ID");

	/** Allowed idle time (the unit is not stated here). */
	public static final int IDLE_TIME = 5;

	/** Host name of the server. */
	public static final String SERVER_IP = "localhost";

	/** UDP port of the server. */
	public static final int SERVER_PORT = 8585;

	/** How long a recorded client address stays valid, in milliseconds. */
	public static final long CLIENT_VALID_THRESHOLD = 5000;

	/** Not instantiable: this class only holds constants. */
	private Config() {
	}
}


package com.wallimn.iteye.netty.heart;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/**
 * Stand-in for a real data store: holds the state shared between the Netty
 * handler and the timer tasks.
 *
 * <p>Both containers are concurrent collections and the references are
 * {@code final}, so every thread always works on the same instances.
 *
 * <br>
 * <br>Time: 2019-09-14 19:58:40, author: wallimn
 */
public class DataHolder {

	/** Not instantiable: this class only exposes static containers. */
	private DataHolder(){}

	/**
	 * Messages received from clients, waiting to be processed.
	 */
	public static final ConcurrentLinkedQueue<ClientMessage> clientMessageQueue = new ConcurrentLinkedQueue<ClientMessage>();

	/**
	 * Client addresses learned from heartbeats, keyed by client id; used when the
	 * server sends messages to clients on its own initiative.
	 */
	public static final ConcurrentMap<String, ClientInformation> clientInformationMap = new ConcurrentHashMap<String, ClientInformation>();
}


package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;
import java.util.Date;

import lombok.Data;

/**
 * Information about one client. Accessors, equals/hashCode and toString are
 * generated by Lombok's {@code @Data}.
 * 
 * <br>
 * <br>Time: 2019-09-14 20:55:36, author: wallimn
 */

@Data
public class ClientInformation {
	/**
	 * Unique identifier of the client.
	 */
	private String id;
	/**
	 * Time at which this record was received.
	 */
	private Date recordTime;
	/**
	 * Address of the client.
	 */
	private InetSocketAddress address;
}


package com.wallimn.iteye.netty.heart;

import lombok.Data;

/**
 * A message sent by a client. Accessors, equals/hashCode and toString are
 * generated by Lombok's {@code @Data}.
 * 
 * <br>
 * <br>Time: 2019-09-14 20:55:36, author: wallimn
 */

@Data
public class ClientMessage {
	/**
	 * The message text.
	 */
	private String message;
	
	/**
	 * Information about the client that sent the message.
	 */
	private ClientInformation client;
}


二、客户端代码
package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;
import java.util.UUID;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

/**
 * Client-side handler with very little logic: it sends one "time" request when
 * the channel becomes active, and prints whatever the server sends back.
 *
 * <br>
 * <br>Time: 2019-09-14 21:09:47, author: wallimn
 */
public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

	/**
	 * Sends a single {@code <clientId>;time} datagram to the server address
	 * stored in the channel attribute {@link Config#SERVER_ADDR_ATTR}.
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		final InetSocketAddress serverAddress = ctx.channel().attr(Config.SERVER_ADDR_ATTR).get();

		// A fresh random id (upper-case UUID without dashes) identifies this client.
		final String id = UUID.randomUUID().toString().toUpperCase().replace("-", "");

		// No dedicated "heart" packet is sent: any message that reaches the server
		// can be used by the server to refresh its heartbeat record.
		final String payload = id + ";" + "time";
		final DatagramPacket request =
				new DatagramPacket(Unpooled.copiedBuffer(payload, CharsetUtil.UTF_8), serverAddress);
		ctx.writeAndFlush(request);
		System.out.println("发送对时信息");
	}

	/**
	 * Prints the received text and closes the channel when the text is "exit".
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
		final String text = packet.content().toString(CharsetUtil.UTF_8);
		System.out.println(text);

		if (!"exit".equals(text)) {
			return;
		}
		ctx.close();
	}

	/**
	 * Prints the stack trace of the failure and closes the channel.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}


package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

/**
 * Client application.
 * Start command: java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ClientApp
 *
 * <br>
 * <br>Time: 2019-09-14 08:58:13, author: wallimn
 */
public class ClientApp {

	/**
	 * Starts a client that talks to the server on {@link Config#SERVER_PORT}.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		int port = Config.SERVER_PORT;
		new ClientApp().run(port);
	}

	/**
	 * Binds a UDP channel on a random local port, lets {@link ClientHandler}
	 * talk to the server, and returns once the channel is closed or after 30
	 * seconds, whichever comes first.
	 *
	 * @param port UDP port of the server at {@link Config#SERVER_IP}
	 */
	public void run(int port){
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(group)
				.channel(NioDatagramChannel.class)
				//.option(ChannelOption.SO_BROADCAST, true)
				.handler(new ClientHandler());

			// The handler reads the server address from this channel attribute.
			InetSocketAddress addr = new InetSocketAddress(Config.SERVER_IP,port);
			b.attr(Config.SERVER_ADDR_ATTR, addr);

			// Port 0 lets the operating system pick a free local port.
			Channel ch = b.bind(0).sync().channel();

			// Run for at most 30 seconds.
			if(!ch.closeFuture().await(30000)){
				System.out.println("操作超时");
			}

			System.out.println("退出");
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
		} finally {
			// Always release the event loop threads, also when setup or bind fails.
			group.shutdownGracefully();
		}
	}
}


三、服务器端代码
package com.wallimn.iteye.netty.heart;

import java.util.Date;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
 * Server-side handler. It does not answer clients itself: it only records what
 * arrived so that other threads can process it.
 *
 * <br>
 * <br>Time: 2019-09-15 08:37:15, author: wallimn
 */
public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

	/**
	 * Logs the datagram and, when its text has the form {@code <clientId>;<message>},
	 * records it. Datagrams that do not split into exactly two fields are dropped.
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
		System.out.println("读取信息,channelShortId="+ctx.channel().id().asShortText());
		final String text = packet.content().toString(CharsetUtil.UTF_8);
		System.out.println("host: " + packet.sender().getHostString());
		System.out.println("port: " + packet.sender().getPort());
		System.out.println("content: " + text);

		final String[] parts = text.split(";");
		if (parts.length == 2) {
			record(parts[0], parts[1], packet);
		}
	}

	/**
	 * Stores the sender as the latest known address of the client and queues
	 * the message unless it is a pure heartbeat.
	 */
	private void record(String clientId, String body, DatagramPacket packet) {
		final ClientInformation info = new ClientInformation();
		info.setId(clientId);
		info.setRecordTime(new Date());
		info.setAddress(packet.sender());

		final ClientMessage received = new ClientMessage();
		received.setClient(info);
		received.setMessage(body);

		System.out.println("加入待处理数据队列");

		// The message is not processed here; it is only queued for another thread.
		// Heartbeat messages carry no work and are therefore not queued.
		final boolean heartbeat = "heart".equals(received.getMessage());
		if (!heartbeat) {
			DataHolder.clientMessageQueue.add(received);
		}

		// Whatever the message was, refresh the record of this client.
		DataHolder.clientInformationMap.put(info.getId(), info);
	}

	/**
	 * Delegates to the default behaviour of the superclass.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		super.exceptionCaught(ctx, cause);
	}

// This method is of little use for UDP.
//	@Override
//	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//		if (evt instanceof IdleStateEvent) {
//			IdleStateEvent e = (IdleStateEvent) evt;
//			switch (e.state()) {
//			case READER_IDLE:
//				System.out.println("READER_IDLE");
//				break;
//			case WRITER_IDLE:
//				System.out.println("WRITER_IDLE");
//				break;
//			case ALL_IDLE:
//				System.out.println("ALL_IDLE");
//				break;
//			default:
//				break;
//			}
//		}
//	}

}


package com.wallimn.iteye.netty.heart;

import java.util.Date;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

/**
 * Server application: binds a UDP channel and runs three periodic timer tasks
 * that answer queued client messages, evict clients without a recent heartbeat,
 * and greet the known clients.
 * Start command: java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ServerApp
 * <br>
 * <br>
 * Time: 2019-09-14 09:47:29, author: wallimn
 */
public class ServerApp {

	/**
	 * Timer that drives the periodic tasks. The tick duration is one second;
	 * the last argument is the size of the wheel (number of slots), not the
	 * number of tasks that may be scheduled.
	 */
	public static HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 1,
			TimeUnit.SECONDS, 3);

	/**
	 * The bound server channel, or null while {@link #run(int)} has not bound it yet.
	 * Volatile because it is written by the thread calling {@link #run(int)} and
	 * read by the timer thread.
	 */
	public static volatile Channel channel = null;

	/** Period of {@link #responseTasker}, in seconds. */
	public static final long RESPONSE_TIMEER_DELAY = 1L;
	/** Period of {@link #checkTasker}, in seconds. */
	public static final long CHECK_TIMEER_DELAY = 5L;
	/** Period of {@link #helloTimer}, in seconds. */
	public static final long HELLO_TIMEER_DELAY = 2L;

	/**
	 * Schedules the next run of a channel-dependent task and reports whether the
	 * current run may use the channel. A task stops for good once the channel has
	 * been closed; while the channel is not bound yet the task keeps polling.
	 *
	 * @param task         the task to schedule again
	 * @param delaySeconds delay until the next run, in seconds
	 * @param ch           snapshot of {@link #channel} taken by the task
	 * @return true if {@code ch} is bound and active, false otherwise
	 */
	private static boolean scheduleNextRun(TimerTask task, long delaySeconds, Channel ch) {
		if (ch != null && !ch.isActive()) {
			// Not scheduling again is what actually stops the task; cancelling the
			// timeout that is currently running would have no effect.
			System.out.println("channel is inactive");
			return false;
		}
		timer.newTimeout(task, delaySeconds, TimeUnit.SECONDS);
		if (ch == null) {
			System.out.println("channel is null");
			return false;
		}
		return true;
	}

	/**
	 * Processes the messages sent by clients: every queued message is removed
	 * from the queue, and a "time" request is answered with a fixed time string.
	 */
	public static TimerTask responseTasker = new TimerTask() {
		@Override
		public void run(Timeout timeout) throws Exception {
			Channel ch = channel;
			if (!scheduleNextRun(this, RESPONSE_TIMEER_DELAY, ch)) {
				return;
			}

			// poll() hands out and removes each message in one atomic step.
			ClientMessage message;
			while ((message = DataHolder.clientMessageQueue.poll()) != null) {
				System.out.println(message.getMessage());
				if ("time".equals(message.getMessage())) {
					ch.writeAndFlush(
							new DatagramPacket(Unpooled.copiedBuffer("18:18", CharsetUtil.UTF_8), message.getClient().getAddress()));
				}
			}
		}
	};

	/**
	 * Checks whether the known clients are still valid and evicts those whose
	 * last record is older than {@link Config#CLIENT_VALID_THRESHOLD}.
	 */
	public static TimerTask checkTasker = new TimerTask() {
		@Override
		public void run(Timeout timeout) throws Exception {
			timer.newTimeout(this, CHECK_TIMEER_DELAY, TimeUnit.SECONDS);
			long now = System.currentTimeMillis();
			for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) {
				ClientInformation client = entry.getValue();
				if (now - client.getRecordTime().getTime() > Config.CLIENT_VALID_THRESHOLD) {
					// Remove only if the stale record is still the mapped one, so a
					// heartbeat that arrived in the meantime is not thrown away.
					if (DataHolder.clientInformationMap.remove(entry.getKey(), client)) {
						System.out.println("client kick : " + client.getId());
					}
				}
			}
		}
	};

	/**
	 * Simulates the server sending messages to clients on its own initiative:
	 * sends "hello" to every known client.
	 */
	public static TimerTask helloTimer = new TimerTask(){
		@Override
		public void run(Timeout timeout) throws Exception {
			Channel ch = channel;
			if (!scheduleNextRun(this, HELLO_TIMEER_DELAY, ch)) {
				return;
			}
			for (ClientInformation client : DataHolder.clientInformationMap.values()) {
				System.out.println("helloTimer run. send to "+client.getId());

				ch.writeAndFlush(
						new DatagramPacket(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8), client.getAddress()));
			}
		}
	};

	/**
	 * Schedules the three timer tasks and runs the server on
	 * {@link Config#SERVER_PORT} until its channel is closed.
	 *
	 * @param args unused
	 * @throws Exception if binding or waiting for the channel fails
	 */
	public static void main(String[] args) throws Exception {
		int port = Config.SERVER_PORT;
		timer.newTimeout(responseTasker, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS);
		timer.newTimeout(checkTasker, CHECK_TIMEER_DELAY, TimeUnit.SECONDS);
		timer.newTimeout(helloTimer, HELLO_TIMEER_DELAY, TimeUnit.SECONDS);
		try {
			new ServerApp().run(port);
		} finally {
			// The timer thread is not a daemon thread; without stopping it the JVM
			// would stay alive after the server has shut down or failed to bind.
			timer.stop();
		}
	}

	/**
	 * Binds the UDP server channel, publishes it in {@link #channel} and blocks
	 * until the channel is closed.
	 *
	 * @param port local UDP port to bind
	 * @throws Exception if binding fails or the waiting thread is interrupted
	 */
	public void run(int port) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();

			b.group(group).channel(NioDatagramChannel.class)
					// .option(ChannelOption.SO_BROADCAST, true)
					.handler(new ChannelInitializer<Channel>(){

						@Override
						protected void initChannel(Channel ch) throws Exception {
							//ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
							ch.pipeline().addLast(new ServerHandler());
						}

					});

			ChannelFuture future = b.bind(port).sync();
			Channel bound = future.channel();
			channel = bound;
			System.out.println("服务器准备就绪,channelShortId="+bound.id().asShortText());
			bound.closeFuture().await();
		} finally {
			// Release the event loop threads even when bind() or await() throws.
			group.shutdownGracefully();
		}
	}
}



相关标签: netty udp