NIO-Pipe示例
程序员文章站
2022-04-24 14:24:30
...
PipeImpl解析:[url]http://donald-draper.iteye.com/blog/2373628[/url]
前面看了ServerSocketChannel,SocketChannel和DatagramChannel,从今天开始我们来看管道,先从一个实例开始:
//主程序(管道)
//Sink通道
//Source通道
启动主程序管道,控制台输出:
=========The sink is start!===========
send message to source is done...
=========The source is start!===========
message come from sink:Hello source!
前面看了ServerSocketChannel,SocketChannel和DatagramChannel,从今天开始我们来看管道,先从一个实例开始:
//主程序(管道)
package nio.pipe;
import java.io.IOException;
import java.nio.channels.Pipe;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * PipeDemo: entry point that opens a {@link Pipe} and runs a writer task on its
 * sink end and a reader task on its source end, each on a pool thread.
 *
 * @author donald
 * April 13, 2017, 9:27:12 AM
 */
public class PipeDemo {
    /**
     * Opens the pipe and submits the sink and source tasks to a two-thread pool.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create the pipe; without it there is nothing to demonstrate, so stop
        // here instead of dereferencing a null pipe below.
        Pipe pipe;
        try {
            pipe = Pipe.open();
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            exec.submit(new PipeSink(pipe.sink()));
            exec.submit(new PipeSource(pipe.source()));
        } finally {
            // shutdown() lets already-submitted tasks run to completion and then
            // releases the pool threads, so they do not keep the JVM alive once
            // both tasks have finished.
            exec.shutdown();
        }
    }
}
//Sink通道
package nio.pipe;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
/**
 * SinkChannel writer: sends a single greeting message into the sink (write)
 * end of a {@link Pipe}.
 *
 * @author donald
 * April 13, 2017, 9:26:49 AM
 */
public class PipeSink implements Runnable {
    /** Write end of the pipe that the message is sent into. */
    private final Pipe.SinkChannel sinkChannel;

    /**
     * @param sinkChannel the pipe's sink channel to write to
     */
    public PipeSink(Pipe.SinkChannel sinkChannel) {
        this.sinkChannel = sinkChannel;
    }

    /**
     * Writes the UTF-8 bytes of "Hello source!" to the sink channel and reports
     * progress on standard output. An {@link IOException} during the write is
     * printed and the method returns.
     */
    @Override
    public void run() {
        System.out.println("=========The sink is start!===========");
        try {
            ByteBuffer payload = ByteBuffer.wrap(
                    "Hello source!".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // A single write() is allowed to transfer only part of the buffer,
            // so keep writing until every byte has been handed to the channel.
            while (payload.hasRemaining()) {
                sinkChannel.write(payload);
            }
            System.out.println("send message to source is done...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//Source通道
package nio.pipe;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
/**
 * SourceChannel reader: waits on a {@link Selector} for data arriving on the
 * source (read) end of a {@link Pipe} and prints each chunk it receives.
 *
 * @author donald
 * April 13, 2017, 8:56:17 AM
 */
public class PipeSource implements Runnable {
    /** Capacity in bytes of the buffer used for each read. */
    private static final int BUFFER_SIZE = 100;

    /** Selector the source channel is registered with for read readiness. */
    private final Selector selector;
    /** Read end of the pipe. */
    private final Pipe.SourceChannel sourceChannel;

    /**
     * Switches the channel to non-blocking mode and registers it with a new selector.
     *
     * @param sourceChannel the pipe's source channel to read from
     * @throws IllegalStateException if the channel cannot be configured or registered;
     *         the underlying {@link IOException} is attached as the cause
     */
    public PipeSource(Pipe.SourceChannel sourceChannel) {
        this.sourceChannel = sourceChannel;
        try {
            this.selector = init();
        } catch (IOException e) {
            // Fail here rather than leave an instance whose run() would hit a null selector.
            throw new IllegalStateException("Unable to register the pipe source channel with a selector", e);
        }
    }

    /**
     * Puts the source channel into non-blocking mode and registers it for
     * {@link SelectionKey#OP_READ} on a freshly opened selector.
     *
     * @return the selector the channel was registered with
     * @throws IOException if configuring the channel, opening the selector or registering fails
     */
    private Selector init() throws IOException {
        sourceChannel.configureBlocking(false);
        Selector opened = Selector.open();
        boolean registered = false;
        try {
            sourceChannel.register(opened, SelectionKey.OP_READ);
            registered = true;
            return opened;
        } finally {
            if (!registered) {
                // Do not leak the selector when registration fails.
                opened.close();
            }
        }
    }

    /**
     * Dispatches read-ready keys until the source channel is closed (which
     * {@link #read(SelectionKey)} does on end-of-stream), the thread is
     * interrupted, or an {@link IOException} occurs. The selector is closed
     * on the way out.
     */
    @Override
    public void run() {
        System.out.println("=========The source is start!===========");
        try {
            // select() returns immediately while the thread's interrupt status is set,
            // so the interrupt check keeps an interrupted thread from spinning.
            while (sourceChannel.isOpen() && !Thread.currentThread().isInterrupted()) {
                selector.select();
                Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    // The selected-key set is not cleared by the selector; remove handled keys.
                    ready.remove();
                    if (key.isValid() && key.isReadable()) {
                        read(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads one chunk (up to {@value #BUFFER_SIZE} bytes) from the key's channel
     * and prints it decoded as UTF-8. Only the bytes actually read are decoded.
     * On end-of-stream the key is cancelled and the channel is closed.
     *
     * @param key the read-ready selection key
     * @throws IOException if reading from or closing the channel fails
     */
    private void read(SelectionKey key) throws IOException {
        Pipe.SourceChannel channel = (Pipe.SourceChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
        int count = channel.read(buf);
        if (count < 0) {
            // The write end was closed: nothing more will ever arrive.
            key.cancel();
            channel.close();
            return;
        }
        if (count == 0) {
            return;
        }
        String msg = new String(buf.array(), 0, count, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("message come from sink:" + msg);
    }
}
启动主程序管道,控制台输出:
=========The sink is start!===========
send message to source is done...
=========The source is start!===========
message come from sink:Hello source!