欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JAVA非阻塞网络通信NIO关键代码

程序员文章站 2022-05-18 13:21:04
...
	//--------------------------------------------------------------
	// 输入输入对象
	private ByteBuffer r_buff =  ByteBuffer.allocate(1024);
	private ByteBuffer w_buff =  ByteBuffer.allocate(1024);
	
	public   byte[]  networkForServer(byte[] toServerData) throws IOException{
		
	    byte[] result = new  byte[]{}; 
	    SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);    
        InetSocketAddress s = new InetSocketAddress("125.46.49.88",9000);
        channel.connect(s);
        
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_CONNECT|SelectionKey.OP_READ|SelectionKey.OP_WRITE);
        
    //    Charset charset=Charset.forName("GBK");
          
        boolean isFinished = false;
        int exec = 0;  // 执行此时
        while(!isFinished){
            int n = selector.select();
            if(n==0){
            	continue;
            }
            Iterator it = selector.selectedKeys().iterator();
            while(it.hasNext()){
            	exec++;
            	SelectionKey skey = (SelectionKey)it.next();
            	
                debug("0"," count = " + exec +"  -----  accept -->"+ skey.isAcceptable() +"   connect --> " +skey.isConnectable() 
        		 	      +"  read --> "+ skey.isReadable() +" valid --> " + skey.isValid() + " write --> " + skey.isWritable());
            	  
            	if(skey.isConnectable()){ // 是否连接
            		   SocketChannel sc = (SocketChannel) skey.channel();
                       sc.configureBlocking(false);
                       sc.finishConnect();
                       sc.register(selector, SelectionKey.OP_READ);  
                       debug("1","connect");
            	}
            	// 根据nio的注册状态,执行操作
            	if(skey.isReadable()){ 
            		debug("2","read");
            		readData(skey,selector);   
            	} 
            	if(skey.isWritable()){
            		debug("3","write");
            		ByteBuffer bbuffer = ByteBuffer.wrap(toServerData);
            		writeData(skey,bbuffer,selector); 
            	} 
            	
            	if(exec==6){ // 第二次读取完了之后此时获取的是最后数据
            			// 关闭连接
            			skey.cancel();
            			channel.close(); 
            			isFinished = true; // 终止执行
            	 }
            	it.remove();            	
            } 
            
            
           /* if(num>0){
                Set<SelectionKey> keys = selector.selectedKeys();
                a++;
              	 
                for(SelectionKey k:keys){
                	
                 System.out.println(" ---- " + a +"----  accept -->"+ k.isAcceptable() +"   connect --> " + k.isConnectable() 
        		 	      +"  read --> "+ k.isReadable() +" valid --> " + k.isValid() + " write --> " + k.isWritable());
                    
               if(k.isConnectable()){
                        SocketChannel sc = (SocketChannel) k.channel();
                        sc.configureBlocking(false);
                        sc.finishConnect();
                        sc.register(selector, SelectionKey.OP_READ);      
                        System.out.println(" --> 1" ); 
                        
                        
                        ByteBuffer bb = ByteBuffer.wrap(toServerData);
                        sc.write(bb);
                         
                   } else if (k.isReadable()) {
                        ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
                        SocketChannel sc = (SocketChannel) k.channel(); 
                        //---------------------------  
                         int len = 0; 
                         len =  sc.read(echoBuffer);
                         echoBuffer.flip(); 
                         result = echoBuffer.array(); 
                         System.out.println("echo server return:data len -->"+ echoBuffer.limit() +"   len = " + len); 
                         getResponseHeader(result); 
                         // +charset.decode(echoBuffer).toString());
                          echoBuffer.clear();   
                          
                          sc.finishConnect();
                          k.cancel();
                          sc.close(); 
                   }
                }
            }   */   
        } 
        return result; 
   }
	
	 
	public  void readData(SelectionKey k,Selector selector) throws IOException{
		  int count = 0; 
          SocketChannel sc = (SocketChannel) k.channel(); 
          r_buff.clear();
          while((count=sc.read(r_buff))>0){ //循环读取r_buff
        	  //确保r_buff可读
        	  r_buff.flip();  
        	  // 把数据拿出来并显示 
              byte[] result = r_buff.array(); 
              System.out.println("echo server return:data len -->"+ r_buff.limit() +"   len = " + count); 
              getResponseHeader(result);  
        	  r_buff.clear();
          }    
          sc.register(selector, SelectionKey.OP_WRITE);  
	}
	public  void writeData(SelectionKey k,ByteBuffer bbuffer,Selector selector) throws IOException{ 
          SocketChannel sc = (SocketChannel) k.channel();  
      	  w_buff.clear();
      	  w_buff.put(bbuffer);
      	  w_buff.flip();
      	  echo2Server(sc);
      	  w_buff.clear(); 
      	  sc.register(selector, SelectionKey.OP_READ); 
	}
	// 向服务端发数据
	public void echo2Server(SocketChannel sc) throws IOException {
		 while(w_buff.hasRemaining()){
			 sc.write(w_buff); 
		 }
	}
	
    public void debug(String msgId,String message){
    	  System.out.println("debug("+ msgId+") --> " + message ); 
    }
	

 

注意:此段代码是循环执行,用的时候需要修改循环参数。