欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty 系列八(基于 WebSocket 的简单聊天室).

程序员文章站 2022-04-25 18:07:49
一、前言 之前写过一篇 Spring 集成 WebSocket 协议的文章 —— Spring消息之WebSocket ,所以对于 WebSocket 协议的介绍就不多说了,可以参考这篇文章。这里只做一些补充说明。另外,Netty 对 WebSocket 协议的支持要比 Spring 好太多了,用起 ......

一、前言

    之前写过一篇 spring 集成 websocket 协议的文章 —— spring消息之websocket ,所以对于 websocket 协议的介绍就不多说了,可以参考这篇文章。这里只做一些补充说明。另外,netty 对 websocket 协议的支持要比 spring 好太多了,用起来舒服的多。

    websocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

    由 ietf 发布的 websocket rfc,定义了 6 种帧, netty 为它们每种都提供了一个 pojo 实现。下表列出了这些帧类型,并描述了它们的用法。

Netty 系列八(基于 WebSocket 的简单聊天室).

二、聊天室功能说明

    1、a、b、c 等所有用户都可以加入同一个聊天室。

    2、a 发送的消息,b、c 可以同时收到,但是 a 收不到自己发送的消息。

    3、当用户长时间没有发送消息,系统将把他踢出聊天室。

Netty 系列八(基于 WebSocket 的简单聊天室).   Netty 系列八(基于 WebSocket 的简单聊天室).

三、聊天室功能实现

    1、netty 版本

<dependency>
     <groupid>io.netty</groupid>
     <artifactid>netty-all</artifactid>
     <version>5.0.0.alpha2</version>
</dependency>

    2、处理 http 协议的 channelhandler —— 非 websocket 协议的请求,返回 index.html 页面

public class httprequesthandler extends simplechannelinboundhandler<fullhttprequest> {

    private final string wsuri;
    private static file index;

    static {
        url location = httprequesthandler.class.getprotectiondomain().getcodesource().getlocation();
        try {
            string path = location.touri() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            index = new file(path);
        } catch (urisyntaxexception e) {
            e.printstacktrace();
        }
    }

    public httprequesthandler(string wsuri) {
        this.wsuri = wsuri;
    }


    @override
    protected void messagereceived(channelhandlercontext ctx, fullhttprequest request) throws exception {
        // 如果请求了websocket,协议升级,增加引用计数(调用retain()),并将他传递给下一个 channelhandler
        // 之所以需要调用 retain() 方法,是因为调用 channelread() 之后,资源会被 release() 方法释放掉,需要调用 retain() 保留资源
        if (wsuri.equalsignorecase(request.uri())) {
            ctx.firechannelread(request.retain());
        } else {
            //处理 100 continue 请求以符合 http 1.1 规范
            if (httpheaderutil.is100continueexpected(request)) {
                send100continue(ctx);
            }
            // 读取 index.html
            randomaccessfile randomaccessfile = new randomaccessfile(index, "r");
            httpresponse response = new defaulthttpresponse(request.protocolversion(), httpresponsestatus.ok);
            httpheaders headers = response.headers();
            //在该 http 头信息被设置以后,httprequesthandler 将会写回一个 httpresponse 给客户端
            headers.set(httpheadernames.content_type, "text/html; charset=utf-8");
            boolean keepalive = httpheaderutil.iskeepalive(request);
            if (keepalive) {
                headers.setlong(httpheadernames.content_length, randomaccessfile.length());
                headers.set(httpheadernames.connection, httpheadervalues.keep_alive);
            }
            ctx.write(response);
            //将 index.html 写给客户端
            if (ctx.pipeline().get(sslhandler.class) == null) {
                ctx.write(new defaultfileregion(randomaccessfile.getchannel(), 0, randomaccessfile.length()));
            } else {
                ctx.write(new chunkedniofile(randomaccessfile.getchannel()));
            }
            //写 lasthttpcontent 并冲刷至客户端,标记响应的结束
            channelfuture channelfuture = ctx.writeandflush(lasthttpcontent.empty_last_content);
            if (!keepalive) {
                channelfuture.addlistener(channelfuturelistener.close);
            }
        }

    }

    private void send100continue(channelhandlercontext ctx) {
        fullhttpresponse response = new defaultfullhttpresponse(httpversion.http_1_1, httpresponsestatus.continue);
        ctx.writeandflush(response);
    }

    @override
    public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
        cause.printstacktrace();
        ctx.close();
    }

    3、处理 websocket 协议的 channelhandler —— 处理 textwebsocketframe 的消息帧

/**
 * websocket 帧:websocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧
 */
public class textwebsocketframehandler extends simplechannelinboundhandler<textwebsocketframe> {

    private final channelgroup group;

    public textwebsocketframehandler(channelgroup group) {
        this.group = group;
    }

    @override
    protected void messagereceived(channelhandlercontext ctx, textwebsocketframe msg) throws exception {
        //增加消息的引用计数(保留消息),并将他写到 channelgroup 中所有已经连接的客户端
        channel channel = ctx.channel();
        //自己发送的消息不返回给自己
        group.remove(channel);
        group.writeandflush(msg.retain());
        group.add(channel);
    }

    @override
    public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
        //是否握手成功,升级为 websocket 协议
        if (evt == websocketserverprotocolhandler.serverhandshakestateevent.handshake_complete) {
            // 握手成功,移除 httprequesthandler,因此将不会接收到任何消息
            // 并把握手成功的 channel 加入到 channelgroup 中
            ctx.pipeline().remove(httprequesthandler.class);
            group.writeandflush(new textwebsocketframe("client " + ctx.channel() + " joined"));
            group.add(ctx.channel());
        } else if (evt instanceof idlestateevent) {
            idlestateevent stateevent = (idlestateevent) evt;
            if (stateevent.state() == idlestate.reader_idle) {
                group.remove(ctx.channel());
                ctx.writeandflush(new textwebsocketframe("由于您长时间不在线,系统已自动把你踢下线!")).addlistener(channelfuturelistener.close);
            }
        } else {
            super.usereventtriggered(ctx, evt);
        }
    }
}

   websocket 协议升级完成之后, websocketserverprotocolhandler 将会把 httprequestdecoder 替换为 websocketframedecoder,把 httpresponseencoder 替换为websocketframeencoder。为了性能最大化,它将移除任何不再被 websocket 连接所需要的 channelhandler。这也包括了 httpobjectaggregator 和 httprequesthandler 。

    4、chatserverinitializer —— 多个 channelhandler 合并成 channelpipeline 链

Netty 系列八(基于 WebSocket 的简单聊天室).
public class chatserverinitializer extends channelinitializer<channel> {

    private final channelgroup group;
    private static final int read_idle_time_out = 60; // 读超时
    private static final int write_idle_time_out = 0;// 写超时
    private static final int all_idle_time_out = 0; // 所有超时

    public chatserverinitializer(channelgroup group) {
        this.group = group;
    }

    @override
    protected void initchannel(channel ch) throws exception {
        channelpipeline pipeline = ch.pipeline();
        pipeline.addlast(new httpservercodec());
        pipeline.addlast(new chunkedwritehandler());
        pipeline.addlast(new httpobjectaggregator(64 * 1024));
        // 处理那些不发送到 /ws uri的请求
        pipeline.addlast(new httprequesthandler("/ws"));
        // 如果被请求的端点是 "/ws",则处理该升级握手
        pipeline.addlast(new websocketserverprotocolhandler("/ws"));
        // //当连接在60秒内没有接收到消息时,进会触发一个 idlestateevent 事件,被 heartbeathandler 的 usereventtriggered 方法处理
        pipeline.addlast(new idlestatehandler(read_idle_time_out, write_idle_time_out, all_idle_time_out, timeunit.seconds));
        pipeline.addlast(new textwebsocketframehandler(group));

    }
}
chatserverinitializer.java

tips:上面这些开箱即用 channelhandler 的作用,我就不一一介绍了,可以参考

    5、引导类 chatserver

Netty 系列八(基于 WebSocket 的简单聊天室).
public class chatserver {

    private final channelgroup channelgroup = new defaultchannelgroup(immediateeventexecutor.instance);
    private final eventloopgroup group = new nioeventloopgroup();
    private channel channel;

    public channelfuture start(inetsocketaddress address) {
        serverbootstrap bootstrap = new serverbootstrap();
        bootstrap.group(group)
                .channel(nioserversocketchannel.class)
                .childhandler(new chatserverinitializer(channelgroup));
        channelfuture channelfuture = bootstrap.bind(address);
        channelfuture.syncuninterruptibly();
        channel = channelfuture.channel();
        return channelfuture;
    }

    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        channelgroup.close();
        group.shutdowngracefully();
    }

    public static void main(string[] args) {
        final chatserver chatserver = new chatserver();
        channelfuture channelfuture = chatserver.start(new inetsocketaddress(9999));
        // 返回与当前java应用程序关联的运行时对象
        runtime.getruntime().addshutdownhook(new thread() {
            @override
            public void run() {
                chatserver.destroy();
            }
        });
        channelfuture.channel().closefuture().syncuninterruptibly();
    }

}
chatserver.java

三、效果展示

    在浏览器中输入 http://127.0.0.1:9999 即可看到预先准备好的 index.html 页面;访问 ws://127.0.0.1:9999/ws (可随意找一个 websocket 测试工具测试)即可加入聊天室。

Netty 系列八(基于 WebSocket 的简单聊天室).

有点 low 的聊天室总算是完成了,算是 netty 对 http 协议和 websocket 协议的一次实践吧!虽然功能欠缺,但千里之行,始于足下!不积硅步,无以至千里;不积小流,无以成江海!

 

参考资料:《netty in action》

演示源代码:https://github.com/jmcuixy/nettydemo/tree/master/src/main/java/org/netty/demo/chatroom