Netty 系列九(支持UDP协议).
一、基础知识
udp 协议相较于 tcp 协议的特点:
1、无连接协议,没有持久化连接;
2、每个 udp 数据报都是一个单独的传输单元;
3、一定的数据报丢失;
4、没有重传机制,也不管数据报是否可达;
5、速度比tcp快很多,可用来高效处理大量数据 —— 牺牲了握手以及消息管理机制。
6、常用于音频、视频场景,可以忍受一定的数据包丢失,追求速度上的提升。
tcp 协议采用的是一种叫做单播的传输形式,udp 协议提供了向多个接收者发送消息的额外传输形式(多播、广播):
单播(tcp 和 udp):发送消息给一个由唯一的地址所标识的单一的网络目的地。
多播(udp):传输给一个预定义的主机组。
广播(udp):传输到网络(或者子网)上的所有主机。
二、功能说明
广播方:打开一个文件,通过 udp 使用特殊的受限广播地址或者零网络地址 255.255.255.255,把每一行作为一个消息广播到一个指定的端口。
接收方:通过 udp 广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。所有的在该 udp 端口上监听的事件监听器都将会接收到广播信息。
三、实现
下图展示了怎么将我们的 文件数据 广播为 udp消息:所有的将要被传输的数据都被封装在了 logevent 消息中。 logeventbroadcaster 将把这些写入到 channel 中,并通过 channelpipeline 发送它们,在那里它们将会被转换(编码)为 datagrampacket 消息。最后,他们都将通过 udp 被广播,并由远程节点(监视器)所捕获。
netty 中支持 udp 协议主要通过以下相关类:
datagrampacket:使用 bytebuf 作为数据源,是 udp 协议传输的消息容器。
datagramchannel:扩展了 netty 的 channel 抽象以支持 udp 的多播组管理,它的实现类 niodatagramchannnel 用来和远程节点通信。
bootstrap:udp 协议的引导类,使用 bind() 方法绑定 channel。
public class logevent { public static final byte separator = ':'; /** * ip套接字地址(ip地址+端口号) */ private final inetsocketaddress inetsocketaddress; /** * 文件名 */ private final string logfile; /** * 消息内容 */ private final string msg; private final long received; /** * 用于传入消息的构造函数 * * @param inetsocketaddress * @param logfile * @param msg * @param received */ public logevent(inetsocketaddress inetsocketaddress, string logfile, string msg, long received) { this.inetsocketaddress = inetsocketaddress; this.logfile = logfile; this.msg = msg; this.received = received; } /** * 用于传出消息的构造函数 * * @param logfile * @param msg */ public logevent(string logfile, string msg) { this(null, logfile, msg, -1); } public inetsocketaddress getinetsocketaddress() { return inetsocketaddress; } public string getlogfile() { return logfile; } public string getmsg() { return msg; } public long getreceived() { return received; } }
public class logeventencoder extends messagetomessageencoder<logevent> { private final inetsocketaddress remoteaddress; public logeventencoder(inetsocketaddress remoteaddress) { this.remoteaddress = remoteaddress; } @override protected void encode(channelhandlercontext ctx, logevent msg, list<object> out) throws exception { byte[] file = msg.getlogfile().getbytes(charsetutil.utf_8); byte[] content = msg.getmsg().getbytes(charsetutil.utf_8); bytebuf bytebuf = ctx.alloc().buffer(file.length + content.length + 1); bytebuf.writebytes(file); bytebuf.writebyte(logevent.separator); bytebuf.writebytes(content); out.add(new datagrampacket(bytebuf, remoteaddress)); } }
该编码器实现了将 logevent 实体类内容转换为 datagrampacket udp数据报。
public class logeventbroadcaster { private final eventloopgroup group; private final bootstrap bootstrap; private final file file; public logeventbroadcaster(inetsocketaddress address, file file) { group = new nioeventloopgroup(); bootstrap = new bootstrap(); bootstrap.group(group) //引导该 niodatagramchannel(无连接的) .channel(niodatagramchannel.class) // 设置 so_broadcast 套接字选项 .option(channeloption.so_broadcast, true) .handler(new logeventencoder(address)); this.file = file; } public void run() throws interruptedexception, ioexception { //绑定 channel,udp 协议的连接用 bind() 方法 channel channel = bootstrap.bind(0).sync().channel(); long pointer = 0; //长轮询 监听是否有新的日志文件生成 while (true) { long length = file.length(); if (length < pointer) { // 如果有必要,将文件指针设置到该文件的最后一个字节 pointer = length; } else { randomaccessfile raf = new randomaccessfile(file, "r"); // 确保当前的文件指针,以确保没有任何的旧数据被发送 raf.seek(pointer); string line; while ((line = raf.readline()) != null) { //对于每个日志条目,写入一个 logevent 到 channel 中,最后加入一个换行符号 channel.writeandflush(new logevent(file.getabsolutepath(), line + system.getproperty("line.separator"))); } pointer = raf.getfilepointer(); raf.close(); } try { // 休眠一秒,如果被中断,则退出循环,否则重新处理它 thread.sleep(1000); } catch (interruptedexception e) { while (!thread.interrupted()) { break; } } } } public void stop() { group.shutdowngracefully(); } public static void main(string[] args) throws ioexception, interruptedexception { inetsocketaddress socketaddress = new inetsocketaddress("255.255.255.255", 8888); file file = new file("e:\\2018-09-12.log"); logeventbroadcaster logeventbroadcaster = new logeventbroadcaster(socketaddress, file); try { logeventbroadcaster.run(); } finally { logeventbroadcaster.stop(); } } }
现在,我们来测试一下这个 udp 广播类,首先我们需要一个工具 nmap ,用它来监听 udp 的 8888 端口,以接收我们广播的日志文件。下载地址:
下载完成后,命令行进入安装目录,执行命令:ncat.exe -l -u -p 8888 ,监听 udp 端口。
当然,也可以自己写个测试类监听 udp 端口,打印日志查看。这里我没有用 netty 写监听类,直接用了 java 原生的 datagramsocket 和 datagrampacket 写的监听类,如下:
public class udpserver { public static void main(string[] args) { datagramsocket server = null; try { server = new datagramsocket(8888); byte[] datas = new byte[1024]; //用一个字节数组接收udp包,字节数组在传递给构造函数时是空的 while (true) { datagrampacket datagrampacket = new datagrampacket(datas, datas.length); server.receive(datagrampacket); system.out.println(new string(datas)); } } catch (socketexception e) { e.printstacktrace(); } catch (ioexception e) { e.printstacktrace(); } finally { server.close(); } } }
基于 netty 的监听类实现可以参考我上传 github 上的源代码。
参考资料:《netty in action》
演示源代码:https://github.com/jmcuixy/nettydemo/tree/master/src/main/java/org/netty/demo/udp
上一篇: C#多线程编程系列(二)- 线程基础
下一篇: 胖胖的女生好追