欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java 之异步套接字编程实例(AIO)

程序员文章站 2022-05-30 23:20:42
...
本文重点以“淘宝邓悟”中学习理解整理而来。更改了客户端为swing应用程序,并增加了服务端与客户端之间相互向对方发信息的功能。为了便 于阅读,用自已观察总结性的理解,进行了啰嗦的注解。
http://blog.sina.com.cn/s/blog_71ad0d3f01019y1c.html


   异步socket编程,一样分成客户端与服务端。
   AsynchronousServerSocketChannel  -------服务端socket;
   AsynchronousSocketChannel------客户端socket.
   AsynchronousChannelGroup-----socket管理器。服务端socket与客户端socket都由它生成。它的管理需要线程池。它的工作方式之一是把必要的资源交给客户端与服务端的处理器,并调用该处理器进行工作。
   ExecutorService-----线程池。是socket管理器需要的东西。
   CompletionHandler-------处理器。它有两个泛型参数A、V。这是个回调函数所用的标准接口。Socket管理器 会把相关实参放到这个A,V的参数中,让用户处理后,然后调用这个处理器的方法进行执行。如果用户有一个方法中的参数的类型是该处理器,那么在其他地方再次调用这个方法,尽管方法不同,但是传给该方法的CompletionHandler的处理器的A、V参数 却是不相同的,不仅是值不同,类型也有可能完全不同。这是学习中的难点。

   练习中总结:除了服务端与客户端初始化时差别很大,但是在各自与对方通信中,所使用的类都是 客户端socket类。

  下面的例子,展示了异步方式的服务端与客户端相互通信的例子。客户端是swing程序。
   调试使用方法:先运行服务器,再运行客服端。在客户端上的文本框中输入字符点“点我”按钮,立即通过0号套接字向服务器发送信息。在调试台中,可看到服务器与客户端的通信情况。
下面是我测试时服务端的信息
引用
debug:
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1606
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1607
收到/127.0.0.1:1606的消息:0
收到/127.0.0.1:1607的消息:1
收到/127.0.0.1:1606的消息:sd
收到/127.0.0.1:1606的消息:1111111111

  
下面是我测试时客户端的信息
引用

debug:
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:0
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:sd
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1111111111


例子说明:
  服务端与客户端各有四个类。
服务端:
  AioTcpServer---服务器主类。它与客户端通信由回调连接处理器AioAcceptHandler完成。
  AioAcceptHandler-----连接处理器。处理服务器与客户端的通信。具体的读操作,回调AioReadHandler处理器
  AioReadHandler-----读处理器。处理连接处理器交给的 读任务。即具体的读客户端的信息由它完成。如果服务器要回应该客户端,需要回调AioWriteHandler写处理器。
  AioWriteHandler----写处理器。完成读处理器交给的任务,即向客户端回应消息。

客户端:
   AioTcpClient-------客户端主类。它与服务器的通讯由回调AioConnectHandler连接处理器完成。具体写信息由客户端socket回调AioSendHandler完成。具体读信息由客户端socket回调AioReadHandler完成。
   AioConnectHandler-------连接处理器。完成与服务器的通讯。具体的读信息由客户端socket回调读处理器AioReadHandler完成,即完成读取服务器的信息。
   AioReadHandler-------读处理器,完成读取服务端信息。
   AioSendHandler----写处理器,向服务端发送信息。

1.9.3.1 服务端
AioTcpServer.java
import java.net.InetSocketAddress; 
import java.nio.channels.AsynchronousChannelGroup; 
import java.nio.channels.AsynchronousServerSocketChannel; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
/**
 *     AIO异步socket通讯,分成 用于服务端的socekt与用于客户端的socket,当然这两者都是<br>
 * 异步的。两者使用时,都用到了同样的异步通道管理器,异步通道管理器通过线程池管理。<br>
 *    异步通道管理器,可以生成服务端socket与客户端socket。 * 
 *    使用服务端socket或客户端socket都需要一个操作处理器(CompletionHandler),<br>
 *当有信息时异步通道管理器会把 相关信息传递给操作作处理器。 * 
 *    操作处理器的方法是同一方法,但方法的参数是泛型,随着调用它的方法不同而改变。<br> * 
 *    在AIO中,CompletionHandler这个操作处理器方法,是个泛型接口,当回调函数用。<br>
 * 使用CompletionHandler的方法,约定是把该方法前一个参数实例传递给A型参数<br>
 * (attachment),CompletionHandler的另一个参数将是存有该方法的使用情况的实例。
 * 
 */
public class AioTcpServer implements Runnable { 
    private AsynchronousChannelGroup asyncChannelGroup;  
    private AsynchronousServerSocketChannel listener;  
  
    public AioTcpServer(int port) throws Exception { 
        //创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(20); 
        //异步通道管理器
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor); 
        //创建 用在服务端的异步Socket.以下简称服务器socket。
        //异步通道管理器,会把服务端所用到的相关参数
        listener = AsynchronousServerSocketChannel.open(asyncChannelGroup).
                bind(new InetSocketAddress(port)); 
    } 
 
    public void run() { 
        try { 

            AioAcceptHandler acceptHandler = new AioAcceptHandler();
            //为服务端socket指定接收操作对象.accept原型是:
            //accept(A attachment, CompletionHandler<AsynchronousSocketChannel,
            // ? super A> handler)
            //也就是这里的CompletionHandler的A型参数是实际调用accept方法的第一个参数
            //即是listener。另一个参数V,就是原型中的客户端socket
            listener.accept(listener, new AioAcceptHandler());  
            Thread.sleep(400000); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } finally { 
            System.out.println("finished server");
        } 
    } 
 
    public static void main(String... args) throws Exception { 
        AioTcpServer server = new AioTcpServer(9008); 
        new Thread(server).start(); 
    } 
}

  
AioAcceptHandler.java
import client.AioSendHandler;
import java.io.IOException; 
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousServerSocketChannel; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Future; 
 
//这里的参数受实际调用它的函数决定。本例是服务端socket.accetp调用决定
public class AioAcceptHandler implements CompletionHandler
        <AsynchronousSocketChannel, AsynchronousServerSocketChannel > 
{ 
    private  AsynchronousSocketChannel socket;
    @Override
    public void completed(AsynchronousSocketChannel socket, 
        AsynchronousServerSocketChannel attachment) 
    { //注意第一个是客户端socket,第二个是服户端socket
        try { 
            System.out.println("AioAcceptHandler.completed called");
            attachment.accept(attachment, this); 
            System.out.println("有客户端连接:" +
                socket.getRemoteAddress().toString()
            ); 
            startRead(socket);    
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
    } 
 
    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) 
    { 
        exc.printStackTrace(); 
    } 
    
    //不是CompletionHandler的方法
    public void startRead(AsynchronousSocketChannel socket) { 
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024); 
        //read的原型是
        //read(ByteBuffer dst, A attachment,
        //    CompletionHandler<Integer,? super A> handler) 
        //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。
        // V型是存有read的连接情况的参数
        AioReadHandler rd=new AioReadHandler(socket);
        socket.read(clientBuffer, clientBuffer, rd); 
        try {             
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    } 
 
}


AioReadHandler.java
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.nio.charset.CharacterCodingException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.util.logging.Level;
import java.util.logging.Logger;

//这里的参数型号,受调用它的函数决定。这里是受客户端socket.read调用
public class AioReadHandler implements CompletionHandler
        <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
    public  String msg;
 
    public AioReadHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    }     
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();  
    
    @Override
    public void completed(Integer i, ByteBuffer buf) { 
        if (i > 0) { 
            buf.flip(); 
            try { 
                msg=decoder.decode(buf).toString();
                System.out.println("收到" + 
                        socket.getRemoteAddress().toString() + "的消息:" + msg
                ); 
                buf.compact(); 
            } catch (CharacterCodingException e) { 
                e.printStackTrace(); 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
            socket.read(buf, buf, this); 
            try {
                write(socket);
            } catch (UnsupportedEncodingException ex) {
                Logger.getLogger(AioReadHandler.class.getName()).log(Level.SEVERE, null, ex);
            }
        } else if (i == -1) { 
            try { 
                System.out.println("客户端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
         System.out.println("cancelled"); 
    }
 
     //不是CompletionHandler的方法
    public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{
        String sendString="服务器回应,你输出的是:"+msg;
        ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));        
        socket.write(clientBuffer, clientBuffer, new AioWriteHandler(socket));
    }
}


AioWriteHandler.java

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AioWriteHandler implements CompletionHandler
    <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
 
    public AioWriteHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    } 

    @Override
    public void completed(Integer i, ByteBuffer buf) {
        if (i > 0) { 
            socket.write(buf, buf, this); 
        } else if (i == -1) { 
            try { 
                System.out.println("对端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
        
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("cancelled"); 
    }
    
}


1.9.3.2 客户端
AioTcpClient.java

import java.awt.BorderLayout;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JTextField;

public class AioTcpClient {
    public static JTextField jt=new JTextField();
    public static ConcurrentHashMap<String,AsynchronousSocketChannel>
            sockets =new ConcurrentHashMap<>();
    static AioTcpClient me;
    
    private AsynchronousChannelGroup asyncChannelGroup;
    public AioTcpClient() throws Exception {
        //创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(20);
        //创建异眇通道管理器   
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
    }
    
    private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder();
    
    public void start(final String ip, final int port) throws Exception {
        // 启动20000个并发连接,使用20个线程的池子
        for (int i = 0; i < 2; i++) {
            try {
                //客户端socket.当然它是异步方式的。
                AsynchronousSocketChannel connector = null;
                if (connector == null || !connector.isOpen()) {
                    //从异步通道管理器处得到客户端socket
                    connector = AsynchronousSocketChannel.open(asyncChannelGroup);
                    sockets.putIfAbsent(String.valueOf(i), connector);
                    
                    connector.setOption(StandardSocketOptions.TCP_NODELAY,
                        true);
                    connector.setOption(StandardSocketOptions.SO_REUSEADDR,
                        true);
                    connector.setOption(StandardSocketOptions.SO_KEEPALIVE,
                        true);
                    //开始连接服务器。这里的的connect原型是
                    // connect(SocketAddress remote, A attachment, 
                    //  CompletionHandler<Void,? super A> handler)
                    // 也就是它的CompletionHandler 的A型参数是由这里的调用方法
                    //的第二个参数决定。即是connector。客户端连接器。
                    // V型为null
                    connector.connect(new InetSocketAddress(ip, port),
                            connector, new AioConnectHandler(i));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public void work() throws Exception{
        AioTcpClient client = new AioTcpClient();
        client.start("localhost", 9008);
    }

    public void send() throws UnsupportedEncodingException{
        AsynchronousSocketChannel socket=sockets.get("0");
        String sendString=jt.getText();
        ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));        
        socket.write(clientBuffer, clientBuffer, new AioSendHandler(socket));
    }
    public   void createPanel() {
        me=this;
        JFrame f = new JFrame("Wallpaper");
        f.getContentPane().setLayout(new BorderLayout());       
        
        JPanel p=new JPanel(new FlowLayout(FlowLayout.LEFT));        
        JButton bt=new JButton("点我");
        p.add(bt);
        me=this;
        bt.addActionListener(new ActionListener(){

            @Override
            public void actionPerformed(ActionEvent e) {
               try {
                    me.send();
                 
                } catch (Exception ex) {
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        
        });
        
        bt=new JButton("结束");
        p.add(bt);
        me=this;
        bt.addActionListener(new ActionListener(){
            @Override
            public void actionPerformed(ActionEvent e) {                 
            }
        
        });
 
        f.getContentPane().add(jt,BorderLayout.CENTER);
        f.getContentPane().add(p, BorderLayout.EAST);
        
        f.setSize(450, 300);
        f.setDefaultCloseOperation (JFrame.EXIT_ON_CLOSE);
        f.setLocationRelativeTo (null);
        f.setVisible (true);
    }
      
    public static void main(String[] args) {
        javax.swing.SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {     
                AioTcpClient d = null;
                try {
                    d = new AioTcpClient();
                } catch (Exception ex) {
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);
                }
                
                d.createPanel();
                try {
                    d.work();
                } catch (Exception ex) {
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);
                }
               
                 
            }
        });
    } 
}


AioConnectHandler.java
import java.util.concurrent.*;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AioConnectHandler implements CompletionHandler
    <Void,AsynchronousSocketChannel>
{
    private Integer content = 0;
    
    public AioConnectHandler(Integer value){
        this.content = value;
    }
 
    @Override
    public void completed(Void attachment,AsynchronousSocketChannel connector) { 
        try {  
         connector.write(ByteBuffer.wrap(String.valueOf(content).getBytes())).get();
         startRead(connector); 
        } catch (ExecutionException e) { 
            e.printStackTrace(); 
        } catch (InterruptedException ep) { 
            ep.printStackTrace(); 
        } 
    } 
 
    @Override
    public void failed(Throwable exc, AsynchronousSocketChannel attachment) { 
        exc.printStackTrace(); 
    } 
    
    //这不是 CompletionHandler接口的方法。
    public void startRead(AsynchronousSocketChannel socket) { 
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024); 
        //read的原型是
        //read(ByteBuffer dst, A attachment,
        //    CompletionHandler<Integer,? super A> handler) 
        //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。
        // V型是存有read的连接情况的参数
        socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket)); 
        try { 
            
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    }
 
}


AioReadHandler.java
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.nio.charset.CharacterCodingException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
 
public class AioReadHandler implements CompletionHandler
    <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
 
    public AioReadHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    } 
 
    public void cancelled(ByteBuffer attachment) { 
        System.out.println("cancelled"); 
    } 
 
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); 
 
    @Override
    public void completed(Integer i, ByteBuffer buf) { 
        if (i > 0) { 
            buf.flip(); 
            try { 
                System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + decoder.decode(buf)); 
                buf.compact(); 
            } catch (CharacterCodingException e) { 
                e.printStackTrace(); 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
            socket.read(buf, buf, this); 
        } else if (i == -1) { 
            try { 
                System.out.println("对端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 
 
    @Override
    public void failed(Throwable exc, ByteBuffer buf) { 
        System.out.println(exc); 
    } 

     
}


AioSendHandler.java(与服务端的写相同)
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;


public class AioSendHandler implements CompletionHandler
    <Integer,ByteBuffer>
{ 
    private AsynchronousSocketChannel socket; 
 
    public AioSendHandler(AsynchronousSocketChannel socket) { 
        this.socket = socket; 
    } 

    @Override
    public void completed(Integer i, ByteBuffer buf) {
        if (i > 0) { 
            socket.write(buf, buf, this); 
        } else if (i == -1) { 
            try { 
                System.out.println("对端断线:" + socket.getRemoteAddress().toString()); 
                buf = null; 
            } catch (IOException e) { 
                e.printStackTrace(); 
            } 
        } 
        
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("cancelled"); 
    }
    
}

  • src.rar (8.3 KB)
  • 下载次数: 212