欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty 系列九(支持UDP协议).

程序员文章站 2022-04-25 20:04:05
一、基础知识 UDP 协议相较于 TCP 协议的特点: 1、无连接协议,没有持久化连接;2、每个 UDP 数据报都是一个单独的传输单元;3、一定的数据报丢失;4、没有重传机制,也不管数据报是否可达;5、速度比TCP快很多,可用来高效处理大量数据 —— 牺牲了握手以及消息管理机制。6、常用于音频、视频 ......

一、基础知识

    udp 协议相较于 tcp 协议的特点:

1、无连接协议,没有持久化连接;
2、每个 udp 数据报都是一个单独的传输单元;
3、一定的数据报丢失;
4、没有重传机制,也不管数据报是否可达;
5、速度比tcp快很多,可用来高效处理大量数据 —— 牺牲了握手以及消息管理机制。
6、常用于音频、视频场景,可以忍受一定的数据包丢失,追求速度上的提升。

    tcp 协议采用的是一种叫做单播的传输形式,udp 协议提供了向多个接收者发送消息的额外传输形式(多播、广播):

单播(tcp 和 udp):发送消息给一个由唯一的地址所标识的单一的网络目的地。
多播(udp):传输给一个预定义的主机组。
广播(udp):传输到网络(或者子网)上的所有主机。

二、功能说明

    广播方:打开一个文件,通过 udp 使用特殊的受限广播地址或者零网络地址 255.255.255.255,把每一行作为一个消息广播到一个指定的端口。

    接收方:通过 udp 广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。所有的在该 udp 端口上监听的事件监听器都将会接收到广播信息。

Netty 系列九(支持UDP协议).

三、实现

    下图展示了怎么将我们的 文件数据 广播为 udp消息:所有的将要被传输的数据都被封装在了 logevent 消息中。 logeventbroadcaster 将把这些写入到 channel 中,并通过 channelpipeline 发送它们,在那里它们将会被转换(编码)为 datagrampacket 消息。最后,他们都将通过 udp 被广播,并由远程节点(监视器)所捕获。

Netty 系列九(支持UDP协议).

    netty 中支持 udp 协议主要通过以下相关类:

datagrampacket:使用 bytebuf 作为数据源,是 udp 协议传输的消息容器。

datagramchannel:扩展了 netty 的 channel 抽象以支持 udp 的多播组管理,它的实现类 niodatagramchannnel 用来和远程节点通信。

bootstrap:udp 协议的引导类,使用 bind() 方法绑定 channel。    

Netty 系列九(支持UDP协议).
public class logevent {
    public static final byte separator = ':';
    /**
     * ip套接字地址(ip地址+端口号)
     */
    private final inetsocketaddress inetsocketaddress;
    /**
     * 文件名
     */
    private final string logfile;
    /**
     * 消息内容
     */
    private final string msg;
    
    private final long received;

    /**
     * 用于传入消息的构造函数
     *
     * @param inetsocketaddress
     * @param logfile
     * @param msg
     * @param received
     */
    public logevent(inetsocketaddress inetsocketaddress, string logfile, string msg, long received) {
        this.inetsocketaddress = inetsocketaddress;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }

    /**
     * 用于传出消息的构造函数
     *
     * @param logfile
     * @param msg
     */
    public logevent(string logfile, string msg) {
        this(null, logfile, msg, -1);
    }

    public inetsocketaddress getinetsocketaddress() {
        return inetsocketaddress;
    }

    public string getlogfile() {
        return logfile;
    }

    public string getmsg() {
        return msg;
    }

    public long getreceived() {
        return received;
    }
}
文件实体类 logevent.java
Netty 系列九(支持UDP协议).
public class logeventencoder extends messagetomessageencoder<logevent> {
    private final inetsocketaddress remoteaddress;

    public logeventencoder(inetsocketaddress remoteaddress) {
        this.remoteaddress = remoteaddress;
    }


    @override
    protected void encode(channelhandlercontext ctx, logevent msg, list<object> out) throws exception {
        byte[] file = msg.getlogfile().getbytes(charsetutil.utf_8);
        byte[] content = msg.getmsg().getbytes(charsetutil.utf_8);
        bytebuf bytebuf = ctx.alloc().buffer(file.length + content.length + 1);
        bytebuf.writebytes(file);
        bytebuf.writebyte(logevent.separator);
        bytebuf.writebytes(content);
        out.add(new datagrampacket(bytebuf, remoteaddress));
    }
}
编码器 logeventencoder.java

    该编码器实现了将 logevent 实体类内容转换为 datagrampacket udp数据报。

public class logeventbroadcaster {
    private final eventloopgroup group;
    private final bootstrap bootstrap;
    private final file file;

    public logeventbroadcaster(inetsocketaddress address, file file) {
        group = new nioeventloopgroup();
        bootstrap = new bootstrap();
        bootstrap.group(group)
                //引导该 niodatagramchannel(无连接的)
                .channel(niodatagramchannel.class)
                // 设置 so_broadcast 套接字选项
                .option(channeloption.so_broadcast, true)
                .handler(new logeventencoder(address));
        this.file = file;
    }
    
    public void run() throws interruptedexception, ioexception {
        //绑定 channel,udp 协议的连接用 bind() 方法
        channel channel = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //长轮询 监听是否有新的日志文件生成
        while (true) {
            long length = file.length();
            if (length < pointer) {
                // 如果有必要,将文件指针设置到该文件的最后一个字节
                pointer = length;
            } else {
                randomaccessfile raf = new randomaccessfile(file, "r");
                // 确保当前的文件指针,以确保没有任何的旧数据被发送
                raf.seek(pointer);
                string line;
                while ((line = raf.readline()) != null) {
                    //对于每个日志条目,写入一个 logevent 到 channel 中,最后加入一个换行符号
                    channel.writeandflush(new logevent(file.getabsolutepath(), line + system.getproperty("line.separator")));
                }
                pointer = raf.getfilepointer();
                raf.close();
            }
            try {
                // 休眠一秒,如果被中断,则退出循环,否则重新处理它
                thread.sleep(1000);
            } catch (interruptedexception e) {
                while (!thread.interrupted()) {
                    break;
                }
            }
        }
    }

    public void stop() {
        group.shutdowngracefully();
    }

    public static void main(string[] args) throws ioexception, interruptedexception {
        inetsocketaddress socketaddress = new inetsocketaddress("255.255.255.255", 8888);
        file file = new file("e:\\2018-09-12.log");
        logeventbroadcaster logeventbroadcaster = new logeventbroadcaster(socketaddress, file);
        try {
            logeventbroadcaster.run();
        } finally {
            logeventbroadcaster.stop();
        }
    }
}

    现在,我们来测试一下这个 udp 广播类,首先我们需要一个工具 nmap ,用它来监听 udp 的 8888 端口,以接收我们广播的日志文件。下载地址:

    下载完成后,命令行进入安装目录,执行命令:ncat.exe -l -u -p 8888 ,监听 udp 端口。

 Netty 系列九(支持UDP协议).

     当然,也可以自己写个测试类监听 udp 端口,打印日志查看。这里我没有用 netty 写监听类,直接用了 java 原生的 datagramsocket 和 datagrampacket 写的监听类,如下:

Netty 系列九(支持UDP协议).
public class udpserver {

    public static void main(string[] args) {
        datagramsocket server = null;
        try {
            server = new datagramsocket(8888);
            byte[] datas = new byte[1024];
            //用一个字节数组接收udp包,字节数组在传递给构造函数时是空的
            while (true) {
                datagrampacket datagrampacket = new datagrampacket(datas, datas.length);
                server.receive(datagrampacket);
                system.out.println(new string(datas));
            }
        } catch (socketexception e) {
            e.printstacktrace();
        } catch (ioexception e) {
            e.printstacktrace();
        } finally {
            server.close();
        }
    }
}
udpserver.java

    基于 netty 的监听类实现可以参考我上传 github 上的源代码。

 

参考资料:《netty in action》

演示源代码:https://github.com/jmcuixy/nettydemo/tree/master/src/main/java/org/netty/demo/udp