NIO-Pipe示例
程序员文章站
2022-07-13 17:03:12
...
PipeImpl解析:http://donald-draper.iteye.com/blog/2373628
前面看了SocketServerChannel,SocketChannel和DatagramChannel,从今天开始我们来看管道,先从一个实例开始:
//主程序(管道)
//Sink通道
//Source通道
启动主程序管道,控制台输出:
=========The sink is start!===========
send message to source is done...
=========The source is start!===========
message come from sink:Hello source!
前面看了SocketServerChannel,SocketChannel和DatagramChannel,从今天开始我们来看管道,先从一个实例开始:
//主程序(管道)
package nio.pipe; import java.io.IOException; import java.nio.channels.Pipe; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * PipeDemo * @author donald * 2017年4月13日 * 上午9:27:12 */ public class PipeDemo { public static void main(String[] args) { // 创建一个管道 Pipe pipe = null; try { pipe = Pipe.open(); } catch (IOException e) { e.printStackTrace(); } ExecutorService exec = Executors.newFixedThreadPool(2); exec.submit(new PipeSink(pipe.sink())); exec.submit(new PipeSource(pipe.source())); } }
//Sink通道
package nio.pipe; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Pipe; /** * SinkChannel * @author donald * 2017年4月13日 * 上午9:26:49 */ public class PipeSink implements Runnable { private Pipe.SinkChannel sinkChannel; public PipeSink(Pipe.SinkChannel sinkChannel) { this.sinkChannel = sinkChannel; } /** * */ @Override public void run() { System.out.println("=========The sink is start!==========="); try { sinkChannel.write(ByteBuffer.wrap(new String("Hello source!").getBytes("UTF-8"))); System.out.println("send message to source is done..."); } catch (IOException e) { e.printStackTrace(); } } }
//Source通道
package nio.pipe; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Pipe; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; /** * SourceChannel * @author donald * 2017年4月13日 * 上午8:56:17 */ public class PipeSource implements Runnable { private Selector selector; private Pipe.SourceChannel sourceChannel; public PipeSource(Pipe.SourceChannel sourceChannel) { this.sourceChannel = sourceChannel; try { init(); } catch (IOException e) { e.printStackTrace(); } } private void init() throws IOException{ sourceChannel.configureBlocking(false); this.selector = Selector.open(); sourceChannel.register(selector, SelectionKey.OP_READ); } @SuppressWarnings("rawtypes") @Override public void run() { System.out.println("=========The source is start!==========="); try{ while(true){ selector.select(); Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); ite.remove(); if (key.isReadable()) read(key); } } }catch (IOException e) { e.printStackTrace(); } } /** * * @param key * @throws IOException */ private void read(SelectionKey key) throws IOException{ Pipe.SourceChannel channel = (Pipe.SourceChannel) key.channel(); ByteBuffer buf = ByteBuffer.allocate(100); channel.read(buf); byte[] data = buf.array(); String msg = new String(data,"UTF-8").trim(); System.out.println("message come from sink:"+msg); } }
启动主程序管道,控制台输出:
=========The sink is start!===========
send message to source is done...
=========The source is start!===========
message come from sink:Hello source!
上一篇: Kafka是什么