仿照jetty的nio原理写了个例子
程序员文章站
2022-07-13 17:02:00
...
看了好些天的nio和jetty源码,写了个例子。
太晚了,先直接贴代码了,注释不是很全。
更新了代码的解释和2个疑问在最下面
代码说起来太枯燥,我们可以吧这个server看作一辆行驶中的公交车,车上的每个乘客看成是一个客户端,同时有三个工作人员:观察员,售票员,服务员,负责维护这辆车。
1.观察员(ConnectionHander):
他就负责站在车门口盯着车外,只要发现有想要人上车(客户端发起与服务端连接的请求),就负责把这个人带进汽车。如:有个叫张三的家伙被他带上了车,他会告诉张三:“喂,和车门口的其他人(已经在_changes_con队列中的请求)一起,在车门口排队站好了,别到处跑(把连接扔进队列_changes_con),一会儿售票员会来找你登记的(注册SelectionKey.OP_READ事件)”。然后观察员大吼一声告诉售票员,别发呆(selector.select(),这个方法这里是阻塞的,所以说成发呆)啦,快来登记(selector.wakeup())新乘客。接着观察员继续傻傻的等待(阻塞的等待),一心一意的!直到他发现又有人要上车。
2.售票员(RequestHander):
售票员往车门口看了一眼(遍历_changes_con队列)。果然有个叫张三的小伙子正在门口乖乖的等着,售票员走过去说:“喂,新上来的,来登记吧”,于是,售票员就吧张三的信息登记(sc.register(selector, SelectionKey.OP_READ, id))到她的小本子(selector.keys()这个小本子是售票员专有的,其他人无法操作)上。登记的信息有:到哪一站下车(SelectionKey.OP_READ),张三携带了什么物品(第三个参数)。登记完了张三并且告诉他:“你可以去睡觉了,到站了我会通知你的。”。OK,于是张三就找到个位置,呼呼大睡。不用担心坐过了站。
登记完了新乘客,售票员还得时刻注意车外,是不是到了有人要下车的地方。
很快,到了张三要下车的地方,售票员立刻把张三的名字写在一块黑板上(selector.selectedKeys()),当然黑板上可能不止张三一个人的名字,因为有可能其他人也在这个地方下车。然后叫醒要下车的人,并一个一个更改登记信息(key.interestOps(0)),然后告诉它们:“到后门口排队站好了(_changes_req.add(key)),一会服务员会来给你们开门,让你们下去的”。最后,服务员擦掉黑板上的信息(selector.selectedKeys().clear())
3.服务员(RequestExecutor):
服务员负责处理下车(客户端提交上来的请求)这件事情。她热情的接待了张三(这里是parseRequest方法),并认真的回答了张三提出的各种问题(处理客户端的请求,这里是write这个方法,通过socketchannel向客户端写一些信息)。然后为张三打开车门,让他下车(sc.socket().close(),sc.close())。当然,如果这个时候张三反悔了,说:“我不想下车了,我再做一会儿再下去”。服务员也会热情的帮更改登记信息(interestKey方法),并告诉售票员(selector.wakeup()),:“把张三这小子的信息改一下 key.interestOps(OP)”。OK,张三又可以继续到车上睡大觉了!如果到了他要下车的地方,售票员会再次执行之前相同的操作。
遇到的问题
问题1:处理完请求后,继续注册会卡住,也就是卡在sc.register(selector, SelectionKey.OP_READ, id)
原因:socketChannel注册到selector,需要获取selector的keys的锁,而此时,selector正阻塞在selector.select()方法上面。如果没有新的请求进来,会一直持有keys的锁。注册的时候拿不到keys的锁会阻塞卡住。
解决1:再注册之前手动调用一次selector.wakeup(),让selector和注册这2个线程去竟争
解决2:考虑倒注册操作的开销比较小,完全可以放到selecotr的同一个线程里面(jetty就是这么做的)。请求处理完后,直接调用key的interestOps方法,然后selector.wakeup(),第二次select操作的时候,会重新刷新keys。注册生效。
问题2:socketchannel.read(Bytebuffer)读取含有中文的buffer,然后用CharBuffer charBuffer = decoder.decode(bbuffer)操作会抛出异常 java.nio.charset.UnmappableCharacterException: Input length = 2
原因:客户端的输入编码不对。我用的terminal的编码为UTF-8
解决:设置一下即可。
问题3:输入中文过长的时候,还是偶尔会出现java.nio.charset.MalformedInputException: Input length = 1
原因:中文占用2个字节,如果只读到奇数个字节,解码就会出错。
解决:最好一次性读完所有的输入。所以用while循环来使劲读。每次读的放到数组里面,直到读到的为0,即认为没有信息了。然后解码。见parseRequest方法
2个疑问,期待你的解答!
1.实际上,由于网络原因parseRequest里面count = sc.read(bbuffer)可能读到的确实为0,但是却不是读完了所有信息,api上说读到的为-1即可认为已经读完,可经测试,即使读完了,也没有出现-1的情况,最后一次始终为0。谁能解释一下?
2.频繁调用wakeUp。看到wakeUp的原理,发现么次都是往连接里面写入一个字节,然后selector发现有io进来了,就解除阻塞。资料上说实际应用中这个开销还是不容忽视的。有木有好的办法减少调用?
3.RequestExecutor这个撕循环,会占用100%的cpu,咋办呢?
写了一个改进版,但是问题1让然无法解决,思考中。。。。http://aids198311.iteye.com/blog/1113471
太晚了,先直接贴代码了,注释不是很全。
更新了代码的解释和2个疑问在最下面
package com.daizuan.jetty; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; /** * * * @author daizuan */ public class SimpleJettyServer { private final ConcurrentLinkedQueue<Object> _changes_con = new ConcurrentLinkedQueue<Object>(); private final ConcurrentLinkedQueue<Object> _changes_req = new ConcurrentLinkedQueue<Object>(); private static int DEFAULT_BUFFERSIZE = 16; private static String DEFAULT_CHARSET = "GBK"; private ServerSocketChannel channel; private Selector selector; private int port; private static final String EXIT = "exit"; private static final String FORMAT = "yyyy-MM-dd HH:mm:ss"; public SimpleJettyServer(int port) throws IOException{ this.port = port; this.channel = ServerSocketChannel.open(); this.selector = Selector.open(); } private String getTime() { DateFormat df = new SimpleDateFormat(FORMAT); return df.format(new Date()); } public void listen() throws IOException { // 服务器开始监听端口,提供服务 channel.socket().bind(new InetSocketAddress(port)); // 将scoket榜定在制定的端口上 channel.configureBlocking(true); new Thread(new ConnectionHander()).start(); new Thread(new RequestExecutor()).start(); new Thread(new RequestHander()).start(); } class ConnectionHander implements Runnable { @Override public void run() { System.out.println("ConnectionHander:connection Hander start......"); while (true) { // 分发连接事件 SocketChannel sc = null; try { // 这里阻塞监听连接事件 sc = channel.accept(); sc.configureBlocking(false); _changes_con.add(sc); // 释放selector的锁,以便接收注册信息 selector.wakeup(); System.out.println("listener:a client in![" + sc.socket().getRemoteSocketAddress() + "]"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * @author daizuan 分发请求 */ class RequestHander implements Runnable { @Override public void run() { System.out.println("RequestHander:Request Hander start......"); while (true) { try { // 阻塞,直到有请求进来 selector.select(); // 因为注册信息需要获取selector的锁,所以需要放在这里处理注册信息 int changes = _changes_con.size(); Object change = null; while (changes-- > 0 && (change = _changes_con.poll()) != null) { try { if (change instanceof SocketChannel) { SocketChannel sc = (SocketChannel) change; String id = "[" + sc.socket().getRemoteSocketAddress() + "] "; sc.register(selector, SelectionKey.OP_READ, id); write(sc, "hello:" + id + ",please input something!\n"); System.out.println("a client connected!" + id); } } catch (Exception e) { e.printStackTrace(); } } Set<SelectionKey> keys = selector.selectedKeys(); // 处理请求信息,扔进请求队列 for (SelectionKey key : keys) { if (key.isReadable()) { // 取消注册事件 key.interestOps(0); _changes_req.add(key); } } selector.selectedKeys().clear(); } catch (Exception e) { e.printStackTrace(); } } } } private void close(SocketChannel sc) { if (sc != null && sc.socket() != null) { try { sc.socket().close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (sc != null) { try { sc.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private void write(SocketChannel sc, String str) { try { sc.write(ByteBuffer.wrap(str.getBytes(DEFAULT_CHARSET))); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 数组扩容 * * @param src byte[] 源数组数据 * @param size int 扩容的增加量 * @return byte[] 扩容后的数组 */ private byte[] grow(byte[] src, int size) { byte[] tmp = new byte[src.length + size]; System.arraycopy(src, 0, tmp, 0, src.length); return tmp; } private String parseRequest(SocketChannel sc) throws IOException { ByteBuffer bbuffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE); int count = 0; int off = 0; byte[] data = new byte[DEFAULT_BUFFERSIZE * 10]; bbuffer.clear(); // 循环一次性吧所有数据读完,否则可能buffer满了,数据未读完 while ((count = sc.read(bbuffer)) > 0) { bbuffer.flip(); if ((off + count) > data.length) { data = grow(data, DEFAULT_BUFFERSIZE * 10); } byte[] buf = bbuffer.array(); System.arraycopy(buf, 0, data, off, count); off += count; } if (count < 0) { return null; } byte[] req = new byte[off]; System.arraycopy(data, 0, req, 0, off); return new String(req, DEFAULT_CHARSET).trim(); } private void interestKey(SelectionKey key, int OP) { key.interestOps(OP); selector.wakeup(); } private boolean needToCanncel(String request) { return EXIT.equals(request); } /** * @author daizuan 处理请求 */ class RequestExecutor implements Runnable { @Override public void run() { System.out.println("RequestExecutor:Request Executor start......"); while (true) { int changes = _changes_req.size(); Object change = null; while (changes-- > 0 && (change = _changes_req.poll()) != null) { try { if (change instanceof SelectionKey) { SelectionKey key = (SelectionKey) change; SocketChannel sc = (SocketChannel) key.channel(); // 解析出请求 String request = parseRequest(sc); // 客户端退出 if (request == null) { close(sc); continue; } String id = (String) key.attachment(); System.out.println("read [" + request + "] from " + id); if (needToCanncel(request)) { System.out.println(id + "I am die!"); close(sc); continue; } // 向客户端写一些信息 write(sc, "[" + getTime() + "] " + request + "\n"); // 重新设置key,需要获得锁。所以从阻塞唤醒selector interestKey(key, SelectionKey.OP_READ); // 如果不想继续,再这里关掉吧 // sc.socket().close(); // sc.close(); } } catch (Exception e) { e.printStackTrace(); } } } } } public static void main(String[] args) throws IOException { // System.out.println("server start........."); SimpleJettyServer server = new SimpleJettyServer(6789); server.listen(); // 服务器开始监听端口,提供服务 } }
代码说起来太枯燥,我们可以吧这个server看作一辆行驶中的公交车,车上的每个乘客看成是一个客户端,同时有三个工作人员:观察员,售票员,服务员,负责维护这辆车。
1.观察员(ConnectionHander):
他就负责站在车门口盯着车外,只要发现有想要人上车(客户端发起与服务端连接的请求),就负责把这个人带进汽车。如:有个叫张三的家伙被他带上了车,他会告诉张三:“喂,和车门口的其他人(已经在_changes_con队列中的请求)一起,在车门口排队站好了,别到处跑(把连接扔进队列_changes_con),一会儿售票员会来找你登记的(注册SelectionKey.OP_READ事件)”。然后观察员大吼一声告诉售票员,别发呆(selector.select(),这个方法这里是阻塞的,所以说成发呆)啦,快来登记(selector.wakeup())新乘客。接着观察员继续傻傻的等待(阻塞的等待),一心一意的!直到他发现又有人要上车。
2.售票员(RequestHander):
售票员往车门口看了一眼(遍历_changes_con队列)。果然有个叫张三的小伙子正在门口乖乖的等着,售票员走过去说:“喂,新上来的,来登记吧”,于是,售票员就吧张三的信息登记(sc.register(selector, SelectionKey.OP_READ, id))到她的小本子(selector.keys()这个小本子是售票员专有的,其他人无法操作)上。登记的信息有:到哪一站下车(SelectionKey.OP_READ),张三携带了什么物品(第三个参数)。登记完了张三并且告诉他:“你可以去睡觉了,到站了我会通知你的。”。OK,于是张三就找到个位置,呼呼大睡。不用担心坐过了站。
登记完了新乘客,售票员还得时刻注意车外,是不是到了有人要下车的地方。
很快,到了张三要下车的地方,售票员立刻把张三的名字写在一块黑板上(selector.selectedKeys()),当然黑板上可能不止张三一个人的名字,因为有可能其他人也在这个地方下车。然后叫醒要下车的人,并一个一个更改登记信息(key.interestOps(0)),然后告诉它们:“到后门口排队站好了(_changes_req.add(key)),一会服务员会来给你们开门,让你们下去的”。最后,服务员擦掉黑板上的信息(selector.selectedKeys().clear())
3.服务员(RequestExecutor):
服务员负责处理下车(客户端提交上来的请求)这件事情。她热情的接待了张三(这里是parseRequest方法),并认真的回答了张三提出的各种问题(处理客户端的请求,这里是write这个方法,通过socketchannel向客户端写一些信息)。然后为张三打开车门,让他下车(sc.socket().close(),sc.close())。当然,如果这个时候张三反悔了,说:“我不想下车了,我再做一会儿再下去”。服务员也会热情的帮更改登记信息(interestKey方法),并告诉售票员(selector.wakeup()),:“把张三这小子的信息改一下 key.interestOps(OP)”。OK,张三又可以继续到车上睡大觉了!如果到了他要下车的地方,售票员会再次执行之前相同的操作。
遇到的问题
问题1:处理完请求后,继续注册会卡住,也就是卡在sc.register(selector, SelectionKey.OP_READ, id)
原因:socketChannel注册到selector,需要获取selector的keys的锁,而此时,selector正阻塞在selector.select()方法上面。如果没有新的请求进来,会一直持有keys的锁。注册的时候拿不到keys的锁会阻塞卡住。
解决1:再注册之前手动调用一次selector.wakeup(),让selector和注册这2个线程去竟争
解决2:考虑倒注册操作的开销比较小,完全可以放到selecotr的同一个线程里面(jetty就是这么做的)。请求处理完后,直接调用key的interestOps方法,然后selector.wakeup(),第二次select操作的时候,会重新刷新keys。注册生效。
问题2:socketchannel.read(Bytebuffer)读取含有中文的buffer,然后用CharBuffer charBuffer = decoder.decode(bbuffer)操作会抛出异常 java.nio.charset.UnmappableCharacterException: Input length = 2
原因:客户端的输入编码不对。我用的terminal的编码为UTF-8
解决:设置一下即可。
问题3:输入中文过长的时候,还是偶尔会出现java.nio.charset.MalformedInputException: Input length = 1
原因:中文占用2个字节,如果只读到奇数个字节,解码就会出错。
解决:最好一次性读完所有的输入。所以用while循环来使劲读。每次读的放到数组里面,直到读到的为0,即认为没有信息了。然后解码。见parseRequest方法
2个疑问,期待你的解答!
1.实际上,由于网络原因parseRequest里面count = sc.read(bbuffer)可能读到的确实为0,但是却不是读完了所有信息,api上说读到的为-1即可认为已经读完,可经测试,即使读完了,也没有出现-1的情况,最后一次始终为0。谁能解释一下?
2.频繁调用wakeUp。看到wakeUp的原理,发现么次都是往连接里面写入一个字节,然后selector发现有io进来了,就解除阻塞。资料上说实际应用中这个开销还是不容忽视的。有木有好的办法减少调用?
3.RequestExecutor这个撕循环,会占用100%的cpu,咋办呢?
写了一个改进版,但是问题1让然无法解决,思考中。。。。http://aids198311.iteye.com/blog/1113471
上一篇: Java网络服务器编程(NIO版)
推荐阅读