一个NIO例子
程序员文章站
2022-04-24 10:50:05
...
例子完成的功能是,客户端发送命令time到服务端,服务端返回当前时间给客户端。
服务端逻辑代码
public class MultiplexerTimeServerHandler implements Runnable {
private Selector selector = null;
private ServerSocketChannel servChannel = null;
private int port;
private boolean stop;
/**
* 初始化多路复用器,绑定监听端口
*
* @param port
*/
public MultiplexerTimeServerHandler(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
//非阻塞
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
//在通道上注册selector,感兴趣事件是OP_ACCEPT
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器监听" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
public void run() {
while (!stop) {
//循环监听就绪时间
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//多路复用器关闭后,所有注册的channel和pipe等资源都会自动去注册并关闭
// ,所有不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//处理事件
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuff = ByteBuffer.allocate(1024);
//上面设置了这里是非阻塞的
int read = sc.read(readBuff);
if (read > 0) {
readBuff.flip();
byte[] bytes = new byte[readBuff.remaining()];
readBuff.get(bytes);
String body = new String(bytes, "utf-8");
System.out.println("服务收到命令:" + body);
String currentTime = "time".equals(body) ? new Date(System.currentTimeMillis()).toString() : "无效命令";
doWrite(sc, currentTime);
} else if (read < 0) {
//对端链路关闭
key.cancel();
sc.close();
} else {
//读到0字节
}
}
}
}
private void doWrite(SocketChannel sc, String content) throws IOException {
if (content != null && content.trim().length() > 0) {
byte[] bytes = content.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
//可能存在写半包情况
sc.write(byteBuffer);
}
}
}
服务端启动代码
public class TimeServer {
public static void main(String... args) throws IOException {
int port = 8888;
MultiplexerTimeServerHandler server = new MultiplexerTimeServerHandler(port);
new Thread(server,"nio-MultiplexerTimeServerHandler-001").start();
}
}
客户端逻辑代码
public class TimeClientHandler implements Runnable {
private SocketChannel socketChannel;
private int port;
private Selector selector;
private String host;
private boolean stop;
public TimeClientHandler(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void run() {
try {
doConnect();//连接
} catch (IOException e) {
e.printStackTrace();
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleInput(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
//在通道上注册感兴趣事件为读
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);//发送命令
} else {
//连接失败,退出
System.exit(1);
}
}
if (key.isReadable()) {//读取消息
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int read = sc.read(readBuffer);
if (read > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "utf-8");
System.out.println("现在时间为:" + body);
this.stop = true;
} else if (read < 0) {
//对端链路关闭
key.cancel();
sc.close();
} else {
//读到0字节
}
}
}
}
private void doConnect() throws IOException {
//如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel socketChannel) throws IOException {
byte[] bytes = "time".getBytes();
ByteBuffer writeBuff = ByteBuffer.allocate(bytes.length);
writeBuff.put(bytes);
writeBuff.flip();
socketChannel.write(writeBuff);
if (!writeBuff.hasRemaining()) {
System.out.println("客户端发送命令成功");
}
}
}
客户端启动代码
public class TimeClient {
public static void main(String... args) throws IOException {
int port = 8888;
new Thread(new TimeClientHandler("127.0.0.1",port),"TimeClientHandler-001").start();
}
}
上一篇: Java NIO学习(一)NIO相关概念