欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty简单入门:获取请求、多客户端连接与通信、心跳检测、长链接

程序员文章站 2024-03-23 09:18:40
...

目的及介绍

项目源码

第一个Netty程序:使用Netty来获取请求

目的及介绍

  • Netty是一款类似于Tomcat的服务器,它更关注网络编程,相对来说网络通信性能更高。
  • 搭建一款简单的Netty程序,只编写简单的服务端程序,通过浏览器或者bash的curl方式来感受一下netty的通信模式

搭建、设计思路

  • 一个简单且完整的Netty服务器端程序,包括一个Server、ServerInitializer、ServerHandle
  • Server主要用于接收请求、ServerInitializer用于建立pipeline、ServerHandle用于对channel进行handle处理
    • Server:
      • 包含两个EventLoopGroup,bossExecutors用于接收外部传入的请求,然后交给workExecutors去执行。
      • ServerBootstrap用于启动该项目
        • serverBootstrap建一个组,组里有boss和work,然后通过反射获取一个channel,对channel建立handle,一般这个handle是我们自己实现。
        • serverBootstrap需要绑定端口。
        • serverBootstrap的channel执行完需要关闭。
      • 优雅的关闭boss和work
    • ServerInitializer:
      • 继承自ChannelInitializer,需要建立pipeline
      • 通过addLast,将执行操作增加到pipeline的结尾
    • ServerHandle:
      • 继承自SimpleChannelInboundHandler,在channelRead0方法内写具体逻辑
      • handle是一个回调方法,将接收到的msg转化为httpRequest
      • 将需要发送的信息,通过Unpooled.copiedBuffer转化为一个ByteBuf
      • 将这个ByteBuf通过DefaultFullHttpResponse包装,同时设置请求头headers
      • 最后,使用writeAndFlush的方式发送消息
  • 启动服务,通过浏览器,或者使用curl访问localhost:8899,就可以拿到你想要的信息

具体步骤

  • Server:
public class TestServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossExecutors = new NioEventLoopGroup();
        EventLoopGroup workExecutors = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossExecutors,workExecutors).channel(NioServerSocketChannel.class).
                    childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossExecutors.shutdownGracefully();
            workExecutors.shutdownGracefully();
        }
    }
}
  • ServerInitializer:
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("httpServerCodec",new HttpServerCodec());
        pipeline.addLast("testHttpServerHandler",new TestHttpServerHandle());
    }
}
  • ServerHandle:
public class TestHttpServerHandle extends SimpleChannelInboundHandler<HttpObject> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest httpRequest = (HttpRequest) msg;
            System.out.println("请求方法名:" + httpRequest.method().name());

            ByteBuf content = Unpooled.copiedBuffer("ljfirst", CharsetUtil.UTF_8);

            FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK, content);
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            ctx.writeAndFlush(fullHttpResponse);
        }
    }
}

排坑细节

  • ctx.writeAndFlush(fullHttpResponse),这里不要填写成msg了,不是发生消息,而是发送包装好的消息。
  • ServerHandle继承自SimpleChannelInboundHandler,此处是一个范性,最好填入HttpObject。
  • TestHttpServerHandle类中,对favicon.ico的拦截,需要注意此处的 “/”,写错了不拦截

第二个Netty程序:多客户端连接与通信

目的及介绍

  • 编写一个多客户端的程序,与服务器端通信
  • 满足某客户端上线,服务器端能通知其他客户端:上线通知
  • 满足某客户发送消息,服务器端能转发至其他客户端

搭建、设计思路

  • 客户端设计
    • 为了能获取服务器发送的上线通知和其他客户端发送来的广播信息。客户端只需要在MutilClientHandler的channelRead0回调中输出信息就行。
    • 编解码工作需要在MutilClientInitializer中进行
    • 为了能发送消息,客户端必须有个死循环,不断从system.in获取用户输入的消息
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
    channel.writeAndFlush(br.readLine()+"\r\n");
}
  • 服务器端设计
    • 编解码工作需要在ServerInitializer中进行
    • 当客户端上线时,会触发ServerHandle的channelActive方法,此时可以发送广播消息,通知其他客户端,通知的方法用channelGroup的广播方法channelGroup.writeAndFlush
    • 当客户端发送消息时,会触发ServerHandle的channelRead0方法,需要写个方法判断信息来源,并发送消息至其他客户端
channelGroup.forEach(dfg -> {
  if (channel != dfg) {
      dfg.writeAndFlush(channel.remoteAddress() + "发送来消息:" + msg +"\n");
  } else {
    channel.writeAndFlush("[自己]发送来消息:" + msg +"\n");
  }
});

具体步骤

  • 这里只显示handler的处理方法,其他代码见github
  • MutilClientHandler
public class MutilClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("client output:" + msg);
    }
}
  • ServerHandle
public class ServerHandle extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(dfg -> {
            if (channel != dfg) {
                dfg.writeAndFlush(channel.remoteAddress() + "发送来消息:" + msg +"\n");
            } else {
                channel.writeAndFlush("[自己]发送来消息:" + msg +"\n");
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "channelActive");
        channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线channelActive"+"\n");
        channelGroup.add(channel);
        System.out.println("channelGroup.size():" + channelGroup.size());
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "-----channelInactive");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "handlerAdded");
        channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线"+"\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "handlerRemoved");
        channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经下线"+"\n");
        System.out.println("channelGroup.size():" + channelGroup.size());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

排坑细节

  • idea如何想启动多客户端,需要在run/debug configurations中勾选allow parallel run
  • 大坑:其他用户上线后,根本没有收到上线通知,或者没有收到其他客户端发送的消息
    • 因为ServerInitializer设置来编解码方式,
    • 所以ServerHandle,必须在msg后面显式的加上"\n" 或者"\t"
    • 不设置则不会收到消息,服务端会一直存储着该消息

第三个Netty程序:心跳检测

目的及介绍

  • 模仿一个服务器端检测客服端心跳的机制
  • 场景:在长链接情况下,服务器端需要通过心跳机制来确认客服端存活情况。

搭建、设计思路

  • 服务器端
    • 跟之前的设计一样,由server、serverinitializer、serverhandler组成
    • 看代码会发现server多了一个handler(new LoggingHandler(LogLevel.INFO)),用于拦截日志多
    • 多了一个IdleStateHandler,它的参数是(读,写,all,时间单位)。
    • 自己写的ServerHandle03,主要是判断event.state()的状态,然后简单打印输出。
  • 客户端
    • 代码同02
  • 测试设计
    • 通过心跳检测读操作:设置读写检测时间为(5,7,10),启动服务器端和客服端,客户短在5s内没有输入,服务器端显示读超时。
    • 通过心跳检测写操作:设置读写检测时间为(5,7,10),启动服务器端和客服端,客户短在5s内连续输入,服务器端没有输出,于是服务器端显示写超时。
    • 通过心跳检测读写操作:设置读写检测时间为(10,7,5),启动服务器端和客服端,客户短在5s内没有输入,服务器端显示读写超时,即只要读写有一个超时,即报错。

具体步骤

  • 服务器端的Initializer
public class ServerInitializer03 extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS));
        pipeline.addLast(new ServerHandle03());
    }
}
  • 服务器端的handler
public class ServerHandle03 extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "read idel";
                    break;
                case WRITER_IDLE:
                    eventType = "write idel";
                    break;
                case ALL_IDLE:
                    eventType = "read and write idel";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "超时事件" + eventType);
            ctx.channel().close();
        }
    }
}

排坑细节

  • 端口号是否占用
  • 代码要区分ChannelHandlerContext ctx, Object evt的使用区别。

第四个Netty程序:长链接的使用

目的及介绍

  • 通过WebSocket模仿TCP长链接

搭建、设计思路

  • 在service的childHandler的pipeline中加入HttpObjectAggregator和WebSocketServerProtocolHandler
  • HttpObjectAggregator主要用于对信息按照指定大小分块
  • WebSocketServerProtocolHandler主要用于指定访问路径
  • 收到的消息在TextWebSocketFrameHandle04中输出

具体步骤

  • 服务端Server04的代码
public class Server04 {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossExecutors = new NioEventLoopGroup();
        EventLoopGroup workExecutors = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossExecutors,workExecutors).channel(NioServerSocketChannel.class).
                    //handler是用于处理bossExecutors
                    //childHandler是用于处理workExecutors
                    handler(new LoggingHandler(LogLevel.INFO)).
                    childHandler(new WebSocketInitializer04());

            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(9050)).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossExecutors.shutdownGracefully();
            workExecutors.shutdownGracefully();
        }
    }
}
  • 服务端WebSocketInitializer04的代码
public class WebSocketInitializer04 extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ljfirst"));
        pipeline.addLast(new TextWebSocketFrameHandle04());
    }
}
  • 客户端模拟交互的网页
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>long connection</title>
</head>
<body>
<script type="text/javascript">
    var socket;
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:9050/ljfirst");

        socket.onmessage = function (event) {
            var ta = document.getElementById("responsetext");
            ta.value = ta.value + "\n" + event.data;
        }

        socket.onopen = function (event) {
            var tt = document.getElementById("responsetext");
            tt.value = "连接开启";
        }

        socket.onclose = function (event) {
            var clt = document.getElementById("responsetext");
            clt.value = clt.value + "\n" + "连接断掉";
        }
    } else {
        alert("浏览器不支持websocket")
    }

    function send(msg) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(msg);
        }else {
            alert("连接尚未开启");
        }
    }

</script>


<form onsubmit="return false;">

    <!-- 此处的msg 关联上方的send(msg)-->
    <textarea name="msg" style="width: 400px; height: 200px"></textarea>
    <input type="button"  onclick="send(this.form.msg.value)" value="发送">

    <h3>服务端输出:</h3>
    <textarea id="responsetext" style="width: 400px; height: 200px"></textarea>
    <input type="button" onclick="javascript: document.getElementById('responsetext').value=''" value="清空内容">


</form>
</body>
</html>

排坑细节

  • pipeline.addLast(new HttpObjectAggregator(8192));这一句中的8192是分块大小
  • html网页中form的textarea中的name 与 script中function的name需要对应