互联网技术18——socket编程之NIO
NIO同步非阻塞
NIO是同步非阻塞的,在传统的TCP点对点直接连接的基础上做了一层封装,并不是Client与Server直接建立连接,而是Client先到Server端进行管道注册。在Server端创建一个Selector多路复用器,启动一个线程轮询注册到Selector上的所有Channerl的状态,根据通道的状态,执行相关操作。通道的状态包括: Connect连接状态、Accept阻塞状态、Read可读状态、Write可写状态。
NIO编程3个重要部分:Buffer缓冲区、Channel管道、Selecter多路复用器。
Buffer缓冲区:本质上是可以存储数据的内存,被封装成了Buffer对象而已。
( 本部分借鉴文章地址:https://www.cnblogs.com/tengpan-cn/p/5809273.html )
缓冲区类型:
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
常用方法
- allocate() 分配一块缓冲区
- put() 向缓冲区写数据
- get() 从缓冲区读数据, 其中get()方法会将position加1,get(index)不会改变position的大小,
- filp() 将缓冲区从写模式切换到读模式
- clear() 将缓冲区从读模式切换到写模式,不会清空数据,但是后续写数据会覆盖元数据,即使有数据没被读,也会被遗忘
- compat() 从读数据切换到写模式,数据不会清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据,
- mark() 对position做出标记,配合reset使用
- reset() 将position置为标记值
- duplicate() 复制一分buffer缓冲区
缓冲区的属性
- capacity 缓冲区的大小,无论是读写哪种模式,属性不会变
- position 写数据时:position表示当前写的位置,每写一个数据,会向下移动一个数据单元。初始值为0,最大值为capacity-1,读数据时:position会被置0,表示当前读的位置
- limit 写模式下,limit相当capacity,表示最多能写多少数据,切换到读模式时,limit等于原先写操作时的position,表示最多可以读多少数据
BufferTest.java
package com.socketNio;
import com.DisruptorLiuBianXing.Main;
import java.nio.IntBuffer;
/**
* Created by BaiTianShi on 2018/8/30.
*/
public class bufferTest {
public static void main(String[] args) {
//allocate 分配一块空间
IntBuffer intBuffer = IntBuffer.allocate(10);
intBuffer.put(1);
intBuffer.put(2);
intBuffer.put(3);
intBuffer.put(4);
intBuffer.put(5);
//写模式下,position记录的是当前写入的位置
System.out.println("intBuffer"+intBuffer);
//切换到读模式,将position置0
intBuffer.flip();
System.out.println("intBuffer执行flip后"+intBuffer);
System.out.println("pos"+intBuffer.position());
System.out.println("lim"+intBuffer.limit());
System.out.println("cap"+intBuffer.capacity());
//执行get(index)不会改变position 的大小
System.out.println("执行intBuffer.get(3)"+intBuffer.get(3));
System.out.println("执行intBuffer.get(3)后"+intBuffer);
//执行put(index,value)不会改变position 的大小
intBuffer.put(2,33);
System.out.println("intBuffer执行put(2,33)后"+intBuffer);
//执行get()方法,无索引参数,会改变position大小
for (int i = 0; i < intBuffer.limit(); i++) {
System.out.print(intBuffer.get()+"\t");
}
System.out.println();
System.out.println("intBuffer使用循环遍历之后"+intBuffer);
System.out.println("pos"+intBuffer.position());
System.out.println("lim"+intBuffer.limit());
System.out.println("cap"+intBuffer.capacity());
System.out.println("--------warp包裹数组---------");
//通过warp包裹数组声明的缓冲区,改变缓冲区中的内容会同时改变原数组中的内容
int[] array = new int[]{6,7,8,9,10};
IntBuffer warpBuffer = IntBuffer.wrap(array);
System.out.println("warpBuffer"+warpBuffer);
for (int i = 0; i < warpBuffer.limit(); i++) {
System.out.print(warpBuffer.get()+"\t");
}
System.out.println();
System.out.println("wrapIntBuffer使用for循环遍历之后: " + warpBuffer);
warpBuffer.flip();
warpBuffer.put(2,88);
System.out.println("pos"+warpBuffer.position());
System.out.println("lim"+warpBuffer.limit());
System.out.println("cap"+warpBuffer.capacity());
System.out.println("使用for循环遍历");
for (int i = 0; i < warpBuffer.limit(); i++) {
System.out.print(warpBuffer.get()+"\t");
}
System.out.println();
System.out.println("被warp包裹的array内容发生了改变");
for (int i = 0; i < array.length ; i++) {
System.out.print(array[i]+"\t");
}
//复制方法
System.out.println();
System.out.println("------复制方法------");
IntBuffer intBufferOne = IntBuffer.allocate(10);
intBufferOne.put(array);
System.out.println("intBufferOne:"+intBufferOne);
intBufferOne.flip();
System.out.println("使用for循环遍历");
for (int i = 0; i < intBufferOne.limit(); i++) {
System.out.print(intBufferOne.get()+"\t");
}
IntBuffer intBufferTwo = intBufferOne.duplicate();
System.out.println();
System.out.println("intBufferTwo:"+intBufferTwo);
System.out.println("可读数据为:" + intBufferTwo.remaining());//limit - position
intBufferTwo.position(2);
System.out.println("intBufferTwo:"+intBufferTwo);
System.out.println("可读数据为:" + intBufferTwo.remaining());//limit - position
}
}
Channel通道:
1. 网络数据通过Channel通道读取和写入,通与流的不同之处在于通道是双向的,而流只能在一个方向上移动,用到可以读、写或同时进行。channerl通道可以与多路复用器结合起来,有多重状态位,方便多路福通气识别并执行操作。
2. channel分为两大类,一类是网络读写的selectableChannel,一类是用于文件操作的FileChannel。SocketChannel和SocketServerChannel都是SelectableChannerl的子类。
selector多路复用器:
它是NIO编程的基础,提供选择已经就绪任务的能力,当io事件(管道)注册到选择器后,seletor会分配给每个管道一个key值。seletor会不断的轮询注册在其上的通道channel,如果某个通道发生了读写操作,这个管道就处于就绪状态,会被selevtor轮询出来,然后通过selectionKey可以取得就绪的channel集合,从而进行io操作。宇哥selector可以辅助成千上万的channel通道,jdk使用epoll代替了传统的select实现,使得获取连接句柄没有限制,只需要一个线程负责selector轮询,就可以接入成千上万的客户端。
下面代码中,在server类的构造方法中,创建ServerSocketChannel对象,将该对象注册到多路复用器Sekector上,并处于阻塞ACCEPT状态。由于server实现了runnable接口,在run方法中while(true)循环,在while循环体中,不论客户端channel还是服务器channel,都在selector轮询范围内。在轮询过程中,获取所有注册到多路复用器selector上的key,在这个while首次执行的时候,获取到的处于阻塞状态的channel一定是我们启动服务端是注册的服务端管道ScoketServerChannel,这个服务端channel执行accept方法,监听处于就绪状态的客户端channel,将客户端channel通道注册到多路复用器select上,并监听器读标识位。在存在的客户端channel注册到selector的情况下,在while循环体重,如客户端key处于key.isReadable()为true是,就会执行read方法,结合byteBUffer的一系列方法,将channel中的数据读取到缓冲区中。
每一次从客户端发送数据,到服务端获取到数据,需要两个发现的过程,第一次:发现是服务端的accept方法监听服务端连接并准备就绪事件,发现后将此客户端在多路复用器selector上的状态标识位变为可读,即:发现并标识可读,第二次:当while()进行下一次循环的时候,再拿到这个channel时已经被标识为可读的了,这样因为其实可读状态,就可以直接读取了,即:读可读的channel;服务端可通信端原理相同,
综合使用Buffer、Channel、Selector的Client端与Server端双向通信示例
Server.java
package com.socketNio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* Created by BaiTianShi on 2018/8/31.
*
*/
public class Server implements Runnable{
//多路复用器,管理所有的管道
private Selector selector;
//建立读缓冲区,大小为1024
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//建立写缓存区
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port) {
try {
//打开多路复用器
this.selector = Selector.open();
//打开服务通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//设置服务器为非阻塞模式
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(port));
//把服务器通道注册到多路复用器上,并且监听阻塞事件
ssc.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("server start port:"+port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true){
try {
//多路复用器开始监听
this.selector.select();
//返回多路复用器已经选择的结果,
System.out.println("轮序已经就绪的管道");
//t通过注释掉137行,跟踪断点可证明,当多路复用器selector中有channel状态改变时,才会被这个selectedKeys方法获取到
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
System.out.println("发现就绪的channel管道");
while (keys.hasNext()){
//获取一个选择元素
SelectionKey key = keys.next();
//直接从容器中移除
keys.remove();
//如果是有效的
if(key.isValid()){
//如果是阻塞的状态,一定是最开始注册的服务端通道
if(key.isAcceptable()){
this.accept(key);
}
//如果是可读状态
if(key.isReadable()){
this.read(key);
}
//如果是可写状态
if(key.isWritable()){
this.write(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
//向socketChannel中写数据
private void write(SelectionKey key){
SocketChannel sc = (SocketChannel) key.channel();
//定义一个字节数据
byte[] bytes = new byte[1024];
try {
//使用系统录入功能,等待用户输入数据并回车
System.in.read(bytes);
//把系统录入的数据防盗缓冲区
writeBuf.put(bytes);
//对缓冲区进行复位
writeBuf.flip();
//将缓冲区的数据写给client端
sc.write(writeBuf);
//清空缓冲区
writeBuf.clear();
//因以执行了想socketChannel的写操作,这里想select注册sc通道的读事件状态,
sc.register(selector,SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
//读socketChannel中的数据
private void read(SelectionKey key){
try {
//清空缓冲区
this.readBuf.clear();
//通过key获取之前注册的通道
SocketChannel sc = (SocketChannel) key.channel();
//读取数据到缓冲区
int count = sc.read(this.readBuf);
//如果没有数据
if(count == -1){
//关闭
key.channel().close();
//取消此key绑定的socketChannel的注册
key.cancel();
return;
}
//有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
/*Flips this buffer. The limit is set to the current position and then
the position is set to zero. If the mark is defined then it is discarded.*/
this.readBuf.flip();
//根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
byte[] bytes = new byte[this.readBuf.remaining()];//this.readBuf.remaining()可用数据个数
//接收缓冲区数据到字节数组
this.readBuf.get(bytes);
//打印结果
String body = new String(bytes).trim();
System.out.println("服务器端接收到客户端发送的信息 : " + body);
//因已经执行了向SocketChannel的读操作,这里向selector注册sc通道的写事件状态
sc.register(this.selector,SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
//如果是阻塞的状态,一定是最开始注册的服务端通道
private void accept(SelectionKey key){
try {
//最开始注册的服务端通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//这是方法,直到客户端有通道注册,才能返回客户端的通道
SocketChannel sc = ssc.accept();
//将客户端设置为阻塞模式
sc.configureBlocking(false);
//将客户端通道注册搭配多路复用器上,并设置为读取标识
sc.register(this.selector,SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread( new Server(8765)).start();
}
}
在Server.java中,因为仅仅ServerSocketChannel对象在Selector上注册了SelectionKey.OP_ACCEPT事件状态,因此Server端创建的一个线程,在轮询Selector过程中,获取处于就绪状态的所有Channel通道的集合。Selector分配给ServerSocketChannel对象的唯一key,这个key.isAcceptable()为true则执行accept(key)方法,使这个key对应的服务器端Channel一直处于accept监听状态。
Client.java
package com.socketNio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* Created by BaiTianShi on 2018/9/1.
*/
public class Client implements Runnable {
//多路复用器,管理所有通道
private Selector selector;
//写缓冲区
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
//读缓冲区
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//建立连接地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1",8765);
public Client() {
try {
//打开多路复用器
this.selector = Selector.open();
//打开客户端通道
SocketChannel sc = SocketChannel.open();
//客户端通道为非阻塞模式
sc.configureBlocking(false);
//注册到多路复用器selector上,给sc注册connect事件状态
sc.register(selector, SelectionKey.OP_CONNECT);
//进行连接
sc.connect(address);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
SocketChannel socketChannel;
while (true){
try {
//要让多路复用器开始监听
this.selector.select();
//返回多路复用器已经选择的结果
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
//遍历
while (keys.hasNext()){
//获取一个选择的元素
SelectionKey key = keys.next();
//直接容器中移除
keys.remove();
//如果是有效的
if(key.isValid()){
//如果为连接状态,
if(key.isConnectable()){
System.out.println("client connect");
socketChannel =(SocketChannel)key.channel();
/*Returns:
true if, and only if, a connection operation has been initiated on
this channel but not yet completed by invoking the finishConnect method*/
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
System.out.println("客户端完成连接操作!");
//把数据放到缓冲区中
writeBuf.put("Hello,Server".getBytes());
//对缓冲区进行复位
writeBuf.flip();
//写出数据给Server端
socketChannel.write(writeBuf);
//清空写缓冲区
writeBuf.clear();
}
socketChannel.register(selector, SelectionKey.OP_READ);
}
//如果为可读状态
if(key.isReadable()){
this.read(key);
}
//如果为可写状态
if(key.isWritable()){
this.write(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key){
try {
SocketChannel sc = (SocketChannel) key.channel();
byte[] bytes = new byte[1024];
System.in.read(bytes);
//把数据放到缓冲区中
writeBuf.put(bytes);
//对缓冲区进行复位
writeBuf.flip();
//写出数据给Server端
sc.write(writeBuf);
//清空缓冲区数据
writeBuf.clear();
sc.register(this.selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
//清空缓冲区旧的数据
this.readBuf.clear();
//获取之前注册的socket通道对象
SocketChannel sc = (SocketChannel) key.channel();
//读取数据到缓冲区
int count = sc.read(this.readBuf);
//如果没有数据
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
this.readBuf.flip();
//根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
byte[] bytes = new byte[this.readBuf.remaining()];//this.readBuf.remaining()可用数据个数
//接收缓冲区数据到字节数组
this.readBuf.get(bytes);
// 打印结果
String body = new String(bytes).trim();
System.out.println("客户端接收到服务器端返回的信息 : " + body);
sc.register(this.selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
Client client=new Client();
new Thread(client).start();//单独启动一个线程,去轮询注册到多路复用器上的所有通道
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动server端
启动client端后server端控制台,第一遍轮序为发现通过accept的服务端发现就绪的客户端,并置为可读状态。第二遍轮询为重新拿到刚刚置为可读的客户端读取数据,并置为可写。第三遍循环为拿到刚刚可写的客户端,然后准备写数据(待键盘输入)
客户端控制台:当服务端没有回复数据的时候,一直阻塞在 this.selector.select();
服务端手动输入数据,绿色的输入内容我写错了,应该是服务端,
客户端手动输入数据