欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

10、netty结合websocket完成消息的单发和群发

程序员文章站 2024-02-17 17:45:04
...

注:源代码来自享学课堂,略有修改,学习之后所做笔记,方便回顾,也给大家一个参考

服务端

主函数

和普通的 Netty 服务端代码基本相同,这里额外加上了 SSL 支持;SSL 默认关闭(SSL = false),需要时改为 true 即可

package com.gg.socket.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.concurrent.ImmediateEventExecutor;

/**
 * Entry point of the Netty WebSocket demo server.
 *
 * <p>Binds a server socket on {@link #PORT}, installs a {@code WebSocketServerInitializer}
 * on every accepted connection and then blocks until the listening channel is closed.
 *
 * @author honry.guan (2020/5/26 17:22)
 */
public class WebSocketServer {
    /**
     * Group handed to the channel initializer so that handlers can register connected
     * channels in it and use it for one-to-one and broadcast messages.
     */
    public static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

    /**
     * Whether TLS (https / wss) is enabled.
     */
    public static final boolean SSL = false;

    /**
     * Listening port, read from the {@code port} system property; defaults to 8443 when
     * {@link #SSL} is enabled and to 8080 otherwise.
     */
    public static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "8080"));

    /**
     * Starts the server and blocks until its listening channel closes.
     *
     * @param args ignored
     * @throws Exception if the TLS context cannot be built, the bind fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // TLS context is only built when SSL is switched on; null means plain text.
        final SslContext sslContext = SSL ? selfSignedServerContext() : null;

        // One thread accepts connections; the second group (default size) handles channel I/O.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new WebSocketServerInitializer(sslContext, CHANNEL_GROUP));

            Channel serverChannel = bootstrap.bind(PORT).sync().channel();
            String scheme = SSL ? "https" : "http";
            System.out.println("打开浏览器访问: " + scheme + "://127.0.0.1:" + PORT + '/');
            serverChannel.closeFuture().sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /**
     * Builds a server-side TLS context backed by a freshly generated self-signed certificate.
     */
    private static SslContext selfSignedServerContext() throws Exception {
        SelfSignedCertificate certificate = new SelfSignedCertificate();
        return SslContextBuilder.forServer(certificate.certificate(), certificate.privateKey()).build();
    }
}

WebSocketServerInitializer

websocket需要借助http握手支持

package com.gg.socket.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslContext;

/**
 * Builds the channel pipeline for every connection accepted by the WebSocket demo server.
 *
 * <p>Pipeline order: optional TLS handler, HTTP codec and aggregator (the WebSocket
 * handshake is an HTTP upgrade), WebSocket compression, the WebSocket protocol handler,
 * the index page handler for plain HTTP requests, and finally the frame handler.
 *
 * @author honry.guan (2020/5/26 17:22)
 */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Path on which WebSocket upgrade requests are accepted.
     */
    public static final String WEBSOCKET_PATH = "/websocket";

    /**
     * Upper bound, in bytes, for an aggregated HTTP message (64 KiB).
     * The previous literal 655536 was a typo for 65536.
     */
    private static final int MAX_HTTP_CONTENT_LENGTH = 65536;

    /**
     * Group passed on to the frame handler for one-to-one and broadcast messages.
     */
    private final ChannelGroup group;

    /**
     * TLS context, or {@code null} when the server runs without TLS.
     */
    private final SslContext sslCtx;

    /**
     * @param sslCtx TLS context to secure each connection with, or {@code null} for plain text
     * @param group  channel group handed to the frame handler
     */
    public WebSocketServerInitializer(SslContext sslCtx, ChannelGroup group) {
        this.sslCtx = sslCtx;
        this.group = group;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(socketChannel.alloc()));
        }

        // HTTP support, required for the WebSocket handshake.
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));

        // Netty-provided compression extension for WebSocket traffic.
        pipeline.addLast(new WebSocketServerCompressionHandler());
        // Netty-provided handler for the upgrade handshake and control frames on WEBSOCKET_PATH.
        // No sub-protocol is requested; extensions are allowed so that compression can be negotiated.
        pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));

        // Plain browser requests receive the generated index page.
        pipeline.addLast(new ProcessWsIndexPageHandler(WEBSOCKET_PATH));

        // Application-level handling of WebSocket frames.
        pipeline.addLast(new ProcessWsFrameHandler(group));
    }
}

ProcessWsIndexPageHandler

通过http请求访问:8080/的时候,会返回一个HTML页面

package com.gg.socket.netty.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;

import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;

/**
 * @Description ProcessWsIndexPageHandler
 * @Author honry.guan
 * @Date 2020/5/26 17:23
 */
public class ProcessWsIndexPageHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String webSocketPath;

    public ProcessWsIndexPageHandler(String websocketPath) {
        this.webSocketPath = websocketPath;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
        //处理发送错误或者无法解析的http请求
        if (!fullHttpRequest.decoderResult().isSuccess()) {
            sendHttpResponse(channelHandlerContext, fullHttpRequest, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        //只允许get请求
        if (fullHttpRequest.method() != HttpMethod.GET) {
            sendHttpResponse(channelHandlerContext, fullHttpRequest, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
            return;
        }

        //发送index页面内容
        if ("/".equals(fullHttpRequest.getUri()) || "index.html".equals(fullHttpRequest.getUri())) {
            //生成websocket的访问地址,写入index页面中
            String webSocketLocation = getWebSocketLocation(channelHandlerContext.pipeline(), fullHttpRequest,
                    webSocketPath);
            System.out.println("webSocketLocation:" + webSocketLocation);

            //生成index页面的具体内容,发送浏览器
            ByteBuf byteBuf = MakeIndexPage.getContent(webSocketLocation);
            FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);

            res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
            HttpUtil.setContentLength(res, byteBuf.readableBytes());

            sendHttpResponse(channelHandlerContext, fullHttpRequest, res);
        } else {
            sendHttpResponse(channelHandlerContext, fullHttpRequest, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, NOT_FOUND));
        }
    }

    private String getWebSocketLocation(ChannelPipeline pipeline, FullHttpRequest fullHttpRequest, String path) {
        String protocol = "ws";
        if (pipeline.get(SslHandler.class) != null) {
            protocol = "wss";
        }
        return protocol + "://" + fullHttpRequest.headers().get(HttpHeaderNames.HOST) + path;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    private void sendHttpResponse(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest, FullHttpResponse fullHttpResponse) {
        //对错误的请求进行处理
        if (fullHttpResponse.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(fullHttpResponse.status().toString(), CharsetUtil.UTF_8);
            fullHttpResponse.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(fullHttpResponse, fullHttpRequest.content().readableBytes());
        }

        //发送应答
        ChannelFuture f = channelHandlerContext.channel().writeAndFlush(fullHttpResponse);
        //对于不是长链接或者错误的额请求直接关闭连接
        if (!HttpUtil.isKeepAlive(fullHttpRequest) || fullHttpResponse.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}

ProcessWsFrameHandler

对浏览器或者客户端发送的消息进行处理,再对当前channel单发消息,或者对所有已连接的客户端群发消息

package com.gg.socket.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import java.util.Locale;
import java.util.logging.Logger;


/**
 * Handles WebSocket frames after the handshake: echoes every text frame back to its
 * sender and broadcasts it to all channels in the group.
 *
 * @author honry.guan (2020/5/26 17:23)
 */
public class ProcessWsFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    /**
     * Channels that completed the WebSocket handshake; used for broadcasts.
     */
    private final ChannelGroup group;

    /**
     * @param group group that joined channels are added to and broadcasts are sent through
     */
    public ProcessWsFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    /**
     * Replies to a text frame with one direct message and one broadcast message.
     *
     * @throws UnsupportedOperationException for any frame that is not a text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Only text frames are supported for now.
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("不支持的文本类型:" + frame.getClass().getName());
        }

        final String payload = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端收到消息:channelId=" + ctx.channel().id() + " 消息:" + payload);

        // Direct reply to the sending channel only.
        ctx.channel().writeAndFlush(upperCasedFrame("客户端收到单发消息:", payload));
        // Broadcast to every channel that was added to the group.
        group.writeAndFlush(upperCasedFrame("客户端收到群发消息:", payload));
    }

    /**
     * Announces a newly connected client to the existing group members and then adds the
     * new channel to the group; all other events are passed on unchanged.
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the user event
     * @throws Exception propagated from the superclass
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt != WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        // Notify first, add second: the new client does not receive its own announcement.
        group.writeAndFlush(new TextWebSocketFrame("客户端:" + ctx.channel().id() + "连接上了"));
        group.add(ctx.channel());
    }

    /**
     * Creates a text frame holding {@code prefix + body}, upper-cased with {@link Locale#CHINA}.
     */
    private static TextWebSocketFrame upperCasedFrame(String prefix, String body) {
        String combined = prefix + body;
        return new TextWebSocketFrame(combined.toUpperCase(Locale.CHINA));
    }
}

MakeIndexPage

生成html的工具类,当然也可以自己新建一个HTML文本来访问服务器

package com.gg.socket.netty.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

/**
 * Generates the HTML test page that opens a WebSocket to the server and lets the user
 * send messages from a small form.
 */
public final class MakeIndexPage {

    private static final String NEWLINE = "\r\n";

    private MakeIndexPage() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Builds the index page.
     *
     * <p>The event attributes are spelled with a plain Latin {@code o} ({@code onsubmit},
     * {@code onclick}); the look-alike Greek omicron that crept in before made the browser
     * ignore both handlers, so the Send button did nothing.
     *
     * @param webSocketLocation WebSocket URL the page connects to, e.g.
     *                          {@code ws://127.0.0.1:8080/websocket}
     * @return a new buffer holding the page, encoded as UTF-8
     */
    public static ByteBuf getContent(String webSocketLocation) {
        // NOTE(review): webSocketLocation is inserted into a JavaScript string literal without
        // escaping. The caller in this project derives it from the Host header; confirm that
        // is acceptable or escape it.
        return Unpooled.copiedBuffer(
                "<html><head><title>Web Socket Test</title></head>" + NEWLINE +
                        "<body>" + NEWLINE +
                        "<script type=\"text/javascript\">" + NEWLINE +
                        "var socket;" + NEWLINE +
                        "if (!window.WebSocket) {" + NEWLINE +
                        "  window.WebSocket = window.MozWebSocket;" + NEWLINE +
                        "}" + NEWLINE +
                        "if (window.WebSocket) {" + NEWLINE +
                        "  socket = new WebSocket(\"" + webSocketLocation + "\");" + NEWLINE +
                        "  socket.onmessage = function(event) {" + NEWLINE +
                        "    var ta = document.getElementById('responseText');" + NEWLINE +
                        "    ta.value = ta.value + '\\n' + event.data" + NEWLINE +
                        "  };" + NEWLINE +
                        "  socket.onopen = function(event) {" + NEWLINE +
                        "    var ta = document.getElementById('responseText');" + NEWLINE +
                        "    ta.value = \"Web Socket opened!\";" + NEWLINE +
                        "  };" + NEWLINE +
                        "  socket.onclose = function(event) {" + NEWLINE +
                        "    var ta = document.getElementById('responseText');" + NEWLINE +
                        "    ta.value = ta.value + \"Web Socket closed\"; " + NEWLINE +
                        "  };" + NEWLINE +
                        "} else {" + NEWLINE +
                        "  alert(\"Your browser does not support Web Socket.\");" + NEWLINE +
                        "}" + NEWLINE +
                        NEWLINE +
                        "function send(message) {" + NEWLINE +
                        "  if (!window.WebSocket) { return; }" + NEWLINE +
                        "  if (socket.readyState == WebSocket.OPEN) {" + NEWLINE +
                        "    socket.send(message);" + NEWLINE +
                        "  } else {" + NEWLINE +
                        "    alert(\"The socket is not open.\");" + NEWLINE +
                        "  }" + NEWLINE +
                        "}" + NEWLINE +
                        "</script>" + NEWLINE +
                        "<form onsubmit=\"return false;\">" + NEWLINE +
                        "<input type=\"text\" name=\"message\" " +
                        "value=\"Hello, World!\"/>" +
                        "<input type=\"button\" value=\"Send Web Socket Data\"" + NEWLINE +
                        "       onclick=\"send(this.form.message.value)\" />" + NEWLINE +
                        "<h3>Output</h3>" + NEWLINE +
                        "<textarea id=\"responseText\" " +
                        "style=\"width:500px;height:300px;\"></textarea>" + NEWLINE +
                        "</form>" + NEWLINE +
                        "</body>" + NEWLINE +
                        // UTF-8 matches the charset declared in the handler's Content-Type header.
                        "</html>" + NEWLINE, CharsetUtil.UTF_8);
    }

}

生成之后的HTML代码,也可以自定义一个html文件,直接打开浏览器

<!-- Generated test page: opens a WebSocket to the server and appends every received message to the textarea. -->
<html><head><title>Web Socket Test</title></head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
  window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
  socket = new WebSocket("ws://127.0.0.1:8080/websocket");
  socket.onmessage = function(event) {
    var ta = document.getElementById('responseText');
    ta.value = ta.value + '\n' + event.data
  };
  socket.onopen = function(event) {
    var ta = document.getElementById('responseText');
    ta.value = "Web Socket opened!";
  };
  socket.onclose = function(event) {
    var ta = document.getElementById('responseText');
    ta.value = ta.value + "Web Socket closed";
  };
} else {
  alert("Your browser does not support Web Socket.");
}

function send(message) {
  if (!window.WebSocket) { return; }
  if (socket.readyState == WebSocket.OPEN) {
    socket.send(message);
  } else {
    alert("The socket is not open.");
  }
}
</script>
<!-- onsubmit/onclick must be spelled with a Latin "o"; a look-alike Greek omicron disables the handlers. -->
<form onsubmit="return false;">
<input type="text" name="message" value="Hello, World!"/><input type="button" value="Send Web Socket Data"
       onclick="send(this.form.message.value)" />
<h3>Output</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>

通过浏览器直接发送和接收消息

启动服务,访问http://127.0.0.1:8080/,这个时候就能够和服务器进行通信了,如图

10、netty结合websocket完成消息的单发和群发

服务端收到消息:

channelId=c7882173 消息:Hello, World!

 

通过自定义客户端发送和接收消息

主函数

package com.gg.socket.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

/**
 * Console WebSocket client: connects to the demo server, completes the handshake and then
 * forwards every line typed on standard input as a text frame.
 *
 * <p>Two inputs are treated as commands, matched case-insensitively: {@code bye} sends a
 * close frame and exits, {@code ping} sends a ping frame.
 *
 * @author honry.guan (2020/5/25 10:47)
 */
public class WebSocketClient {
    /**
     * Plain-text endpoint; overridable through the {@code url} system property.
     */
    public static final String URL = System.getProperty("url", "ws://127.0.0.1:8080/websocket");

    /**
     * TLS endpoint; overridable through the {@code url} system property. The default uses the
     * {@code wss} scheme and port 8443, the port the server listens on when SSL is enabled.
     */
    public static final String SURL = System.getProperty("url", "wss://127.0.0.1:8443/websocket");

    /**
     * Connects to {@link #URL} and runs the console loop until end of input or {@code bye}.
     *
     * @param args ignored
     * @throws Exception if the URL is invalid, or the connection or handshake fails
     */
    public static void main(String[] args) throws Exception {
        URI uri = new URI(URL);
        String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
        final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();

        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
            System.out.println("只支持ws");
            return;
        }

        final boolean ssl = "wss".equalsIgnoreCase(scheme);
        // URI.getPort() returns -1 when the URL has no explicit port; fall back to the scheme default.
        final int port = uri.getPort() != -1 ? uri.getPort() : (ssl ? 443 : 80);

        final SslContext sslCtx;
        if (ssl) {
            // Trusts any certificate, which suits the server's self-signed one. Not for production use.
            sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));

            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(socketChannel.alloc(), host, port));
                            }
                            // HTTP support for the upgrade handshake.
                            p.addLast(new HttpClientCodec());
                            p.addLast(new HttpObjectAggregator(8192));
                            // WebSocket compression extension.
                            p.addLast(WebSocketClientCompressionHandler.INSTANCE);
                            p.addLast(handler);
                        }
                    });
            // Connect using the null-safe host rather than uri.getHost(), which may be null.
            Channel ch = b.connect(host, port).sync().channel();
            // Wait until the WebSocket handshake has finished.
            handler.handshakeFuture().sync();

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String msg = console.readLine();
                if (msg == null) {
                    break;
                } else if ("bye".equalsIgnoreCase(msg)) {
                    ch.writeAndFlush(new CloseWebSocketFrame());
                    ch.closeFuture().sync();
                    break;
                } else if ("ping".equalsIgnoreCase(msg)) {
                    // equalsIgnoreCase avoids locale-dependent lower-casing, e.g. Turkish "PING".
                    WebSocketFrame frame = new PingWebSocketFrame(
                            Unpooled.wrappedBuffer(new byte[]{8, 1, 8, 1}));
                    ch.writeAndFlush(frame);
                } else {
                    WebSocketFrame frame = new TextWebSocketFrame(msg);
                    ch.writeAndFlush(frame);
                }
            }
        } finally {
            group.shutdownGracefully();
        }
    }

}

自定义handler

package com.gg.socket.client;

import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

/**
 * Client-side handler that drives the WebSocket handshake and afterwards prints the
 * frames received from the server.
 *
 * @author honry.guan (2020/5/25 10:47)
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * Performs the opening handshake with the server.
     */
    private final WebSocketClientHandshaker handshaker;
    /**
     * Completed (or failed) once the handshake outcome is known.
     */
    private ChannelPromise handshakeFuture;

    /**
     * @param handshaker handshaker used to start and finish the opening handshake
     */
    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * @return the future tracking the handshake; assigned when the handler is added to a pipeline
     */
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    /**
     * Creates the handshake promise as soon as the handler joins a pipeline, so that it is
     * available before any traffic flows.
     *
     * @param ctx context this handler was added with
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        handshakeFuture = ctx.newPromise();
    }

    /**
     * Starts the handshake once the connection is established.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接断开");
    }

    /**
     * Routes an inbound message: to handshake completion while the handshake is pending,
     * to frame handling afterwards.
     *
     * @param ctx channel context
     * @param msg inbound message
     * @throws Exception on unexpected message types
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final Channel channel = ctx.channel();
        if (handshaker.isHandshakeComplete()) {
            onUpgradedMessage(channel, msg);
        } else {
            completeHandshake(channel, msg);
        }
    }

    /**
     * Finishes the handshake with the server's HTTP response and settles the handshake promise.
     */
    private void completeHandshake(Channel channel, Object msg) {
        try {
            handshaker.finishHandshake(channel, (FullHttpResponse) msg);
            System.out.println("完成连接");
            handshakeFuture.setSuccess();
        } catch (WebSocketHandshakeException failure) {
            System.out.println("握手连接失败");
            handshakeFuture.setFailure(failure);
        }
    }

    /**
     * Handles a message received after the upgrade; only WebSocket frames are expected here.
     */
    private void onUpgradedMessage(Channel channel, Object msg) {
        // Once upgraded, an HTTP response is a protocol violation.
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        final WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof CloseWebSocketFrame) {
            System.out.println("客户端手动关闭");
            channel.close();
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("客户端收到pong");
        } else if (frame instanceof TextWebSocketFrame) {
            System.out.println("收到消息:" + ((TextWebSocketFrame) frame).text());
        }
    }

    /**
     * Prints the error, fails the handshake promise if it is still pending and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}

在控制台窗口输入“你好”并回车,输出如下:

完成连接
你好
收到消息:客户端收到单发消息:你好
收到消息:客户端收到群发消息:你好