java 之异步套接字编程实例(AIO)
程序员文章站
2022-05-30 23:14:04
...
本文重点以“淘宝邓悟”中学习理解整理而来。更改了客户端为swing应用程序,并增加了服务端与客户端之间相互向对方发信息的功能。为了便 于阅读,用自已观察总结性的理解,进行了啰嗦的注解。
http://blog.sina.com.cn/s/blog_71ad0d3f01019y1c.html
异步socket编程,一样分成客户端与服务端。
AsynchronousServerSocketChannel -------服务端socket;
AsynchronousSocketChannel------客户端socket.
AsynchronousChannelGroup-----socket管理器。服务端socket与客户端socket都由它生成。它的管理需要线程池。它的工作方式之一是把必要的资源交给客户端与服务端的处理器,并调用该处理器进行工作。
ExecutorService-----线程池。是socket管理器需要的东西。
CompletionHandler-------处理器。它有两个泛型参数A、V。这是个回调函数所用的标准接口。Socket管理器 会把相关实参放到这个A,V的参数中,让用户处理后,然后调用这个处理器的方法进行执行。如果用户有一个方法中的参数的类型是该处理器,那么在其他地方再次调用这个方法,尽管方法不同,但是传给该方法的CompletionHandler的处理器的A、V参数 却是不相同的,不仅是值不同,类型也有可能完全不同。这是学习中的难点。
练习中总结:除了服务端与客户端初始化时差别很大,但是在各自与对方通信中,所使用的类都是 客户端socket类。
下面的例子,展示了异步方式的服务端与客户端相互通信的例子。客户端是swing程序。
调试使用方法:先运行服务器,再运行客服端。在客户端上的文本框中输入字符点“点我”按钮,立即通过0号套接字向服务器发送信息。在调试台中,可看到服务器与客户端的通信情况。
下面是我测试时服务端的信息
下面是我测试时客户端的信息
debug:
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:0
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:sd
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1111111111
例子说明:
服务端与客户端各有四个类。
服务端:
AioTcpServer---服务器主类。它与客户端通信由回调连接处理器AioAcceptHandler完成。
AioAcceptHandler-----连接处理器。处理服务器与客户端的通信。具体的读操作,回调AioReadHandler处理器
AioReadHandler-----读处理器。处理连接处理器交给的 读任务。即具体的读客户端的信息由它完成。如果服务器要回应该客户端,需要回调AioWriteHandler写处理器。
AioWriteHandler----写处理器。完成读处理器交给的任务,即向客户端回应消息。
客户端:
AioTcpClient-------客户端主类。它与服务器的通讯由回调AioConnectHandler连接处理器完成。具体写信息由客户端socket回调AioSendHandler完成。具体读信息由客户端socket回调AioReadHandler完成。
AioConnectHandler-------连接处理器。完成与服务器的通讯。具体的读信息由客户端socket回调读处理器AioReadHandler完成,即完成读取服务器的信息。
AioReadHandler-------读处理器,完成读取服务端信息。
AioSendHandler----写处理器,向服务端发送信息。
1.9.3.1 服务端
AioTcpServer.java
AioAcceptHandler.java
AioReadHandler.java
AioWriteHandler.java
1.9.3.2 客户端
AioTcpClient.java
AioConnectHandler.java
AioReadHandler.java
AioSendHandler.java(与服务端的写相同)
http://blog.sina.com.cn/s/blog_71ad0d3f01019y1c.html
异步socket编程,一样分成客户端与服务端。
AsynchronousServerSocketChannel -------服务端socket;
AsynchronousSocketChannel------客户端socket.
AsynchronousChannelGroup-----socket管理器。服务端socket与客户端socket都由它生成。它的管理需要线程池。它的工作方式之一是把必要的资源交给客户端与服务端的处理器,并调用该处理器进行工作。
ExecutorService-----线程池。是socket管理器需要的东西。
CompletionHandler-------处理器。它有两个泛型参数A、V。这是个回调函数所用的标准接口。Socket管理器 会把相关实参放到这个A,V的参数中,让用户处理后,然后调用这个处理器的方法进行执行。如果用户有一个方法中的参数的类型是该处理器,那么在其他地方再次调用这个方法,尽管方法不同,但是传给该方法的CompletionHandler的处理器的A、V参数 却是不相同的,不仅是值不同,类型也有可能完全不同。这是学习中的难点。
练习中总结:除了服务端与客户端初始化时差别很大,但是在各自与对方通信中,所使用的类都是 客户端socket类。
下面的例子,展示了异步方式的服务端与客户端相互通信的例子。客户端是swing程序。
调试使用方法:先运行服务器,再运行客服端。在客户端上的文本框中输入字符点“点我”按钮,立即通过0号套接字向服务器发送信息。在调试台中,可看到服务器与客户端的通信情况。
下面是我测试时服务端的信息
引用
debug:
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1606
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1607
收到/127.0.0.1:1606的消息:0
收到/127.0.0.1:1607的消息:1
收到/127.0.0.1:1606的消息:sd
收到/127.0.0.1:1606的消息:1111111111
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1606
AioAcceptHandler.completed called
有客户端连接:/127.0.0.1:1607
收到/127.0.0.1:1606的消息:0
收到/127.0.0.1:1607的消息:1
收到/127.0.0.1:1606的消息:sd
收到/127.0.0.1:1606的消息:1111111111
下面是我测试时客户端的信息
引用
debug:
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:0
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:sd
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1111111111
例子说明:
服务端与客户端各有四个类。
服务端:
AioTcpServer---服务器主类。它与客户端通信由回调连接处理器AioAcceptHandler完成。
AioAcceptHandler-----连接处理器。处理服务器与客户端的通信。具体的读操作,回调AioReadHandler处理器
AioReadHandler-----读处理器。处理连接处理器交给的 读任务。即具体的读客户端的信息由它完成。如果服务器要回应该客户端,需要回调AioWriteHandler写处理器。
AioWriteHandler----写处理器。完成读处理器交给的任务,即向客户端回应消息。
客户端:
AioTcpClient-------客户端主类。它与服务器的通讯由回调AioConnectHandler连接处理器完成。具体写信息由客户端socket回调AioSendHandler完成。具体读信息由客户端socket回调AioReadHandler完成。
AioConnectHandler-------连接处理器。完成与服务器的通讯。具体的读信息由客户端socket回调读处理器AioReadHandler完成,即完成读取服务器的信息。
AioReadHandler-------读处理器,完成读取服务端信息。
AioSendHandler----写处理器,向服务端发送信息。
1.9.3.1 服务端
AioTcpServer.java
import java.net.InetSocketAddress; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * AIO异步socket通讯,分成 用于服务端的socekt与用于客户端的socket,当然这两者都是<br> * 异步的。两者使用时,都用到了同样的异步通道管理器,异步通道管理器通过线程池管理。<br> * 异步通道管理器,可以生成服务端socket与客户端socket。 * * 使用服务端socket或客户端socket都需要一个操作处理器(CompletionHandler),<br> *当有信息时异步通道管理器会把 相关信息传递给操作作处理器。 * * 操作处理器的方法是同一方法,但方法的参数是泛型,随着调用它的方法不同而改变。<br> * * 在AIO中,CompletionHandler这个操作处理器方法,是个泛型接口,当回调函数用。<br> * 使用CompletionHandler的方法,约定是把该方法前一个参数实例传递给A型参数<br> * (attachment),CompletionHandler的另一个参数将是存有该方法的使用情况的实例。 * */ public class AioTcpServer implements Runnable { private AsynchronousChannelGroup asyncChannelGroup; private AsynchronousServerSocketChannel listener; public AioTcpServer(int port) throws Exception { //创建线程池 ExecutorService executor = Executors.newFixedThreadPool(20); //异步通道管理器 asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor); //创建 用在服务端的异步Socket.以下简称服务器socket。 //异步通道管理器,会把服务端所用到的相关参数 listener = AsynchronousServerSocketChannel.open(asyncChannelGroup). bind(new InetSocketAddress(port)); } public void run() { try { AioAcceptHandler acceptHandler = new AioAcceptHandler(); //为服务端socket指定接收操作对象.accept原型是: //accept(A attachment, CompletionHandler<AsynchronousSocketChannel, // ? super A> handler) //也就是这里的CompletionHandler的A型参数是实际调用accept方法的第一个参数 //即是listener。另一个参数V,就是原型中的客户端socket listener.accept(listener, new AioAcceptHandler()); Thread.sleep(400000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("finished server"); } } public static void main(String... args) throws Exception { AioTcpServer server = new AioTcpServer(9008); new Thread(server).start(); } }
AioAcceptHandler.java
import client.AioSendHandler; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; //这里的参数受实际调用它的函数决定。本例是服务端socket.accetp调用决定 public class AioAcceptHandler implements CompletionHandler <AsynchronousSocketChannel, AsynchronousServerSocketChannel > { private AsynchronousSocketChannel socket; @Override public void completed(AsynchronousSocketChannel socket, AsynchronousServerSocketChannel attachment) { //注意第一个是客户端socket,第二个是服户端socket try { System.out.println("AioAcceptHandler.completed called"); attachment.accept(attachment, this); System.out.println("有客户端连接:" + socket.getRemoteAddress().toString() ); startRead(socket); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) { exc.printStackTrace(); } //不是CompletionHandler的方法 public void startRead(AsynchronousSocketChannel socket) { ByteBuffer clientBuffer = ByteBuffer.allocate(1024); //read的原型是 //read(ByteBuffer dst, A attachment, // CompletionHandler<Integer,? super A> handler) //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。 // V型是存有read的连接情况的参数 AioReadHandler rd=new AioReadHandler(socket); socket.read(clientBuffer, clientBuffer, rd); try { } catch (Exception e) { e.printStackTrace(); } } }
AioReadHandler.java
import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.logging.Level; import java.util.logging.Logger; //这里的参数型号,受调用它的函数决定。这里是受客户端socket.read调用 public class AioReadHandler implements CompletionHandler <Integer,ByteBuffer> { private AsynchronousSocketChannel socket; public String msg; public AioReadHandler(AsynchronousSocketChannel socket) { this.socket = socket; } private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); @Override public void completed(Integer i, ByteBuffer buf) { if (i > 0) { buf.flip(); try { msg=decoder.decode(buf).toString(); System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + msg ); buf.compact(); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } socket.read(buf, buf, this); try { write(socket); } catch (UnsupportedEncodingException ex) { Logger.getLogger(AioReadHandler.class.getName()).log(Level.SEVERE, null, ex); } } else if (i == -1) { try { System.out.println("客户端断线:" + socket.getRemoteAddress().toString()); buf = null; } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("cancelled"); } //不是CompletionHandler的方法 public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{ String sendString="服务器回应,你输出的是:"+msg; ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8")); socket.write(clientBuffer, clientBuffer, new AioWriteHandler(socket)); } }
AioWriteHandler.java
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class AioWriteHandler implements CompletionHandler <Integer,ByteBuffer> { private AsynchronousSocketChannel socket; public AioWriteHandler(AsynchronousSocketChannel socket) { this.socket = socket; } @Override public void completed(Integer i, ByteBuffer buf) { if (i > 0) { socket.write(buf, buf, this); } else if (i == -1) { try { System.out.println("对端断线:" + socket.getRemoteAddress().toString()); buf = null; } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("cancelled"); } }
1.9.3.2 客户端
AioTcpClient.java
import java.awt.BorderLayout; import java.awt.FlowLayout; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.net.StandardSocketOptions; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JTextField; public class AioTcpClient { public static JTextField jt=new JTextField(); public static ConcurrentHashMap<String,AsynchronousSocketChannel> sockets =new ConcurrentHashMap<>(); static AioTcpClient me; private AsynchronousChannelGroup asyncChannelGroup; public AioTcpClient() throws Exception { //创建线程池 ExecutorService executor = Executors.newFixedThreadPool(20); //创建异眇通道管理器 asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor); } private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder(); public void start(final String ip, final int port) throws Exception { // 启动20000个并发连接,使用20个线程的池子 for (int i = 0; i < 2; i++) { try { //客户端socket.当然它是异步方式的。 AsynchronousSocketChannel connector = null; if (connector == null || !connector.isOpen()) { //从异步通道管理器处得到客户端socket connector = AsynchronousSocketChannel.open(asyncChannelGroup); sockets.putIfAbsent(String.valueOf(i), connector); connector.setOption(StandardSocketOptions.TCP_NODELAY, true); connector.setOption(StandardSocketOptions.SO_REUSEADDR, true); connector.setOption(StandardSocketOptions.SO_KEEPALIVE, true); //开始连接服务器。这里的的connect原型是 // connect(SocketAddress remote, A attachment, // CompletionHandler<Void,? super A> handler) // 也就是它的CompletionHandler 的A型参数是由这里的调用方法 //的第二个参数决定。即是connector。客户端连接器。 // V型为null connector.connect(new InetSocketAddress(ip, port), connector, new AioConnectHandler(i)); } } catch (IOException e) { e.printStackTrace(); } } } public void work() throws Exception{ AioTcpClient client = new AioTcpClient(); client.start("localhost", 9008); } public void send() throws UnsupportedEncodingException{ AsynchronousSocketChannel socket=sockets.get("0"); String sendString=jt.getText(); ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8")); socket.write(clientBuffer, clientBuffer, new AioSendHandler(socket)); } public void createPanel() { me=this; JFrame f = new JFrame("Wallpaper"); f.getContentPane().setLayout(new BorderLayout()); JPanel p=new JPanel(new FlowLayout(FlowLayout.LEFT)); JButton bt=new JButton("点我"); p.add(bt); me=this; bt.addActionListener(new ActionListener(){ @Override public void actionPerformed(ActionEvent e) { try { me.send(); } catch (Exception ex) { Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex); } } }); bt=new JButton("结束"); p.add(bt); me=this; bt.addActionListener(new ActionListener(){ @Override public void actionPerformed(ActionEvent e) { } }); f.getContentPane().add(jt,BorderLayout.CENTER); f.getContentPane().add(p, BorderLayout.EAST); f.setSize(450, 300); f.setDefaultCloseOperation (JFrame.EXIT_ON_CLOSE); f.setLocationRelativeTo (null); f.setVisible (true); } public static void main(String[] args) { javax.swing.SwingUtilities.invokeLater(new Runnable() { @Override public void run() { AioTcpClient d = null; try { d = new AioTcpClient(); } catch (Exception ex) { Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex); } d.createPanel(); try { d.work(); } catch (Exception ex) { Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex); } } }); } }
AioConnectHandler.java
import java.util.concurrent.*; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class AioConnectHandler implements CompletionHandler <Void,AsynchronousSocketChannel> { private Integer content = 0; public AioConnectHandler(Integer value){ this.content = value; } @Override public void completed(Void attachment,AsynchronousSocketChannel connector) { try { connector.write(ByteBuffer.wrap(String.valueOf(content).getBytes())).get(); startRead(connector); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException ep) { ep.printStackTrace(); } } @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { exc.printStackTrace(); } //这不是 CompletionHandler接口的方法。 public void startRead(AsynchronousSocketChannel socket) { ByteBuffer clientBuffer = ByteBuffer.allocate(1024); //read的原型是 //read(ByteBuffer dst, A attachment, // CompletionHandler<Integer,? super A> handler) //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。 // V型是存有read的连接情况的参数 socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket)); try { } catch (Exception e) { e.printStackTrace(); } } }
AioReadHandler.java
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; public class AioReadHandler implements CompletionHandler <Integer,ByteBuffer> { private AsynchronousSocketChannel socket; public AioReadHandler(AsynchronousSocketChannel socket) { this.socket = socket; } public void cancelled(ByteBuffer attachment) { System.out.println("cancelled"); } private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); @Override public void completed(Integer i, ByteBuffer buf) { if (i > 0) { buf.flip(); try { System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + decoder.decode(buf)); buf.compact(); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } socket.read(buf, buf, this); } else if (i == -1) { try { System.out.println("对端断线:" + socket.getRemoteAddress().toString()); buf = null; } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Throwable exc, ByteBuffer buf) { System.out.println(exc); } }
AioSendHandler.java(与服务端的写相同)
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; public class AioSendHandler implements CompletionHandler <Integer,ByteBuffer> { private AsynchronousSocketChannel socket; public AioSendHandler(AsynchronousSocketChannel socket) { this.socket = socket; } @Override public void completed(Integer i, ByteBuffer buf) { if (i > 0) { socket.write(buf, buf, this); } else if (i == -1) { try { System.out.println("对端断线:" + socket.getRemoteAddress().toString()); buf = null; } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("cancelled"); } }