欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java Nio使用NioSocket客户端与服务端交互实现方式

程序员文章站 2022-06-25 23:30:49
niosocket 客户端与服务端交互实现java nio是jdk1.4新增的io方式—–nio(new io),这种方式在目前来说算不算new,更合适的解释应该是non-block io。non-b...

niosocket 客户端与服务端交互实现

java nio是jdk1.4新增的io方式—–nio(new io),这种方式在目前来说算不算new,更合适的解释应该是non-block io。

non-block是相对于传统的io方式来讲的。传统的io方式是阻塞的,我们拿网络io来举例,传统的io模型如下:

java Nio使用NioSocket客户端与服务端交互实现方式

服务端主线程负责不断地server.accept(),如果没有客户端请求主线程就会阻塞,当客户端请求时,主线程会通过线程池创建一个新的线程执行。

简单解释就是一个线程负责一个客户端的socket,当客户端因网络等原因传递速度慢的时候,服务端对应的客户端的线程就会等待,很浪费资源。

同时线程过少的话会影响服务的吞吐量,而线程过多的话由于上下文切换等原因会导致效率十分低下,传统的io方式并不适合如今的网络流量。

nio的模型如下:

java Nio使用NioSocket客户端与服务端交互实现方式

nio相比传统的io模型,最大的特点是优化了线程的使用。

nio通过selector可以使用一个线程去管理多个socket句柄,说是管理也不太合适,nio是采用的事件驱动模型,selector负责的是监控各个连接句柄的状态,不是去轮询每个句柄,而是在数据就绪后,将消息通知给selector,而具体的socket句柄管理则是采用多路复用的模型,交由操作系统来完成。

selector充当的是一个消息的监听者,负责监听channel在其注册的事件,这样就可以通过一个线程完成了大量连接的管理,当注册的事件发生后,再调用相应线程进行处理。

这样就不需要为每个连接都使用一个线程去维持长连接,减少了长连接的开销,同时减少了上下文的切换提高了系统的吞吐量。

java nio的组成

java nio主要由三个核心部分组成:

- buffer 
- channel 
- selector

所有的io的nio都是从一个channel开始的,channel有点类似于流,但是和流不同的是,channel是可以双向读写的。channel有几种类型,主要包含文件io操作和网络io:

- filechannel (文件io) 
- datagramchannel (udp数据报) 
- socketchannel (tcp客户端) 
- serversocketchannel (tcp服务端)

buffer是一个中间缓存区,数据可以从channel读取到buffer,也可以从buffer写到channel中,在java中,传统方式与io的交互,需要将数据从堆内存读取到直接内存中,然后交由c语言来调用系统服务完成io的交互。

而使用buffer可以直接在直接内存中开辟内存区域,减少了io复制的操作,从而提高了io操作的效率。

#基本数据类型的buffer 
- bytebuffer 
- charbuffer 
- doublebuffer 
- floatbuffer 
- intbuffer 
- longbuffer 
- shortbuffer
#文件内存映射buffer 
- mappedbytebuffer
#直接内存区buffer 
- directbuffer

selector允许单个线程处理多个channel,可以将多个channel教给selector管理,并注册相应的事件,而selector则采用事件驱动的方式,当注册的事件就绪后,调用相应的相应的线程处理该时间,不用使用线程去维持长连接,减少了线程的开销。

selector通过静态工厂的open方法建立,然后通过channel的register注册到channel上。

注册后通过select方法等待请求,select请求有long类型参数,代表等待时间,如果等待时间内接受到操作请求,则返回可以操作请求的数量,否则超时往下走。

传入参数为零或者无参方法,则会采用阻塞模式知道有相应请求。

收到请求后调用selectedkeys返回selectionkey的集合。

selectionkey保存了处理当前请求的channel和selector,并且提供了不同的操作类型。

selectionkey的操作有四种:

- selectionkey.op_connect 
- selectionkey.op_accept 
- selectionkey.op_read 
- selectionkey.op_write

下面为一个客户端与服务端实用niosocket交互的简单例子:

//对selectionkey事件的处理
/**
 * description:
 *
 * @author wkgui
 */
interface serverhandlerbs {
    void handleaccept(selectionkey selectionkey) throws ioexception;
    string handleread(selectionkey selectionkey) throws ioexception;
}
/**
 * description:
 *
 * @author wkgui
 */
public class serverhandlerimpl implements serverhandlerbs {
    private int buffersize = 1024;
    private string localcharset = "utf-8";
    public serverhandlerimpl() {
    }
    public serverhandlerimpl(int buffersize) {
        this(buffersize, null);
    }
    public serverhandlerimpl(string localcharset) {
        this(-1, localcharset);
    }
    public serverhandlerimpl(int buffersize, string localcharset) {
        this.buffersize = buffersize > 0 ? buffersize : this.buffersize;
        this.localcharset = localcharset == null ? this.localcharset : localcharset;
    }
    @override
    public void handleaccept(selectionkey selectionkey) throws ioexception {
        //获取channel
        socketchannel socketchannel = ((serversocketchannel) selectionkey.channel()).accept();
        //非阻塞
        socketchannel.configureblocking(false);
        //注册selector
        socketchannel.register(selectionkey.selector(), selectionkey.op_read, bytebuffer.allocate(buffersize));
        system.out.println("建立请求......");
    }
    @override
    public string handleread(selectionkey selectionkey) throws ioexception {
        socketchannel socketchannel = (socketchannel) selectionkey.channel();
        bytebuffer buffer = (bytebuffer) selectionkey.attachment();
        string receivedstr = "";
        if (socketchannel.read(buffer) == -1) {
            //没读到内容关闭
            socketchannel.shutdownoutput();
            socketchannel.shutdowninput();
            socketchannel.close();
            system.out.println("连接断开......");
        } else {
            //将channel改为读取状态
            buffer.flip();
            //按照编码读取数据
            receivedstr = charset.forname(localcharset).newdecoder().decode(buffer).tostring();
            buffer.clear();
            //返回数据给客户端
            buffer = buffer.put(("received string : " + receivedstr).getbytes(localcharset));
            //读取模式
            buffer.flip();
            socketchannel.write(buffer);
            //注册selector 继续读取数据
            socketchannel.register(selectionkey.selector(), selectionkey.op_read, bytebuffer.allocate(buffersize));
        }
        return receivedstr;
    }
}
//服务端server类
/**
 * description:
 *
 * @author wkgui
 */
public class niosocketserver {
    private volatile byte flag = 1;
    public void setflag(byte flag) {
        this.flag = flag;
    }
    public void start() {
        //创建serversocketchannel,监听8888端口
        try (serversocketchannel serversocketchannel = serversocketchannel.open()) {
            serversocketchannel.socket().bind(new inetsocketaddress(8888));
            //设置为非阻塞模式
            serversocketchannel.configureblocking(false);
            //为serverchannel注册selector
            selector selector = selector.open();
            serversocketchannel.register(selector, selectionkey.op_accept);
            system.out.println("服务端开始工作:");
            //创建消息处理器
            serverhandlerbs handler = new serverhandlerimpl(1024);
            while (flag == 1) {
                selector.select();
                system.out.println("开始处理请求 : ");
                //获取selectionkeys并处理
                iterator<selectionkey> keyiterator = selector.selectedkeys().iterator();
                while (keyiterator.hasnext()) {
                    selectionkey key = keyiterator.next();
                    try {
                        //连接请求
                        if (key.isacceptable()) {
                            handler.handleaccept(key);
                        }
                        //读请求
                        if (key.isreadable()) {
                            system.out.println(handler.handleread(key));
                        }
                    } catch (ioexception e) {
                        e.printstacktrace();
                    }
                    //处理完后移除当前使用的key
                    keyiterator.remove();
                }
                system.out.println("完成请求处理。");
            }
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }
}
//server端启动类
/**
 * description:
 *
 * @author wkgui
 */
public class servermain {
    public static void main(string[] args) {
        niosocketserver server = new niosocketserver();
        new thread(() -> {
            try {
                thread.sleep(10*60*1000);
            } catch (interruptedexception e) {
                e.printstacktrace();
            }finally {
                server.setflag((byte) 0);
            }
        }).start();
        server.start();
    }
}
//客户端client类
/**
 * description:
 *
 * @author wkgui
 */
public class niosocketclient {
    public void start() {
        try (socketchannel socketchannel = socketchannel.open()) {
            //连接服务端socket
            socketaddress socketaddress = new inetsocketaddress("localhost", 8888);
            socketchannel.connect(socketaddress);
            int sendcount = 0;
            bytebuffer buffer = bytebuffer.allocate(1024);
            //这里最好使用selector处理   这里只是为了写的简单
            while (sendcount < 10) {
                buffer.clear();
                //向服务端发送消息
                buffer.put(("current time : " + system.currenttimemillis()).getbytes());
                //读取模式
                buffer.flip();
                socketchannel.write(buffer);
                buffer.clear();
                //从服务端读取消息
                int readlenth = socketchannel.read(buffer);
                //读取模式
                buffer.flip();
                byte[] bytes = new byte[readlenth];
                buffer.get(bytes);
                system.out.println(new string(bytes, "utf-8"));
                buffer.clear();
                sendcount++;
                try {
                    thread.sleep(1000);
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
            }
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }
}
//client启动类
/**
 * description:
 *
 * @author wkgui
 */
public class clientmain {
    public static void main(string[] args) {
        new niosocketclient().start();
    }
}

java nio 实现 websocket 协议

websocket协议

websocket是一种在单个tcp连接上进行全双工通信的协议。 websocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在websocket api中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

websocket协议相比于http协议来说,最大的特点就是可以实现服务端主动向客户端发送消息。在websocket出现之前,如果客户端想实时获取服务端的消息,就需要使用ajax轮询,查询是否有消息,这样就很消耗服务器资源和带宽。但是用websocket就可以实现服务端主动向客户端发送数据,并且只需要占用一个tcp连接,节省了资源和带宽。

java Nio使用NioSocket客户端与服务端交互实现方式

websocket连接建立过程

为了建立一个websocket连接,客户端浏览器首先要向服务器发起一个http请求,这个请求和通常的http请求不同,包含了一些附加的头信息,其中附加头信息“upgrade: websocket” 表明这是一个申请协议升级的http请求。服务器端解析这些附加的信息头,然后生成应答消息返回给客户端,客户端和服务端的websocket连接就建立了。之后就可以使用websocket协议的格式来双向发送消息。

建立连接时发送的http请求头:

java Nio使用NioSocket客户端与服务端交互实现方式

返回的http响应头:

java Nio使用NioSocket客户端与服务端交互实现方式

在响应头中的 sec-websocket-accept 时通过sec-websocket-key构造出来的。首先在sec-websocket-key后接上一个258eafa5-e914-47da-95ca-c5ab0dc85b11,然后再进行sha1摘要得到160位数据在,在使用base64进行编码,最后得到的就是sec-websocket-accept。

websocket数据发送过程

websocket数据发送的帧格式如下所示:

java Nio使用NioSocket客户端与服务端交互实现方式

fin - 1bit

在数据发送的过程中,可能会分片发送,fin表示是否为最后一个分片。如果发生了分片,则1表示时最后一个分片;不能再分片的情况下,这个标志总是为1。

rsv1 rsv2 rsv3 - 1bit each

用于扩展,不使用扩展时需要为全0;非零时通信双方必须协商好扩展。这里我们用不上。

opcode - 4bits

用于表示所传送数据的类型,也就是payload中的数据。

数值 含义
0x0 附加数据帧
0x1 文本数据帧
0x2 二进制数据帧
0x3-0x7 保留
0x8 关闭连接帧
0x9 ping帧
0xa pong帧
0xb-0xf 保留

mask - 1bit

用于表示payload是否被进行了掩码运算,1表示使用掩码,0表示不使用掩码。从客户端发送向服务端的数据帧必须使用掩码。

payload length 7 bits,7+16 bits or 7+64 bits

用于表示payload的长度,有以下三种情况:

payload length 表示的大小 payload的长度
0 - 125 payload length 大小
126 之后的2个字节表示的无符号整数
127 之后的8个字节表示的无符号整数

masking-key - 0 or 4 bytes

32 bit长的掩码,如果mask为1,则帧中就存在这一个字段,在解析payload时,需要进行使用32长掩码进行异或操作,之后才能得到正确结果。

java nio 实现

利用java nio 来实现一个聊天室。部分代码如下。

nio的常规代码:

selector.select(1000);
set<selectionkey> selectionkeys = selector.selectedkeys();
iterator<selectionkey> it = selectionkeys.iterator();
while (it.hasnext()) {
    selectionkey key = it.next();
    it.remove();
    if (key.isacceptable()) {
        handleaccept(key);
    }
    if (key.isreadable()) {
        handleread(key);
    }
}

接受连接:

public void handleaccept(selectionkey key) {
    serversocketchannel ssc = (serversocketchannel) key.channel();
    socketchannel sc;
    try {
        sc = ssc.accept();
        sc.configureblocking(false);
        sc.register(selector, selectionkey.op_read);
        system.out.println(string.format("[server] -- client %s connected.", sc.getremoteaddress().tostring()));
    } catch (ioexception e) {
        system.out.println(string.format("[server] -- error occur when accept: %s.", e.getmessage()));
        key.cancel();
    }
}

读取通道中的数据:

public void handleread(selectionkey key) {
    socketchannel sc = (socketchannel) key.channel();
    client client = (client) key.attachment();
    bytebuffer bytebuffer = bytebuffer.allocate(1024);
    // 如果是第一次连接进来,就需要创建一个客户端对象,存储起来
    if (client == null) {
        client = new client(sc);
        clients.add(client);
        key.attach(client);
        bytebuffer.clear();
        // 如果连接还没有建立,就是要http建立连接
        try {
            sc.read(bytebuffer);
            bytebuffer.flip();
            string response = websockethandler.getresponse(new string(bytebuffer.array()));
            bytebuffer.clear();
            bytebuffer.put(response.getbytes());
            bytebuffer.flip();
            while (bytebuffer.hasremaining()) {
                sc.write(bytebuffer);
            }
        } catch (ioexception e) {
            system.out.println(string.format("[server] -- error occur when read: %s.", e.getmessage()));
        }
        string message = "[系统消息] " + client.tostring() + " 加入了群聊";
        broadcast(message.getbytes(), client);
    }
    bytebuffer.clear();
    int read = 0;
    try {
        read = sc.read(bytebuffer);
        if (read > 0) {
            bytebuffer.flip();
            int opcode = bytebuffer.get() & 0x0f;
            // 8表示客户端关闭了连接
            if (opcode == 8) {
                system.out.println(string.format("[server] -- client %s connection close.", sc.getremoteaddress()));
                clients.remove(client);
                string message = "[系统消息] " + client.tostring() + " 退出了群聊";
                broadcast(message.getbytes(), client);
                sc.close();
                key.cancel();
                return;
            }
   // 只考虑了最简单的payload长度情况。
            int len = bytebuffer.get();
            len &= 0x7f;
            byte[] mask = new byte[4];
            bytebuffer.get(mask);
            byte[] payload = new byte[len];
            bytebuffer.get(payload);
            for (int i = 0; i < payload.length; i++) {
                payload[i] ^= mask[i % 4];
            }
            system.out.println(string
                    .format("[server] -- client: [%s], send: [%s].", client.tostring(), new string(payload)));
            string message = string.format("[%s]: %s", client.tostring(), new string(payload));
            broadcast(message.getbytes(), client);
        } else if (read == -1) {
            system.out.println(string.format("[server] -- client %s connection close.", sc.getremoteaddress()));
            clients.remove(client);
            string message = "[系统消息] " + client.tostring() + " 退出了群聊";
            broadcast(message.getbytes(), client);
            sc.close();
            key.cancel();
        }
    } catch (ioexception e) {
        system.out.println(string.format("[server] -- error occur when read: %s.", e.getmessage()));
    }
}

使用http建立websocket连接。

public class websockethandler {
    private static string append_string = "258eafa5-e914-47da-95ca-c5ab0dc85b11";
    static class header {
        private map<string, string> properties = new hashmap<>();
        public string get(string key) {
            return properties.get(key);
        }
    }
    private websockethandler() {}
    private static header phrase(string request) {
        header header = new header();
        string[] pros = request.split("\r\n");
        for (string pro : pros) {
            if (pro.contains(":")) {
                int index = pro.indexof(":");
                string key = pro.substring(0, index).trim();
                string value = pro.substring(index + 1).trim();
                header.properties.put(key, value);
            }
        }
        return header;
    }
    public static string getresponse(string request) {
        header header = phrase(request);
        string acceptkey = header.get("sec-websocket-key") + append_string;
        messagedigest sha1;
        try {
            sha1 = messagedigest.getinstance("sha1");
            sha1.update(acceptkey.getbytes());
            acceptkey = new string(base64.getencoder().encode(sha1.digest()));
        } catch (nosuchalgorithmexception e) {
            system.out.println("fail to encode " + e.getmessage());
            return null;
        }
        stringbuilder stringbuilder = new stringbuilder();
        stringbuilder.append("http/1.1 101 switching protocols\r\n").append("upgrade: websocket\r\n")
                     .append("connection: upgrade\r\n").append("sec-websocket-accept: " + acceptkey + "\r\n")
                     .append("\r\n");
        return stringbuilder.tostring();
    }
}

客户端对象

/**
 * @author xinhui chen
 * @date 2020/2/8 19:20
 */
public class client {
    private socketchannel socketchannel = null;
    private string id = null;
    public socketchannel getsocketchannel() {
        return socketchannel;
    }
    public string getid() {
        return id;
    }
    client(socketchannel socketchannel) {
        this.socketchannel = socketchannel;
        this.id = uuid.randomuuid().tostring();
    }
    @override
    public string tostring() {
        try {
            return id + " " + socketchannel.getremoteaddress().tostring();
        } catch (ioexception e) {
            system.out.println(e.getmessage());
            return null;
        }
    }
}

结果

使用网页和控制台与服务端建立websocket连接,发送数据。两个都能成功显示。

java Nio使用NioSocket客户端与服务端交互实现方式

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。