重温Java NIO Socket
程序员文章站
2022-04-30 11:29:45
...
好多年都没用Java里面的NIO写Socket应用了,Mina等框架封装了太多东西,现在重新写个NIO Socket的小例子回顾下。其实NIO写正确还是挺不容易的,太多的东西要注意,这个例子太过简单,要想在生产中使用还有更多的东西要注意,比如读和写由于是非阻塞的,每次操作了多少数据是没保证的,读的数据要进行累积拼接,业务逻辑应在线程池中处理等等。。。
public class TestNioServer { private Selector selector; private List<SocketChannel> channels = new ArrayList<SocketChannel>(); private final byte[] resp; ByteBuffer readBuffer = ByteBuffer.allocate(1024); public TestNioServer(int port) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Server is listening at : " + port); StringBuilder sb = new StringBuilder(); for (int i=0; i<1000; i++) { sb.append("how are you? "); } resp = sb.toString().getBytes(); } public void work() throws IOException { while(true) { selector.select(); //在此处阻塞 Set<SelectionKey> keys = selector.selectedKeys(); /* 此处不可放到另外的线程中处理,原因至少有两个 1. selector.select()阻塞时刚刚accept的连接是无法进行注册的, 注册行为会因select()阻塞,具体请看register方法的说明。 2. 如果读或者写事件没被及时处理会导致无数读或者写触发事件*/ handleSelectionKey(keys, false); } } private void handleSelectionKey(Set<SelectionKey> keys, boolean tryAnsyc) throws IOException { if (keys == null) { System.out.println("keys == null"); return; } Iterator<SelectionKey> itr = keys.iterator(); while (itr.hasNext()) { SelectionKey key = itr.next(); itr.remove(); System.out.println(key); if (key.isAcceptable()) { System.out.println("isAcceptable"); //处理接入的连接 SocketChannel ch = ((ServerSocketChannel)key.channel()).accept(); //此处与read和wirte处不同 ch.configureBlocking(false);//默认是阻塞的 channels.add(ch); //如果想异步地执行handleSelectionKey,可以使用wakeup来解除selector.select()阻塞,否则后面的注册会阻塞 if (tryAnsyc) selector.wakeup(); ch.register(selector, SelectionKey.OP_READ); } //一个SelectionKey可以同时有多个状态,因此此处不可用else if (key.isReadable()) { System.out.println("isReadable"); //处理读数据,每个channel的数据需要进行累积,业务逻辑应在另外的线程中处理 SocketChannel socketChannel = (SocketChannel)key.channel(); int len = socketChannel.read(readBuffer);//此处非阻塞,读到多少数据没任何保证 readBuffer.flip(); byte[] b = new byte[readBuffer.limit()]; readBuffer.get(b); System.out.println("len : " + len + " : " + new String(b)); readBuffer.clear(); //回点数据 socketChannel.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ, ByteBuffer.wrap(resp)); } if (key.isWritable()) { System.out.println("isWritable"); //处理写数据,也可以放在另外的线程中 SocketChannel socketChannel = (SocketChannel)key.channel(); ByteBuffer buf = (ByteBuffer)key.attachment(); while (buf.hasRemaining()) { int writtenLen = socketChannel.write(buf);//非阻塞,每次写多少不能保证 System.out.println("writtenLen=" + writtenLen); } //改成注册读事件,否则会有无数写事件被触发,因为只要是IO处于可写状态就会触发 socketChannel.register(selector, SelectionKey.OP_READ); } if (key.isConnectable()) { //客户端用的,此处ignore } } } public void broadcast() throws ClosedChannelException { for (SocketChannel ch : channels) { ch.register(selector, SelectionKey.OP_WRITE, ByteBuffer.wrap(resp)); //如果selector.select()处在阻塞状态,那么新的register将不会影响到它, //因此需要在此处将其唤醒,以使register生效 selector.wakeup(); } } public static void main(String[] args) throws IOException, InterruptedException { int port = 1234; final TestNioServer server = new TestNioServer(port); new Thread() { public void run() { try { server.work(); } catch (IOException e) { e.printStackTrace(); } } }.start(); Thread.sleep(15000); System.out.println("想要广播?"); server.broadcast(); } }
客户端为了简单,使用的是阻塞模式:
public class TestSocketClient { public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException { int port = 1234; Socket socket = new Socket(); socket.connect(new InetSocketAddress(InetAddress.getByName("localhost"), port)); final InputStream is = socket.getInputStream(); OutputStream os = socket.getOutputStream(); new Thread () { public void run() { for (;;) { byte[] b = new byte[1024]; try { is.read(b); } catch (IOException e) { e.printStackTrace(); break; } System.out.println(new String(b)); } } }.start(); for (int i=0; i<3; i++) { os.write(new byte[]{65}); os.flush(); System.out.println("written"); Thread.sleep(1000); } } }
上一篇: ehcache-1.4-beta2 发布
下一篇: oracle之日期函数