JAVA非阻塞网络通信NIO关键代码
程序员文章站
2022-05-18 13:21:04
...
//-------------------------------------------------------------- // 输入输入对象 private ByteBuffer r_buff = ByteBuffer.allocate(1024); private ByteBuffer w_buff = ByteBuffer.allocate(1024); public byte[] networkForServer(byte[] toServerData) throws IOException{ byte[] result = new byte[]{}; SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); InetSocketAddress s = new InetSocketAddress("125.46.49.88",9000); channel.connect(s); Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT|SelectionKey.OP_READ|SelectionKey.OP_WRITE); // Charset charset=Charset.forName("GBK"); boolean isFinished = false; int exec = 0; // 执行此时 while(!isFinished){ int n = selector.select(); if(n==0){ continue; } Iterator it = selector.selectedKeys().iterator(); while(it.hasNext()){ exec++; SelectionKey skey = (SelectionKey)it.next(); debug("0"," count = " + exec +" ----- accept -->"+ skey.isAcceptable() +" connect --> " +skey.isConnectable() +" read --> "+ skey.isReadable() +" valid --> " + skey.isValid() + " write --> " + skey.isWritable()); if(skey.isConnectable()){ // 是否连接 SocketChannel sc = (SocketChannel) skey.channel(); sc.configureBlocking(false); sc.finishConnect(); sc.register(selector, SelectionKey.OP_READ); debug("1","connect"); } // 根据nio的注册状态,执行操作 if(skey.isReadable()){ debug("2","read"); readData(skey,selector); } if(skey.isWritable()){ debug("3","write"); ByteBuffer bbuffer = ByteBuffer.wrap(toServerData); writeData(skey,bbuffer,selector); } if(exec==6){ // 第二次读取完了之后此时获取的是最后数据 // 关闭连接 skey.cancel(); channel.close(); isFinished = true; // 终止执行 } it.remove(); } /* if(num>0){ Set<SelectionKey> keys = selector.selectedKeys(); a++; for(SelectionKey k:keys){ System.out.println(" ---- " + a +"---- accept -->"+ k.isAcceptable() +" connect --> " + k.isConnectable() +" read --> "+ k.isReadable() +" valid --> " + k.isValid() + " write --> " + k.isWritable()); if(k.isConnectable()){ SocketChannel sc = (SocketChannel) k.channel(); 
sc.configureBlocking(false); sc.finishConnect(); sc.register(selector, SelectionKey.OP_READ); System.out.println(" --> 1" ); ByteBuffer bb = ByteBuffer.wrap(toServerData); sc.write(bb); } else if (k.isReadable()) { ByteBuffer echoBuffer = ByteBuffer.allocate(1024); SocketChannel sc = (SocketChannel) k.channel(); //--------------------------- int len = 0; len = sc.read(echoBuffer); echoBuffer.flip(); result = echoBuffer.array(); System.out.println("echo server return:data len -->"+ echoBuffer.limit() +" len = " + len); getResponseHeader(result); // +charset.decode(echoBuffer).toString()); echoBuffer.clear(); sc.finishConnect(); k.cancel(); sc.close(); } } } */ } return result; } public void readData(SelectionKey k,Selector selector) throws IOException{ int count = 0; SocketChannel sc = (SocketChannel) k.channel(); r_buff.clear(); while((count=sc.read(r_buff))>0){ //循环读取r_buff //确保r_buff可读 r_buff.flip(); // 把数据拿出来并显示 byte[] result = r_buff.array(); System.out.println("echo server return:data len -->"+ r_buff.limit() +" len = " + count); getResponseHeader(result); r_buff.clear(); } sc.register(selector, SelectionKey.OP_WRITE); } public void writeData(SelectionKey k,ByteBuffer bbuffer,Selector selector) throws IOException{ SocketChannel sc = (SocketChannel) k.channel(); w_buff.clear(); w_buff.put(bbuffer); w_buff.flip(); echo2Server(sc); w_buff.clear(); sc.register(selector, SelectionKey.OP_READ); } // 向服务端发数据 public void echo2Server(SocketChannel sc) throws IOException { while(w_buff.hasRemaining()){ sc.write(w_buff); } } public void debug(String msgId,String message){ System.out.println("debug("+ msgId+") --> " + message ); }
注意:此段代码在循环中反复处理选择器(Selector)事件,并在处理到固定次数的事件(示例中为 6 次)后关闭连接并结束循环;使用时需要根据实际的通信流程修改这个循环次数。