Netty简单入门:获取请求、多客户端连接与通信、心跳检测、长链接
程序员文章站
2024-03-23 09:18:40
...
目的及介绍
- Netty是一款类似于Tomcat的服务器,它更关注网络编程,相对来说网络通信性能更高。
- 本文主要介绍Netty的简单入门,内容包括:
项目源码
第一个Netty程序:使用Netty来获取请求
目的及介绍
- Netty是一款类似于Tomcat的服务器,它更关注网络编程,相对来说网络通信性能更高。
- 搭建一款简单的Netty程序,只编写简单的服务端程序,通过浏览器或者bash的curl方式来感受一下netty的通信模式
搭建、设计思路
- 一个简单且完整的Netty服务器端程序,包括一个Server、ServerInitializer、ServerHandle
- Server主要用于接收请求、ServerInitializer用于建立pipeline、ServerHandle用于对channel进行handle处理
- Server:
- 包含两个EventLoopGroup,bossExecutors用于接收外部传入的请求,然后交给workExecutors去执行。
- ServerBootstrap用于启动该项目
- serverBootstrap建一个组,组里有boss和work,然后通过反射获取一个channel,对channel建立handle,一般这个handle是我们自己实现。
- serverBootstrap需要绑定端口。
- serverBootstrap的channel执行完需要关闭。
- 优雅的关闭boss和work
- ServerInitializer:
- 继承自ChannelInitializer,需要建立pipeline
- 通过addLast,将执行操作增加到pipeline的结尾
- ServerHandle:
- 继承自SimpleChannelInboundHandler,在channelRead0方法内写具体逻辑
- handle是一个回调方法,将接收到的msg转化为httpRequest
- 将需要发送的信息,通过Unpooled.copiedBuffer转化为一个ByteBuf
- 将这个ByteBuf通过DefaultFullHttpResponse包装,同时设置请求头headers
- 最后,使用writeAndFlush的方式发送消息
- Server:
- 启动服务,通过浏览器,或者使用curl访问localhost:8899,就可以拿到你想要的信息
具体步骤
- Server:
public class TestServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossExecutors = new NioEventLoopGroup();
EventLoopGroup workExecutors = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossExecutors,workExecutors).channel(NioServerSocketChannel.class).
childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossExecutors.shutdownGracefully();
workExecutors.shutdownGracefully();
}
}
}
- ServerInitializer:
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("httpServerCodec",new HttpServerCodec());
pipeline.addLast("testHttpServerHandler",new TestHttpServerHandle());
}
}
- ServerHandle:
public class TestHttpServerHandle extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
System.out.println("请求方法名:" + httpRequest.method().name());
ByteBuf content = Unpooled.copiedBuffer("ljfirst", CharsetUtil.UTF_8);
FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK, content);
fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
ctx.writeAndFlush(fullHttpResponse);
}
}
}
排坑细节
- ctx.writeAndFlush(fullHttpResponse),这里不要填写成msg了,不是发生消息,而是发送包装好的消息。
- ServerHandle继承自SimpleChannelInboundHandler,此处是一个范性,最好填入HttpObject。
- TestHttpServerHandle类中,对favicon.ico的拦截,需要注意此处的 “/”,写错了不拦截
第二个Netty程序:多客户端连接与通信
目的及介绍
- 编写一个多客户端的程序,与服务器端通信
- 满足某客户端上线,服务器端能通知其他客户端:上线通知
- 满足某客户发送消息,服务器端能转发至其他客户端
搭建、设计思路
- 客户端设计
- 为了能获取服务器发送的上线通知和其他客户端发送来的广播信息。客户端只需要在MutilClientHandler的channelRead0回调中输出信息就行。
- 编解码工作需要在MutilClientInitializer中进行
- 为了能发送消息,客户端必须有个死循环,不断从system.in获取用户输入的消息
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
channel.writeAndFlush(br.readLine()+"\r\n");
}
- 服务器端设计
- 编解码工作需要在ServerInitializer中进行
- 当客户端上线时,会触发ServerHandle的channelActive方法,此时可以发送广播消息,通知其他客户端,通知的方法用channelGroup的广播方法channelGroup.writeAndFlush
- 当客户端发送消息时,会触发ServerHandle的channelRead0方法,需要写个方法判断信息来源,并发送消息至其他客户端
channelGroup.forEach(dfg -> {
if (channel != dfg) {
dfg.writeAndFlush(channel.remoteAddress() + "发送来消息:" + msg +"\n");
} else {
channel.writeAndFlush("[自己]发送来消息:" + msg +"\n");
}
});
具体步骤
- 这里只显示handler的处理方法,其他代码见github
- MutilClientHandler
public class MutilClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("client output:" + msg);
}
}
- ServerHandle
public class ServerHandle extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(dfg -> {
if (channel != dfg) {
dfg.writeAndFlush(channel.remoteAddress() + "发送来消息:" + msg +"\n");
} else {
channel.writeAndFlush("[自己]发送来消息:" + msg +"\n");
}
});
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "channelActive");
channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线channelActive"+"\n");
channelGroup.add(channel);
System.out.println("channelGroup.size():" + channelGroup.size());
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "-----channelInactive");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "handlerAdded");
channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线"+"\n");
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "handlerRemoved");
channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经下线"+"\n");
System.out.println("channelGroup.size():" + channelGroup.size());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
排坑细节
- idea如何想启动多客户端,需要在run/debug configurations中勾选allow parallel run
-
大坑:其他用户上线后,根本没有收到上线通知,或者没有收到其他客户端发送的消息
- 因为ServerInitializer设置来编解码方式,
- 所以ServerHandle,必须在msg后面显式的加上"\n" 或者"\t"
- 不设置则不会收到消息,服务端会一直存储着该消息
第三个Netty程序:心跳检测
目的及介绍
- 模仿一个服务器端检测客服端心跳的机制
- 场景:在长链接情况下,服务器端需要通过心跳机制来确认客服端存活情况。
搭建、设计思路
- 服务器端
- 跟之前的设计一样,由server、serverinitializer、serverhandler组成
- 看代码会发现server多了一个handler(new LoggingHandler(LogLevel.INFO)),用于拦截日志多
- 多了一个IdleStateHandler,它的参数是(读,写,all,时间单位)。
- 自己写的ServerHandle03,主要是判断event.state()的状态,然后简单打印输出。
- 客户端
- 代码同02
- 测试设计
- 通过心跳检测读操作:设置读写检测时间为(5,7,10),启动服务器端和客服端,客户短在5s内没有输入,服务器端显示读超时。
- 通过心跳检测写操作:设置读写检测时间为(5,7,10),启动服务器端和客服端,客户短在5s内连续输入,服务器端没有输出,于是服务器端显示写超时。
- 通过心跳检测读写操作:设置读写检测时间为(10,7,5),启动服务器端和客服端,客户短在5s内没有输入,服务器端显示读写超时,即只要读写有一个超时,即报错。
具体步骤
- 服务器端的Initializer
public class ServerInitializer03 extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS));
pipeline.addLast(new ServerHandle03());
}
}
- 服务器端的handler
public class ServerHandle03 extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "read idel";
break;
case WRITER_IDLE:
eventType = "write idel";
break;
case ALL_IDLE:
eventType = "read and write idel";
break;
}
System.out.println(ctx.channel().remoteAddress() + "超时事件" + eventType);
ctx.channel().close();
}
}
}
排坑细节
- 端口号是否占用
- 代码要区分ChannelHandlerContext ctx, Object evt的使用区别。
第四个Netty程序:长链接的使用
目的及介绍
- 通过WebSocket模仿TCP长链接
搭建、设计思路
- 在service的childHandler的pipeline中加入HttpObjectAggregator和WebSocketServerProtocolHandler
- HttpObjectAggregator主要用于对信息按照指定大小分块
- WebSocketServerProtocolHandler主要用于指定访问路径
- 收到的消息在TextWebSocketFrameHandle04中输出
具体步骤
- 服务端Server04的代码
public class Server04 {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossExecutors = new NioEventLoopGroup();
EventLoopGroup workExecutors = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossExecutors,workExecutors).channel(NioServerSocketChannel.class).
//handler是用于处理bossExecutors
//childHandler是用于处理workExecutors
handler(new LoggingHandler(LogLevel.INFO)).
childHandler(new WebSocketInitializer04());
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(9050)).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossExecutors.shutdownGracefully();
workExecutors.shutdownGracefully();
}
}
}
- 服务端WebSocketInitializer04的代码
public class WebSocketInitializer04 extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8192));
pipeline.addLast(new WebSocketServerProtocolHandler("/ljfirst"));
pipeline.addLast(new TextWebSocketFrameHandle04());
}
}
- 客户端模拟交互的网页
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>long connection</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:9050/ljfirst");
socket.onmessage = function (event) {
var ta = document.getElementById("responsetext");
ta.value = ta.value + "\n" + event.data;
}
socket.onopen = function (event) {
var tt = document.getElementById("responsetext");
tt.value = "连接开启";
}
socket.onclose = function (event) {
var clt = document.getElementById("responsetext");
clt.value = clt.value + "\n" + "连接断掉";
}
} else {
alert("浏览器不支持websocket")
}
function send(msg) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(msg);
}else {
alert("连接尚未开启");
}
}
</script>
<form onsubmit="return false;">
<!-- 此处的msg 关联上方的send(msg)-->
<textarea name="msg" style="width: 400px; height: 200px"></textarea>
<input type="button" onclick="send(this.form.msg.value)" value="发送">
<h3>服务端输出:</h3>
<textarea id="responsetext" style="width: 400px; height: 200px"></textarea>
<input type="button" onclick="javascript: document.getElementById('responsetext').value=''" value="清空内容">
</form>
</body>
</html>
排坑细节
- pipeline.addLast(new HttpObjectAggregator(8192));这一句中的8192是分块大小
- html网页中form的textarea中的name 与 script中function的name需要对应