10、netty结合websocket完成消息的单发和群发
程序员文章站
2024-02-17 17:45:04
...
注:源代码来自享学课堂,略有修改,学习之后所做笔记,方便回顾,也给大家一个参考
服务端
主函数
和普通的服务代码相同,这里加上了ssl的支持,如果不需要ssl支持,默认是false
package com.gg.socket.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.concurrent.ImmediateEventExecutor;
/**
* @Description WebSecketServer
* @Author honry.guan
* @Date 2020/5/26 17:22
*/
public class WebSocketServer {
/**
* 创建DefaultChannelGroup,保存已有链接socketChannel,群发和一对一功能可以使用
*/
public static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
/**
* 是否启用ssl,即https
*/
public static final boolean SSL = false;
/**
* 通过ssl访问8443,否则就访问8080
*/
public static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "8080"));
public static void main(String[] args) throws Exception {
// SSL配置
final SslContext sslContext;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslContext = null;
}
// 一个线程负责接受新的连接,一个负责处理读写
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer(sslContext, CHANNEL_GROUP));
Channel ch = b.bind(PORT).sync().channel();
System.out.println("打开浏览器访问: " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
WebSocketServerInitializer
websocket需要借助http握手支持
package com.gg.socket.netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslContext;
/**
* @Description WebSocketServerInitializer
* @Author honry.guan
* @Date 2020/5/26 17:22
*/
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelGroup group;
/**
* websocket访问路径
*/
public static final String WEBSOCKET_PATH = "/websocket";
/**
* ssl支持
*/
private final SslContext sslCtx;
public WebSocketServerInitializer(SslContext sslCtx, ChannelGroup group) {
this.sslCtx = sslCtx;
this.group = group;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
if(sslCtx != null){
pipeline.addLast(sslCtx.newHandler((socketChannel.alloc())));
}
//http握手支持
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(655536));
//netty提供的,支持websocket应答数据压缩传输
pipeline.addLast(new WebSocketServerCompressionHandler());
//netty提供,对整个websocket的通信进行了初始化(发现http保温中国有升级为websocket的请你去,包括握手,以及后面的一些通信控制)
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,null,true));
//浏览器访问是,展示index页面,通过代码生成的HTML页面
pipeline.addLast(new ProcessWsIndexPageHandler(WEBSOCKET_PATH));
//对websocket的数据记性处理
pipeline.addLast(new ProcessWsFrameHandler(group));
}
}
ProcessWsIndexPageHandler
通过http请求访问:8080/的时候,会返回一个HTML页面
package com.gg.socket.netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
/**
* @Description ProcessWsIndexPageHandler
* @Author honry.guan
* @Date 2020/5/26 17:23
*/
public class ProcessWsIndexPageHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String webSocketPath;
public ProcessWsIndexPageHandler(String websocketPath) {
this.webSocketPath = websocketPath;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
//处理发送错误或者无法解析的http请求
if (!fullHttpRequest.decoderResult().isSuccess()) {
sendHttpResponse(channelHandlerContext, fullHttpRequest, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//只允许get请求
if (fullHttpRequest.method() != HttpMethod.GET) {
sendHttpResponse(channelHandlerContext, fullHttpRequest, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
return;
}
//发送index页面内容
if ("/".equals(fullHttpRequest.getUri()) || "index.html".equals(fullHttpRequest.getUri())) {
//生成websocket的访问地址,写入index页面中
String webSocketLocation = getWebSocketLocation(channelHandlerContext.pipeline(), fullHttpRequest,
webSocketPath);
System.out.println("webSocketLocation:" + webSocketLocation);
//生成index页面的具体内容,发送浏览器
ByteBuf byteBuf = MakeIndexPage.getContent(webSocketLocation);
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
HttpUtil.setContentLength(res, byteBuf.readableBytes());
sendHttpResponse(channelHandlerContext, fullHttpRequest, res);
} else {
sendHttpResponse(channelHandlerContext, fullHttpRequest, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, NOT_FOUND));
}
}
private String getWebSocketLocation(ChannelPipeline pipeline, FullHttpRequest fullHttpRequest, String path) {
String protocol = "ws";
if (pipeline.get(SslHandler.class) != null) {
protocol = "wss";
}
return protocol + "://" + fullHttpRequest.headers().get(HttpHeaderNames.HOST) + path;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private void sendHttpResponse(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest, FullHttpResponse fullHttpResponse) {
//对错误的请求进行处理
if (fullHttpResponse.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(fullHttpResponse.status().toString(), CharsetUtil.UTF_8);
fullHttpResponse.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(fullHttpResponse, fullHttpRequest.content().readableBytes());
}
//发送应答
ChannelFuture f = channelHandlerContext.channel().writeAndFlush(fullHttpResponse);
//对于不是长链接或者错误的额请求直接关闭连接
if (!HttpUtil.isKeepAlive(fullHttpRequest) || fullHttpResponse.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
ProcessWsFrameHandler
对浏览器或者客户端发送的消息进行处理,再对当前channel或者所有连接上的客户端拳法消息
package com.gg.socket.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import java.util.Locale;
import java.util.logging.Logger;
/**
* @Description ProcessWsFrameHandler
* @Author honry.guan
* @Date 2020/5/26 17:23
*/
public class ProcessWsFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private final ChannelGroup group;
public ProcessWsFrameHandler(ChannelGroup group) {
this.group = group;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// 判断是否是文本帧,目前只处理文本帧
if(frame instanceof TextWebSocketFrame){
String request = ((TextWebSocketFrame)frame).text();
System.out.println("服务端收到消息:channelId="+ctx.channel().id()+" 消息:" + request);
//这个是对当前channel的单发消息
ctx.channel().writeAndFlush(new TextWebSocketFrame(("客户端收到单发消息:"+request).toUpperCase(Locale.CHINA)));
//下面是群发,group保存的是添加进来的channel
group.writeAndFlush(new TextWebSocketFrame(("客户端收到群发消息:"+request).toUpperCase(Locale.CHINA)));
}else{
String message = "不支持的文本类型:"+ frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}
/**
* 重写自定义处理事件
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//检测事件,如果握手成功事件,做业务处理
if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){
//通知所有已经连接的websocket客户端那新的客户端已经连接上了
group.writeAndFlush(new TextWebSocketFrame("客户端:"+ctx.channel().id()+"连接上了"));
//将新的websocket channel添加到channelgroup中,一遍他客户已接收所有的消息
group.add(ctx.channel());
}else{
super.userEventTriggered(ctx,evt);
}
}
}
MakeIndexPage
生成html的工具类,当然也可以自己新建一个HTML文本来访问服务器
package com.gg.socket.netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public final class MakeIndexPage {
private static final String NEWLINE = "\r\n";
public static ByteBuf getContent(String webSocketLocation) {
return Unpooled.copiedBuffer(
"<html><head><title>Web Socket Test</title></head>"
+ NEWLINE +
"<body>" + NEWLINE +
"<script type=\"text/javascript\">" + NEWLINE +
"var socket;" + NEWLINE +
"if (!window.WebSocket) {" + NEWLINE +
" window.WebSocket = window.MozWebSocket;" + NEWLINE +
'}' + NEWLINE +
"if (window.WebSocket) {" + NEWLINE +
" socket = new WebSocket(\"" + webSocketLocation + "\");"
+ NEWLINE +
" socket.onmessage = function(event) {" + NEWLINE +
" var ta = document.getElementById('responseText');"
+ NEWLINE +
" ta.value = ta.value + '\\n' + event.data" + NEWLINE +
" };" + NEWLINE +
" socket.onopen = function(event) {" + NEWLINE +
" var ta = document.getElementById('responseText');"
+ NEWLINE +
" ta.value = \"Web Socket opened!\";" + NEWLINE +
" };" + NEWLINE +
" socket.onclose = function(event) {" + NEWLINE +
" var ta = document.getElementById('responseText');"
+ NEWLINE +
" ta.value = ta.value + \"Web Socket closed\"; "
+ NEWLINE +
" };" + NEWLINE +
"} else {" + NEWLINE +
" alert(\"Your browser does not support Web Socket.\");"
+ NEWLINE +
'}' + NEWLINE +
NEWLINE +
"function send(message) {" + NEWLINE +
" if (!window.WebSocket) { return; }" + NEWLINE +
" if (socket.readyState == WebSocket.OPEN) {" + NEWLINE +
" socket.send(message);" + NEWLINE +
" } else {" + NEWLINE +
" alert(\"The socket is not open.\");" + NEWLINE +
" }" + NEWLINE +
'}' + NEWLINE +
"</script>" + NEWLINE +
"<form οnsubmit=\"return false;\">" + NEWLINE +
"<input type=\"text\" name=\"message\" " +
"value=\"Hello, World!\"/>" +
"<input type=\"button\" value=\"Send Web Socket Data\""
+ NEWLINE +
" οnclick=\"send(this.form.message.value)\" />"
+ NEWLINE +
"<h3>Output</h3>" + NEWLINE +
"<textarea id=\"responseText\" " +
"style=\"width:500px;height:300px;\"></textarea>"
+ NEWLINE +
"</form>" + NEWLINE +
"</body>" + NEWLINE +
"</html>" + NEWLINE, CharsetUtil.US_ASCII);
}
}
生成之后的HTML代码,也可以自定义一个html文件,直接打开浏览器
<html><head><title>Web Socket Test</title></head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1:8080/websocket");
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "Web Socket opened!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "Web Socket closed";
};
} else {
alert("Your browser does not support Web Socket.");
}
function send(message) {
if (!window.WebSocket) { return; }
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("The socket is not open.");
}
}
</script>
<form οnsubmit="return false;">
<input type="text" name="message" value="Hello, World!"/><input type="button" value="Send Web Socket Data"
οnclick="send(this.form.message.value)" />
<h3>Output</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>
通过浏览器直接发送和接收消息
启动服务,访问http://127.0.0.1:8080/,这个时候就能够和服务器进行通信了,如图
服务端收到消息:
channelId=c7882173 消息:Hello, World!
通过自定义客户端发送和接收消息
主函数
package com.gg.socket.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
/**
* @Description WebSocketClient
* @Author honry.guan
* @Date 2020/5/25 10:47
*/
public class WebSocketClient {
public static final String URL = System.getProperty("url", "ws://127.0.0.1:8080/websocket");
public static final String SURL = System.getProperty("url", "ws://127.0.0.1:8843/websocket");
public static void main(String[] args) throws Exception {
URI uri = new URI(URL);
String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
final int port = uri.getPort();
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
System.out.println("只支持ws");
return;
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
EventLoopGroup group = new NioEventLoopGroup();
try {
final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(socketChannel.alloc(), host, port));
}
//http协议握手
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(8192));
//支持websocket数据压缩
p.addLast(WebSocketClientCompressionHandler.INSTANCE);
p.addLast(handler);
}
});
//连接服务器
Channel ch = b.connect(uri.getHost(), port).sync().channel();
//等待握手完成
handler.handshakeFuture().sync();
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String msg = console.readLine();
if (msg == null) {
break;
} else if ("bye".equals(msg.toLowerCase())) {
ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync();
break;
} else if ("ping".equals(msg.toLowerCase())) {
WebSocketFrame frame = new PingWebSocketFrame(
Unpooled.wrappedBuffer(new byte[]{8, 1, 8, 1}));
ch.writeAndFlush(frame);
} else {
WebSocketFrame frame = new TextWebSocketFrame(msg);
ch.writeAndFlush(frame);
}
}
} finally {
group.shutdownGracefully();
}
}
}
自定义handler
package com.gg.socket.client;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
/**
* @Description WebSocketClientHandler
* @Author honry.guan
* @Date 2020/5/25 10:47
*/
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
/**
* 负责和服务器握手
*/
private final WebSocketClientHandshaker handshaker;
/**
* 握手结果
*/
private ChannelPromise handshakeFuture;
public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接断开");
}
/**
* 当前handler被添加到pipeline时,new出握手的结果示例,以备将来使用
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handshakeFuture = ctx.newPromise();
}
/**
* 读取数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
//握手未完成,完成握手
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("完成连接");
handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException e) {
System.out.println("握手连接失败");
handshakeFuture.setFailure(e);
}
return;
}
//握手完成,升级为websocket,不应该再收到http报文
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
//处理websocket报文
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
System.out.println("收到消息:" + textWebSocketFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("客户端收到pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("客户端手动关闭");
ch.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
}
在窗口发送123
完成连接
你好
收到消息:客户端收到单发消息:你好
收到消息:客户端收到群发消息:你好