欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty + spring boot + vue聊天室

程序员文章站 2022-05-17 09:29:22
...

1 架构图

netty + spring boot + vue聊天室

2 代码结构

后端代码地址:https://gitee.com/dcy421/dcy-im

前端代码地址:https://gitee.com/dcy421/hello-im
netty + spring boot + vue聊天室

3 代码说明

common 公共包
router 业务模块
server netty服务模块

注:netty基础需要自己补充,此文不做解释。

4 netty关键代码

netty启动类

绑定端口:配置文件中配置

@Component
@Slf4j
public class BootstrapServer {

    //创建两个线程组
    private EventLoopGroup bossGroup = new NioEventLoopGroup();
    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    @Value("${netty.port}")
    private int port;


    @PostConstruct
    public void run() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerInitializer());
        ChannelFuture channelFuture = bootstrap.bind(port).sync();
        if (channelFuture.isSuccess()) {
            log.info("Start cim server success!!!");
        }
    }

    @PreDestroy
    public void destroy() {
        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("Close cim server success!!!");
    }
}

添加处理器

添加各种处理器,不做解释,代码中有注释

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //因为是基于http协议,使用http的编码和解码器
        pipeline.addLast(new HttpServerCodec())
                //以块的方式来写的处理器
                .addLast(new ChunkedWriteHandler())
                //聚合器,使用websocket会用到
                .addLast(new HttpObjectAggregator(1024 * 64))
                // WebSocket数据压缩
                .addLast(new WebSocketServerCompressionHandler())
                // 验证token
                .addLast(new AuthHandler())
                // Netty支持websocket
                .addLast(new WebSocketServerProtocolHandler("/ws", null, true))
                // 消息处理器
                .addLast(new MsgServerHandler());
    }
}

AuthHandler 鉴权处理器

@Slf4j
public class AuthHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    public static AttributeKey<String> USER_ID = AttributeKey.valueOf("userId");
    public static AttributeKey<List<String>> GROUP_IDS = AttributeKey.valueOf("groupIds");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String uri = request.uri();
        if (uri.contains("/ws")) {
            List<String> strs = StrUtil.split(uri, '?');
            if (strs.size() >= 2) {
                // 获取token 验证token
                String token = JwtUtil.validateToken(strs.get(1));
                if (StrUtil.isNotBlank(token)) {
                    Map map = JSON.parseObject(token, Map.class);
                    String userId = MapUtil.get(map, "id", String.class);
                    List<String> groupIds = MapUtil.get(map, "groupIds", List.class);
                    ctx.channel().attr(USER_ID).set(userId);
                    ctx.channel().attr(GROUP_IDS).set(groupIds);
                    if (SessionSocketHolder.get(userId) == null) {
                        // 绑定用户和 channel的关系
                        SessionSocketHolder.put(userId, ctx.channel());
                        // 这个人有多少个组
                        if (CollUtil.isNotEmpty(groupIds)) {
                            for (String groupId : groupIds) {
                                if (SessionSocketHolder.getGroup(groupId) == null) {
                                    // 创建不同的channelGroup
                                    ChannelGroup channels = new DefaultChannelGroup(ctx.executor());
                                    // 把自己添加进去
                                    channels.add(ctx.channel());
                                    SessionSocketHolder.put(groupId, channels);
                                } else {
                                    // 以及创建过组了,直接添加到组即可
                                    SessionSocketHolder.putChannel(groupId, ctx.channel());
                                }
                            }
                        }
                    }
                    // 传递到下一个handler:升级握手
                    ctx.fireChannelRead(request.setUri("/ws").retain());
                } else {
                    // 验证失败
                    BaseMsg baseMsg1 = new BaseMsg();
                    baseMsg1.setContent(" token验证失败,请重新登录!!! ");
                    baseMsg1.setType(MsgType.OFFLINE);
                    baseMsg1.setEvent(MsgEvent.SERVER);
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg1)));
                    ctx.close();
                }

            }
        } else {
            // 路径不对 关闭
            ctx.close();
        }
    }


}

ctx.channel().attr(USER_ID).set(userId); 此段代码,可以把当前的用户信息带到下一个handler

ws路径示例:
ws://xxx.xxx.xxx.xxx:13000/ws?eyJhbGciOiJIUzUxMiJ9.eyJ1c2VySnNvbiI6IntcImdyb3VwSWRzXCI6W1wiMVwiLFwiMlwiLFwiM1wiLFwiNFwiXSxcImlkXCI6XCIxXCIsXCJwYXNzd29yZFwiOlwiJDJhJDEwJGpZUzRxQUZnNGpvcy9DSkZzbmYxVWVXMUlGcGpxU1U2ZlNnc012RGtXR3VMbWhjSjRvM1BHXCIsXCJwaG9uZVwiOlwiMTM4MTIzNDEyMzRcIixcInVzZXJuYW1lXCI6XCJ6aGFuZ3NhblwifSIsImV4cCI6MTU5NjA5NDc5NX0.Ljy33RDVwZKrbUE86hAsi2jwzoOXwlg3xlA8XZEHRgMtGG8EsYFzgOR4qUSym7NqtHqbfvq62u2h7hCwNXUpVA

后面的token为了安全的

代码注释已经很详细了,不在多做解释了

MsgServerHandler 处理器

@Slf4j
public class MsgServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    public static AttributeKey<String> USER_ID = AttributeKey.valueOf("userId");
    public static AttributeKey<List<String>> GROUP_IDS = AttributeKey.valueOf("groupIds");

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            String userId = ctx.channel().attr(USER_ID).get();
            BaseMsg baseMsg1 = new BaseMsg();
            baseMsg1.setContent(userId + " 上线了 ");
            baseMsg1.setType(MsgType.ONLINE);
            baseMsg1.setEvent(MsgEvent.SERVER);
            channelGroup.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg1)));
            ctx.pipeline().remove(AuthHandler.class);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * 建立连接
     * 每当从服务端收到新的客户端连接时,客户端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客户端 Channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        channelGroup.add(ctx.channel());
        log.info("[服务端] - " + incoming.remoteAddress() + " 加入");
        log.info("[服务端] handlerAdded {} channelGroup 长度", channelGroup.size());
    }

    /**
     * 断开连接
     * 每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        log.info("[服务端] - " + incoming.remoteAddress() + " 离开");
        unBindUserAndChannel(ctx);
        log.info("[服务端] handlerRemoved {} channelGroup 长度", channelGroup.size());
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //文本消息
        String text = msg.text();
        BaseMsg baseMsg = JSON.parseObject(text, BaseMsg.class);
        switch (baseMsg.getEvent()) {
            case P2P:
                handlerSendP2PMsg(ctx, baseMsg);
                break;
            case GROUP:
                handlerSendGroupMsg(baseMsg);
                break;
        }
    }

    /**
     * 解绑 用户和 channel的关系
     *
     * @param ctx
     */
    private void unBindUserAndChannel(ChannelHandlerContext ctx) {
        // 清除离线用户
        SessionSocketHolder.remove(ctx.channel());
        String userId = ctx.channel().attr(USER_ID).get();
        // 处理组中的人员
        List<String> groupIds = ctx.channel().attr(GROUP_IDS).get();
        if (CollUtil.isNotEmpty(groupIds)) {
            for (String groupId : groupIds) {
                // 得到组中有多少人
                ChannelGroup curChannelGroup = SessionSocketHolder.getGroup(groupId);
                if (curChannelGroup.size() == 0) {
                    SessionSocketHolder.remove(groupId);
                }
            }
        }
        BaseMsg baseMsg1 = new BaseMsg();
        baseMsg1.setContent(userId + " 离线了 ");
        baseMsg1.setType(MsgType.OFFLINE);
        baseMsg1.setEvent(MsgEvent.SERVER);
        channelGroup.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg1)));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        log.info("[客户端] {} 异常", incoming.remoteAddress());
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 处理 私聊发送消息
     *
     * @param ctx
     * @param baseMsg
     */
    private void handlerSendP2PMsg(ChannelHandlerContext ctx, BaseMsg baseMsg) {
        BaseMsg baseMsg2 = new BaseMsg();
        baseMsg2.setUserId(baseMsg.getUserId());
        baseMsg2.setContent(baseMsg.getContent());
        baseMsg2.setReceiveUserId(baseMsg.getReceiveUserId());
        baseMsg2.setType(MsgType.TEXT);
        baseMsg2.setEvent(MsgEvent.P2P);
        // 单机版
        /*if (SessionSocketHolder.get(baseMsg.getReceiveUserId()) != null) {
            SessionSocketHolder.get(baseMsg.getReceiveUserId()).writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg2)));
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg2)));
        } else {
            // TODO 离线消息处理
        }*/
        // 集群版
        MsgProducer producer = SpringUtil.getBean(MsgProducer.class);
        producer.sendP2PMsg(JSON.toJSONString(baseMsg2));
    }

    /**
     * 处理发送群组消息
     *
     * @param baseMsg
     */
    private void handlerSendGroupMsg(BaseMsg baseMsg) {
        BaseMsg groupMsg = new BaseMsg();
        groupMsg.setUserId(baseMsg.getUserId());
        groupMsg.setGroupId(baseMsg.getGroupId());
        groupMsg.setType(MsgType.TEXT);
        groupMsg.setEvent(MsgEvent.GROUP);
        groupMsg.setContent(baseMsg.getContent());
        // 单机版
        /*ChannelGroup channelGroup1 = SessionSocketHolder.getGroup(baseMsg.getGroupId());
        if (channelGroup1 != null) {
            channelGroup1.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(groupMsg)));
        }*/
        // 集群版
        MsgProducer producer = SpringUtil.getBean(MsgProducer.class);
        producer.sendGroupMsg(JSON.toJSONString(groupMsg));

        // TODO 离线消息处理  发送消息体缓存本地 + 远程DB

    }
}

此demo 全局消息暂时没优化,后续会慢慢优化。

MsgConsumer 消费消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}", messageModel = MessageModel.BROADCASTING)
public class MsgConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("收到的消息:" + message);
        BaseMsg baseMsg = JSON.parseObject(message, BaseMsg.class);
        switch (baseMsg.getEvent()) {
            case P2P:
                // 接收人的
                Channel receiveChannel = SessionSocketHolder.get(baseMsg.getReceiveUserId());
                if (receiveChannel != null) {
                    receiveChannel.writeAndFlush(new TextWebSocketFrame(message));
                    log.info("接收人的 发送成功");
                }
                // 发送人的
                Channel userChannel = SessionSocketHolder.get(baseMsg.getUserId());
                if (userChannel != null) {
                    userChannel.writeAndFlush(new TextWebSocketFrame(message));
                    log.info("发送人的 发送成功");
                }
                break;
            case GROUP:
                // 收到消息,获取群组的人,然后推送消息
                ChannelGroup channelGroup = SessionSocketHolder.getGroup(baseMsg.getGroupId());
                if (channelGroup != null) {
                    channelGroup.writeAndFlush(new TextWebSocketFrame(message));
                }
                break;
        }

    }
}

messageModel = MessageModel.BROADCASTING 广播模式,默认集群模式
onMessage方法里面的逻辑很清晰,不做解释了。

5 前端代码

netty + spring boot + vue聊天室
前端代码不做解释,自行看代码,vue基础,vue基础,vue基础

演示效果

登录

netty + spring boot + vue聊天室
用户名:zhangsan或者lisi
密码:123456

netty + spring boot + vue聊天室
netty + spring boot + vue聊天室

好友列表

netty + spring boot + vue聊天室

群组列表

netty + spring boot + vue聊天室

发送单聊消息

netty + spring boot + vue聊天室
netty + spring boot + vue聊天室

群聊消息

netty + spring boot + vue聊天室
点击群聊,点击测试群,即可发送消息,其他群暂时没有用户

控制台消息

netty + spring boot + vue聊天室

6 运行

1、首先运行Nacos 注册中心
2、运行RocketMQ
3、运行RouterApplication 服务
4、运行ServerApplication 服务

可以多次启动ServerApplication 已测试集群模式。

7 最后寄语

第一次用netty编写聊天室,有哪里写的不好的,请大家多提意见,有时间我会慢慢优化此项目。离线消息和其他的业务暂时没处理,后续会慢慢完善。谢谢大家看完此帖子。