Netty 系列八(基于 WebSocket 的简单聊天室).
一、前言
之前写过一篇 spring 集成 websocket 协议的文章 —— spring消息之websocket ,所以对于 websocket 协议的介绍就不多说了,可以参考这篇文章。这里只做一些补充说明。另外,netty 对 websocket 协议的支持要比 spring 好太多了,用起来舒服的多。
websocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。
由 ietf 发布的 websocket rfc,定义了 6 种帧, netty 为它们每种都提供了一个 pojo 实现。下表列出了这些帧类型,并描述了它们的用法。
二、聊天室功能说明
1、a、b、c 等所有用户都可以加入同一个聊天室。
2、a 发送的消息,b、c 可以同时收到,但是 a 收不到自己发送的消息。
3、当用户长时间没有发送消息,系统将把他踢出聊天室。
三、聊天室功能实现
1、netty 版本
<dependency> <groupid>io.netty</groupid> <artifactid>netty-all</artifactid> <version>5.0.0.alpha2</version> </dependency>
2、处理 http 协议的 channelhandler —— 非 websocket 协议的请求,返回 index.html 页面
public class httprequesthandler extends simplechannelinboundhandler<fullhttprequest> { private final string wsuri; private static file index; static { url location = httprequesthandler.class.getprotectiondomain().getcodesource().getlocation(); try { string path = location.touri() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); index = new file(path); } catch (urisyntaxexception e) { e.printstacktrace(); } } public httprequesthandler(string wsuri) { this.wsuri = wsuri; } @override protected void messagereceived(channelhandlercontext ctx, fullhttprequest request) throws exception { // 如果请求了websocket,协议升级,增加引用计数(调用retain()),并将他传递给下一个 channelhandler // 之所以需要调用 retain() 方法,是因为调用 channelread() 之后,资源会被 release() 方法释放掉,需要调用 retain() 保留资源 if (wsuri.equalsignorecase(request.uri())) { ctx.firechannelread(request.retain()); } else { //处理 100 continue 请求以符合 http 1.1 规范 if (httpheaderutil.is100continueexpected(request)) { send100continue(ctx); } // 读取 index.html randomaccessfile randomaccessfile = new randomaccessfile(index, "r"); httpresponse response = new defaulthttpresponse(request.protocolversion(), httpresponsestatus.ok); httpheaders headers = response.headers(); //在该 http 头信息被设置以后,httprequesthandler 将会写回一个 httpresponse 给客户端 headers.set(httpheadernames.content_type, "text/html; charset=utf-8"); boolean keepalive = httpheaderutil.iskeepalive(request); if (keepalive) { headers.setlong(httpheadernames.content_length, randomaccessfile.length()); headers.set(httpheadernames.connection, httpheadervalues.keep_alive); } ctx.write(response); //将 index.html 写给客户端 if (ctx.pipeline().get(sslhandler.class) == null) { ctx.write(new defaultfileregion(randomaccessfile.getchannel(), 0, randomaccessfile.length())); } else { ctx.write(new chunkedniofile(randomaccessfile.getchannel())); } //写 lasthttpcontent 并冲刷至客户端,标记响应的结束 channelfuture channelfuture = ctx.writeandflush(lasthttpcontent.empty_last_content); if (!keepalive) { channelfuture.addlistener(channelfuturelistener.close); } } } private void send100continue(channelhandlercontext ctx) { fullhttpresponse response = new defaultfullhttpresponse(httpversion.http_1_1, httpresponsestatus.continue); ctx.writeandflush(response); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { cause.printstacktrace(); ctx.close(); }
3、处理 websocket 协议的 channelhandler —— 处理 textwebsocketframe 的消息帧
/** * websocket 帧:websocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧 */ public class textwebsocketframehandler extends simplechannelinboundhandler<textwebsocketframe> { private final channelgroup group; public textwebsocketframehandler(channelgroup group) { this.group = group; } @override protected void messagereceived(channelhandlercontext ctx, textwebsocketframe msg) throws exception { //增加消息的引用计数(保留消息),并将他写到 channelgroup 中所有已经连接的客户端 channel channel = ctx.channel(); //自己发送的消息不返回给自己 group.remove(channel); group.writeandflush(msg.retain()); group.add(channel); } @override public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception { //是否握手成功,升级为 websocket 协议 if (evt == websocketserverprotocolhandler.serverhandshakestateevent.handshake_complete) { // 握手成功,移除 httprequesthandler,因此将不会接收到任何消息 // 并把握手成功的 channel 加入到 channelgroup 中 ctx.pipeline().remove(httprequesthandler.class); group.writeandflush(new textwebsocketframe("client " + ctx.channel() + " joined")); group.add(ctx.channel()); } else if (evt instanceof idlestateevent) { idlestateevent stateevent = (idlestateevent) evt; if (stateevent.state() == idlestate.reader_idle) { group.remove(ctx.channel()); ctx.writeandflush(new textwebsocketframe("由于您长时间不在线,系统已自动把你踢下线!")).addlistener(channelfuturelistener.close); } } else { super.usereventtriggered(ctx, evt); } } }
websocket 协议升级完成之后, websocketserverprotocolhandler 将会把 httprequestdecoder 替换为 websocketframedecoder,把 httpresponseencoder 替换为websocketframeencoder。为了性能最大化,它将移除任何不再被 websocket 连接所需要的 channelhandler。这也包括了 httpobjectaggregator 和 httprequesthandler 。
4、chatserverinitializer —— 多个 channelhandler 合并成 channelpipeline 链
public class chatserverinitializer extends channelinitializer<channel> { private final channelgroup group; private static final int read_idle_time_out = 60; // 读超时 private static final int write_idle_time_out = 0;// 写超时 private static final int all_idle_time_out = 0; // 所有超时 public chatserverinitializer(channelgroup group) { this.group = group; } @override protected void initchannel(channel ch) throws exception { channelpipeline pipeline = ch.pipeline(); pipeline.addlast(new httpservercodec()); pipeline.addlast(new chunkedwritehandler()); pipeline.addlast(new httpobjectaggregator(64 * 1024)); // 处理那些不发送到 /ws uri的请求 pipeline.addlast(new httprequesthandler("/ws")); // 如果被请求的端点是 "/ws",则处理该升级握手 pipeline.addlast(new websocketserverprotocolhandler("/ws")); // //当连接在60秒内没有接收到消息时,进会触发一个 idlestateevent 事件,被 heartbeathandler 的 usereventtriggered 方法处理 pipeline.addlast(new idlestatehandler(read_idle_time_out, write_idle_time_out, all_idle_time_out, timeunit.seconds)); pipeline.addlast(new textwebsocketframehandler(group)); } }
tips:上面这些开箱即用 channelhandler 的作用,我就不一一介绍了,可以参考。
5、引导类 chatserver
public class chatserver { private final channelgroup channelgroup = new defaultchannelgroup(immediateeventexecutor.instance); private final eventloopgroup group = new nioeventloopgroup(); private channel channel; public channelfuture start(inetsocketaddress address) { serverbootstrap bootstrap = new serverbootstrap(); bootstrap.group(group) .channel(nioserversocketchannel.class) .childhandler(new chatserverinitializer(channelgroup)); channelfuture channelfuture = bootstrap.bind(address); channelfuture.syncuninterruptibly(); channel = channelfuture.channel(); return channelfuture; } public void destroy() { if (channel != null) { channel.close(); } channelgroup.close(); group.shutdowngracefully(); } public static void main(string[] args) { final chatserver chatserver = new chatserver(); channelfuture channelfuture = chatserver.start(new inetsocketaddress(9999)); // 返回与当前java应用程序关联的运行时对象 runtime.getruntime().addshutdownhook(new thread() { @override public void run() { chatserver.destroy(); } }); channelfuture.channel().closefuture().syncuninterruptibly(); } }
三、效果展示
在浏览器中输入 http://127.0.0.1:9999 即可看到预先准备好的 index.html 页面;访问 ws://127.0.0.1:9999/ws (可随意找一个 websocket 测试工具测试)即可加入聊天室。
有点 low 的聊天室总算是完成了,算是 netty 对 http 协议和 websocket 协议的一次实践吧!虽然功能欠缺,但千里之行,始于足下!不积硅步,无以至千里;不积小流,无以成江海!
参考资料:《netty in action》
演示源代码:https://github.com/jmcuixy/nettydemo/tree/master/src/main/java/org/netty/demo/chatroom
上一篇: 低像素CIS传感器缺货严重 上下游全面涨价多达40%
下一篇: 23种设计模式之——代理模式