NIO-Reactor模式介绍 nioreactor多线程socket
程序员文章站
2022-05-04 21:40:54
...
Reactor模式常用于Java NIO编程,与生产者-消费者模式有些类似:可以把它看作只有一个线程的生产者-消费者模型。Netty底层的NIO部分也是基于Reactor模式实现的。
一个简单的Reactor模式
Reactor.java
Handler.java
以上模式在普通的非阻塞IO场景下效果还可以,但要提高效率,可以使用多线程来处理IO之后的业务逻辑。下面看一下多线程Reactor模式。
多线程Reactor模式
ReactorWithPool.java
HandlerWithPool.java
一个简单的Reactor模式
Reactor.java
package com.gbcom.protocol.nio.core; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * 反应器模式,适合nio编码,类似事件驱动编程方式,适合非阻塞IO * * 相比较传统的生产者消费者模式,,由于结合nio,不需要开启多个消费者线程,仅仅需要开启一个Reactor线程进行轮询 * 性能会更高,(reactor 模式 必须结合 nio 也就是 非阻塞的模式使用) * * * netty 使用 reactor模式nio部分的编码 * @author SYZ * @date 2016-11-1 上午10:52:58 * @version 1.0.0 * @see com.gbcom.protocol.nio.core.Reactor */ class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } /* * Alternatively, use explicit SPI provider: SelectorProvider p = * SelectorProvider.provider(); selector = p.openSelector(); serverSocket = * p.openServerSocketChannel(); */ // class Reactor continued public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { selector.select(); //select这个函数是block, Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) (it.next())); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) r.run(); } // class Reactor continued //接收者处理方法 class Acceptor implements Runnable { // inner public void run() { try { SocketChannel c = serverSocket.accept();//1.select()方法仅仅通知有事件到来, 真正的接受 还是要使用 accept //每个客户端 应该只执行一次,也就是 new Handler 会重新注册到该select中,以后不会再重复创建了。。。(待测试) if (c != null) new Handler(selector, c); } catch (IOException ex) { /* ... 
*/ } } } /** * Test: (Reactor.main) * @param args * @throws IOException */ public static void main(String args[]) throws IOException{ System.out.println(SelectionKey.OP_ACCEPT); Reactor reactor = new Reactor(1107); Thread t = new Thread(reactor); // t.setDaemon(true); t.start(); } }
Handler.java
package com.gbcom.protocol.nio.core;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * Per-connection handler. The constructor registers the channel on the given
 * selector and attaches this object to the key, so whichever thread dispatches
 * that key calls {@link #run()} directly.
 *
 * <p>No thread is started here: reading, processing and writing all happen on
 * the dispatching thread, so a slow {@link #process()} delays every other
 * connection on the same selector.
 *
 * @author SYZ
 * @date 2016-11-1 11:08:04
 * @version 1.0.0
 */
final class Handler implements Runnable {
    /** Capacity of the request buffer; must be non-zero or nothing can ever be read. */
    private static final int INPUT_CAPACITY = 1024;

    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(INPUT_CAPACITY);
    ByteBuffer output = ByteBuffer.allocate(100);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    /**
     * Switches {@code c} to non-blocking mode, registers it on {@code sel}
     * with this handler attached, and arms it for {@code OP_READ}.
     *
     * @param sel selector whose loop will dispatch this handler
     * @param c   freshly accepted connection
     * @throws IOException if the channel cannot be configured or registered
     */
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        // Make a selector that is (or is about to be) blocked in select() re-check its keys.
        sel.wakeup();
    }

    /** Placeholder: reports whether a full request has been buffered. */
    boolean inputIsComplete() {
        return false;
        /* ... */
    }

    /** Placeholder: reports whether the whole response has been written. */
    boolean outputIsComplete() {
        return false;
        /* ... */
    }

    /** Placeholder for the business logic that runs once the input is complete. */
    void process() {
        System.out.println("process!");
        /* ... */
    }

    /**
     * Performs the I/O step matching the current state. An I/O failure closes
     * the connection instead of being ignored; otherwise the broken channel
     * would stay registered and keep getting selected.
     */
    @Override
    public void run() {
        try {
            if (state == READING) {
                read(); // the read itself is non-blocking, but process() runs inline
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) {
            System.err.println("I/O failure on connection, closing: " + ex);
            closeConnection();
        }
    }

    /**
     * Reads available bytes into {@link #input}. Once the input is complete it
     * runs {@link #process()} and switches the key to {@code OP_WRITE}.
     *
     * @throws IOException if the read fails
     */
    void read() throws IOException {
        int n = socket.read(input);
        if (n < 0) {
            // Peer closed the stream; the key would stay readable forever.
            closeConnection();
            return;
        }
        if (inputIsComplete()) {
            process();
            state = SENDING; // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
            // State-object alternative:
            /*
             * sk.attach(new Sender()); sk.interest(SelectionKey.OP_WRITE);
             * sk.selector().wakeup();
             */
        } else if (!input.hasRemaining()) {
            // Buffer is full and the request is still incomplete: no further read can
            // make progress, so drop the connection rather than spin on a readable key.
            System.err.println("Request exceeds " + INPUT_CAPACITY + " bytes, closing connection");
            closeConnection();
        }
    }

    /**
     * Writes {@link #output} and closes the connection once everything is sent.
     *
     * @throws IOException if the write fails
     */
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            closeConnection();
        }
    }

    /** Cancels the key and closes the channel; cancelling alone leaks the socket. */
    private void closeConnection() {
        sk.cancel();
        try {
            socket.close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway.
        }
    }

    /**
     * State-object variant of {@link #send()}; kept for illustration, nothing
     * in this class attaches it.
     *
     * @author SYZ
     * @date 2016-11-1 11:21:38
     * @version 1.0.0
     */
    class Sender implements Runnable {
        @Override
        public void run() {
            try {
                socket.write(output);
            } catch (IOException e) {
                System.err.println("I/O failure while sending, closing: " + e);
                closeConnection();
                return;
            }
            if (outputIsComplete()) {
                closeConnection();
            }
        }
    }
}
以上模式在普通的非阻塞IO场景下效果还可以,但要提高效率,可以使用多线程来处理IO之后的业务逻辑。下面看一下多线程Reactor模式。
多线程Reactor模式
ReactorWithPool.java
package com.gbcom.protocol.nio.core; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * 反应器的多线程版本, * * http://gee.cs.oswego.edu Multiple Reactor Threads " Using Reactor Pools Use * to match CPU and IO rates Static or dynamic construction " Each with own * Selector, Thread, dispatch loop Main acceptor distributes to other reacto * * * 选择器也会资源紧张(未深究),所以可以创建多个 * @author SYZ * @date 2016-11-1 上午11:46:49 * @version 1.0.0 * @see com.gbcom.protocol.nio.core.ReactorWithPool */ class ReactorWithPool implements Runnable { final Selector mainSelector; Selector[] subselectors = new Selector[10];//业务逻辑相关的选择器,,如何创建?? // also create threads int next = 0; final ServerSocketChannel serverSocket; ReactorWithPool(int port) throws IOException { mainSelector = Selector.open(); for(int i=0;i<10;i++){ //add by myself subselectors[i] = Selector.open(); } serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(mainSelector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } /* * Alternatively, use explicit SPI provider: SelectorProvider p = * SelectorProvider.provider(); selector = p.openSelector(); serverSocket = * p.openServerSocketChannel(); */ // class Reactor continued public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { mainSelector.select(); Set selected = mainSelector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) (it.next())); selected.clear(); } } catch (IOException ex) { /* ... 
*/ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) r.run(); } // class Reactor continued //接收者处理方法 class Acceptor implements Runnable { // inner public void run() { try { SocketChannel connection = serverSocket.accept(); if (connection != null) new Handler(subselectors[next], connection); if (++next == subselectors.length) next = 0; } catch (IOException e) { e.printStackTrace(); } } } /** * Test: (Reactor.main) * * @param args * @throws IOException */ public static void main(String args[]) throws IOException { System.out.println(SelectionKey.OP_ACCEPT); ReactorWithPool reactor = new ReactorWithPool(1107); Thread t = new Thread(reactor); t.setDaemon(true); t.start(); } }
HandlerWithPool.java
package com.gbcom.protocol.nio.core; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; /** * 使用线程池处理read之后的事情,主要包括业务逻辑, * 注意是处理IO read完之后的事情,不是处理IO的读写操作,适用于业务逻辑比较复杂的情况, * * 解决逻辑耗时问题 * * @author SYZ * @date 2016-11-1 上午11:33:03 * @version 1.0.0 * @see com.gbcom.protocol.nio.core.HandlerWithPool */ public class HandlerWithPool implements Runnable{ // uses util.concurrent thread pool static PooledExecutor pool = new PooledExecutor(); static final int PROCESSING = 3; final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(0); ByteBuffer output = ByteBuffer.allocate(100); static final int READING = 0, SENDING = 1; int state = READING; HandlerWithPool(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this);// 添加到 select中,公用 reactor中的selector线程 sk.interestOps(SelectionKey.OP_READ); sel.wakeup();// 立即执行 } boolean inputIsComplete() { return false; /* ... */ } boolean outputIsComplete() { return false; /* ... */ } void process() { System.out.println("process!"); /* ... */ } // class Handler continued public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... 
*/ } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } synchronized void read() throws IOException, InterruptedException { socket.read(input);//非阻塞,模式,直接获取了数据, if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interestOps(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); } /** * 简单封装,并没有使用 * @author SYZ * @date 2016-11-1 上午11:21:38 * @version 1.0.0 * @see com.gbcom.protocol.nio.core.Sender */ class Sender implements Runnable { public void run() { // ... try { socket.write(output); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (outputIsComplete()) sk.cancel(); } } }