Netty5 AIO
程序员文章站
2022-05-04 11:02:18
...
工程结构图:
TimeServer.java文件内容如下:
AsyncTimeServerHandler.java文件内容如下:
AcceptCompletionHandler.java文件内容如下:
ReadCompletionHandler.java文件内容如下:
TimeClient.java文件内容如下:
AsyncTimeClientHandler.java文件内容如下:
运行截图:
testnetty5.rar是源代码
TimeServer.java文件内容如下:
package com.shihuan.netty.server; public class TimeServer { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start(); } }
AsyncTimeServerHandler.java文件内容如下:
package com.shihuan.netty.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncTimeServerHandler implements Runnable { private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port) { this.port = port; try { asynchronousServerSocketChannel = AsynchronousServerSocketChannel .open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); } } public void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); } @Override public void run() { latch = new CountDownLatch(1); doAccept(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
AcceptCompletionHandler.java文件内容如下:
package com.shihuan.netty.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> { @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer buffer = ByteBuffer.allocate(1024); result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }
ReadCompletionHandler.java文件内容如下:
package com.shihuan.netty.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel) { if (this.channel == null) { this.channel = channel; } } private void doWrite(String currentTime) { if (currentTime != null && currentTime.trim().length() > 0) { byte[] bytes = (currentTime).getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { // 如果没有发送完成,继续发送 if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { String req = new String(body, "UTF-8"); System.out.println("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
TimeClient.java文件内容如下:
package com.shihuan.netty.client; public class TimeClient { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start(); } }
AsyncTimeClientHandler.java文件内容如下:
package com.shihuan.netty.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable { private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); client.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { client.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Now is : " + body); latch.countDown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } }
运行截图:
testnetty5.rar是源代码
上一篇: JDK1.7 AIO
下一篇: Firefly 3.0.4 正式版发布