java根据Netty的WebSocket协议开发
程序员文章站
2022-04-23 11:50:15
...
更多干货
转载:https://www.cnblogs.com/ananaini/p/9446062.html
1、Websocketservice
package websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* Created by lz on 2016/8/12.
*/
public class WebSocketServer {
public void run(int port) throws Exception{
//创建两组线程,监听连接和工作
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//Netty用于启动Nio服务端的启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
//注册NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//注册处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//用于Http请求的编码或者解码
pipeline.addLast("http-codec", new HttpServerCodec());
//把Http消息组成完整地HTTP消息
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
//向客户端发送HTML5文件
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
//实际处理的Handler
pipeline.addLast("handler", new WebSocketServerHandler());
}
});
Channel ch = b.bind(port).sync().channel();
System.out.println("Web socket server started at port " + port + '.');
System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
ch.closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new WebSocketServer().run(port);
}
}
2、主要是处理我们系统逻辑的Handler
package websocket;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
/**
* Created by lz on 2016/8/12.
*/
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
private WebSocketServerHandshaker handshaker;
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
//传统的HTTP接入
if (msg instanceof FullHttpRequest){
handleHttpRequest(ctx,(FullHttpRequest)msg);
}//WebSocket接入
else if (msg instanceof WebSocketFrame){
handleWebSocketFrame(ctx,(WebSocketFrame)msg);
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
//判断是否关闭链路指令
if (frame instanceof CloseWebSocketFrame){
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
//判断是否是Ping消息
if (frame instanceof PingWebSocketFrame){
ctx.channel().write(new PongWebSocketFrame(frame.content()).retain());
return;
}
//本例程仅支持文本信息,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame)){
throw new UnsupportedOperationException(String.format("%s frame types not supported",frame.getClass().getName()));
}
//返回应答消息
String request = ((TextWebSocketFrame) frame).text();
if (logger.isLoggable(Level.FINE)){
logger.fine(String.format("%s received %s",ctx.channel(),request));
}
ctx.channel().write(new TextWebSocketFrame(request+"欢迎使用Netty WebSocket服务,现在时刻:" + new Date().toString()));
}
private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,FullHttpResponse res){
//返回应答给客户端
if (res.getStatus().code() != 200){
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
setContentLength(res,res.content().readableBytes());
}
//如果是非Keep-Alive,关闭连接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.getStatus().code() != 200){
f.addListener(ChannelFutureListener.CLOSE);
}
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception{
//如果HTTP解码失败,返回HTTP异常
if (!req.getDecoderResult().isSuccess()
|| (!"websocket".equals(req.headers().get("Upgrade")))){
sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//构造握手响应返回,本机测试
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket",null,false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null){
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}else {
//把握手消息返回给客户端
handshaker.handshake(ctx.channel(),req);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
推荐阅读