欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Android即时通讯系列文章番外篇(1)使用Netty框架快速搭设WebSocket服务器

程序员文章站 2022-04-23 11:52:45
...

「椎锋陷陈」微信技术号现已开通,为了获得第一手的技术文章推送,欢迎搜索关注!

前言

随着本系列所讨论技术点的逐步深入,仅靠之前提到的官方测试服务器已经不能满足我们演示的需要了,于是我们有必要尝试在本地搭建自己的WebSocket服务器,今天这篇文章就是介绍这方面的内容。

由于不属于原先的写作计划之内,同时也为了保持系列文章的连贯性,因此特意将本篇文章命名为「番外篇」。

Netty简单介绍

还记得前面的文章「
Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?
」里我们所提到的吗?WebSocket本身只是一个应用层协议,原则上只要遵循这个协议的客户端/服务端均可使用。对于客户端,前面我们已明确采用OkHttp框架来实现了,而对于服务端,我们则计划采用Netty框架来实现。

Netty是什么?Netty是一款异步的、基于事件驱动的网络应用程序框架,支持快速开发可维护的、高性能的、面向协议的服务端和客户端。

Netty封装了Java NIO API的能力,把原本在高负载下繁琐且容易出错的I/O操作,隐藏在一个简单易用的API之下。这无疑对于缺少服务端编程经验的客户端开发人员是非常友好的,只要把Netty的几个核心组件弄明白了,快速搭设一个满足本项目演示需要的WebSocket服务器基本上没什么问题。

Netty核心组件

Channel

Channel是Netty传输API的核心,被用于所有的I/O操作,Channel 接口所提供的API大大降低了Java中直接使用Socket类的复杂性。

回调

Netty在内部使用了回调来处理事件,当一个回调被触发时,相关的事件可以交由一个ChannelHandler的实现处理。

Future

Future提供了一种在操作完成时通知应用程序的方式,可以看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。

Netty提供了自己的实现——ChannelFuture,由ChannelFutureListener提供的通知机制消除了手动检查对应操作是否完成的步骤。

事件和ChannelHandler

Netty使用不同的事件来通知我们状态的改变,这使得我们能够基于已经发生的事件来触发适当的动作。

每个事件都可以被分发给ChannelHandler类,ChannelHandler类中提供了自定义的业务逻辑,架构上有助于保持业务逻辑与网络处理代码的分离。

用IntelliJ IDEA运行Netty的WebSocket演示代码

众所周知,Android Studio是基于IntelliJ IDEA开发的,因此对于习惯了用Android Studio进行开发的Android开发人员,用起IntelliJ IDEA来也几乎没有任何障碍。本篇的目的是快速搭设WebSocket服务器,因此选择直接将Netty的WebSocket演示代码拉取下来运行。在确保项目能成功运行起来的基础上,再逐步去分析演示代码。

该演示代码展示的交互效果很简单,跟前面的官方测试服务器一样,当客户端向服务端发送一个消息,服务器都会将消息原原本本地回传给客户端(没错,又是Echo Test。。。)。虽然看起来好像用处不大,但它充分地体现了客户端/服务器系统中典型的请求-响应交互模式。

接下来我们分别进行两端的工作:

服务端的工作:

  • IntelliJ IDEA左上角New-Project-Maven创建新工程
  • 拉取Netty的WebSocket演示代码到src目录下
  • 按Alt+Enter快捷键自动导入Netty依赖
  • 运行WebSocketServer类的main()函数

当控制台输出输出语句,即表示WebSocket服务器成功运行在本机上了:

Open your web browser and navigate to http://127.0.0.1:8080/

客户端的工作:

  • 保证手机网络与服务端在同一局域网下
  • 将要连接的WebSocket服务器地址更改为:ws://{服务端IP地址}:8080/websocket
  • 正常发送消息

从控制台可以看到,客户端成功地与WebSocket服务器建立了连接,并在发送消息后成功收到了服务器的回传消息:

Android即时通讯系列文章番外篇(1)使用Netty框架快速搭设WebSocket服务器

WebSocket演示代码分析

总的来说,Netty的WebSocket演示代码中包含了两部分核心工作,其分别的意义以及对应的类如下表所示:

核心工作 意义 对应的类
提供ChannelHandler接口实现 服务器对从客户端接收的数据的业务逻辑处理 WebSocketServerHandler
ServerBootstrap实例创建 配置服务器的启动,将服务器绑定到它要监听连接请求的端口上 WebSocketServer

我们先来看看WebSocketServerHandler类核心工作的主要代码:

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handshaker;

    // ...省去其他代码
    
    /**
     * 当有新的消息传入时都会回调
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // ...省去其他代码
        
        // 握手
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // ...省去其他代码
        
        // 对于文本帧和二进制数据帧,将数据简单地回送给了远程节点。
        if (frame instanceof TextWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
        }
    }

    // ...省去其他代码
    
}

如你所见,为了处理所有接收到的数据,我们重写了WebSocketServerHandler类的channelRead()方法,重写的方法中主要处理了Http请求和WebSocket帧两种类型的数据。

Http请求类型的数据主要是为了处理客户端的握手建立连接过程,详情可参考前面的文章「
Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?
」,这里就不再展开讲了。

而WebSocket帧类型的数据主要是为了处理来自客户端主动发送的消息,我们知道,当WebSocket连接建立之后,后续的数据都是以帧的形式发送。主要包含以下几种类型的帧:

  • 文本帧
  • 二进制帧
  • Ping帧
  • Pong帧
  • 关闭帧

其中,文本帧与二进制帧同属于消息帧,Ping帧和Ping帧主要用于连接保活,关闭帧则用于关闭连接,我们这里主要关心对消息帧的处理,可以看到,我们只是将数据简单回传回了远端节点,从而实现Echo Test。

然后,我们再回过头来看WebSocketServer类的核心工作的主要代码:

ublic final class WebSocketServer {

    // ...省去其他代码
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));

    public static void main(String[] args) throws Exception {
        // ...省去其他代码

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // 指定所使用的NIO传输Channel
             .childHandler(new WebSocketServerInitializer(sslCtx));

            // 使用指定的端口,异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成
            Channel ch = b.bind(PORT).sync().channel();

            System.out.println("Open your web browser and navigate to " +
                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            // 获取Channel的CloseFuture,并且阻塞当前线程直到它完成
            ch.closeFuture().sync();
        } finally {
            // 关闭EventLoopGroup,释放所有的资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

我们使用ServerBootstrap引导类来完成Websocket服务器的网络层配置,随后调用bind(int inetPort)方法将进程绑定到某个指定的端口,此过程称之为引导服务器。

我们是如何将前面定义的WebSocketServerHandler与ServerBootstrap关联起来的呢?关键就在于childHandler(ChannelHandler childHandler)方法。

每个Channel都拥有一个与之相关联的ChannelPipeline,其持有一个ChannelHandler的实例链。我们需要提供一个ChannelInitializer的实现,并在其initChannel()回调方法中,将包括WebSocketServerHandler在内的一组自定义的ChannelHandler安装到ChannelPipeline中:

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    // ...省去其他代码

    public WebSocketServerInitializer(SslContext sslCtx) {
        // ...省去其他代码
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new WebSocketServerHandler());
    }
}

将Echo形式改为Broadcast形式

我们之前讲过,现今主流的IM应用几乎都是采用服务器中转的方式来进行消息传输的,为了更好地实践这种设计,我们进一步来对WebSocket服务器进行改造,把Echo形式改为Broadcast形式,即:

当接收到某一客户端的一条消息之后,将该消息转发给服务端维护的、除发送方之外的其他客户端连接。

要实现这一功能我们需要用到ChannelGroup类,ChannelGroup负责跟踪所有活跃中的WebSocket连接,当有新的客户端通过握手成功建立连接后,我们就要把这个新的Channel添加到ChannelGroup中去。

当接收到了WebSocket消息帧数据后,就调用ChannelGroup的writeAndFlush()方法将消息传输给所有已经连接的WebSocket Channel。

ChannelGroup还允许传递过滤参数,我们可以以此过滤掉发送方的Channel。

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    // ...省去其他代码
    private final ChannelGroup group;

    public WebSocketServerHandler(ChannelGroup group) {
        this.group = group;
    }
    
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // ...省去其他代码
        if (frame instanceof TextWebSocketFrame) {
//            ctx.write(frame.retain());
            group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
//            ctx.write(frame.retain());
            group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
        }
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            // 将新的WebSocket Channel添加到ChannelGroup 中,以便它可以接收到所有的消息
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}


运行起来之后,让多个客户端连接到此服务器,当客户端中的一个发送了一条消息后,其他连接的客户端会收到由服务器广播的这一条消息:

Android即时通讯系列文章番外篇(1)使用Netty框架快速搭设WebSocket服务器
Android即时通讯系列文章番外篇(1)使用Netty框架快速搭设WebSocket服务器

相关源码已上传到Github

总结

为了满足更多场景的演示需要,我们使用了Netty框架来快速搭建本机的WebSocket服务器。

我们基于Netty的WebSocket演示代码进行改造,核心工作包括以下两部分:

  • 配置服务器的启动,将服务器绑定到它要监听连接请求的端口上
  • 服务器对从客户端接收的数据的业务逻辑处理

我们先是以简单的Echo形式实现了客户端/服务器系统中典型的请求/响应交互模式,并进一步改用广播形式,实现了多个用户之间的相互通信。

Netty框架还有其他更丰富的内容,等待我们逐一探究……如果你也感兴趣,欢迎留意技术号「椎锋陷陈」的后续更新!