netty + spring boot + vue聊天室
netty + spring boot + websocket + vue聊天室
1 架构图
2 代码结构
后端代码地址:https://gitee.com/dcy421/dcy-im
前端代码地址:https://gitee.com/dcy421/hello-im
3 代码说明
common 公共包
router 业务模块
server netty服务模块
注:netty基础需要自己补充,此文不做解释。
4 netty关键代码
netty启动类
绑定端口:配置文件中配置
@Component
@Slf4j
public class BootstrapServer {
//创建两个线程组
private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workerGroup = new NioEventLoopGroup();
@Value("${netty.port}")
private int port;
@PostConstruct
public void run() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(port).sync();
if (channelFuture.isSuccess()) {
log.info("Start cim server success!!!");
}
}
@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
log.info("Close cim server success!!!");
}
}
添加处理器
添加各种处理器,不做解释,代码中有注释
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因为是基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec())
//以块的方式来写的处理器
.addLast(new ChunkedWriteHandler())
//聚合器,使用websocket会用到
.addLast(new HttpObjectAggregator(1024 * 64))
// WebSocket数据压缩
.addLast(new WebSocketServerCompressionHandler())
// 验证token
.addLast(new AuthHandler())
// Netty支持websocket
.addLast(new WebSocketServerProtocolHandler("/ws", null, true))
// 消息处理器
.addLast(new MsgServerHandler());
}
}
AuthHandler 鉴权处理器
@Slf4j
public class AuthHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
public static AttributeKey<String> USER_ID = AttributeKey.valueOf("userId");
public static AttributeKey<List<String>> GROUP_IDS = AttributeKey.valueOf("groupIds");
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
String uri = request.uri();
if (uri.contains("/ws")) {
List<String> strs = StrUtil.split(uri, '?');
if (strs.size() >= 2) {
// 获取token 验证token
String token = JwtUtil.validateToken(strs.get(1));
if (StrUtil.isNotBlank(token)) {
Map map = JSON.parseObject(token, Map.class);
String userId = MapUtil.get(map, "id", String.class);
List<String> groupIds = MapUtil.get(map, "groupIds", List.class);
ctx.channel().attr(USER_ID).set(userId);
ctx.channel().attr(GROUP_IDS).set(groupIds);
if (SessionSocketHolder.get(userId) == null) {
// 绑定用户和 channel的关系
SessionSocketHolder.put(userId, ctx.channel());
// 这个人有多少个组
if (CollUtil.isNotEmpty(groupIds)) {
for (String groupId : groupIds) {
if (SessionSocketHolder.getGroup(groupId) == null) {
// 创建不同的channelGroup
ChannelGroup channels = new DefaultChannelGroup(ctx.executor());
// 把自己添加进去
channels.add(ctx.channel());
SessionSocketHolder.put(groupId, channels);
} else {
// 以及创建过组了,直接添加到组即可
SessionSocketHolder.putChannel(groupId, ctx.channel());
}
}
}
}
// 传递到下一个handler:升级握手
ctx.fireChannelRead(request.setUri("/ws").retain());
} else {
// 验证失败
BaseMsg baseMsg1 = new BaseMsg();
baseMsg1.setContent(" token验证失败,请重新登录!!! ");
baseMsg1.setType(MsgType.OFFLINE);
baseMsg1.setEvent(MsgEvent.SERVER);
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg1)));
ctx.close();
}
}
} else {
// 路径不对 关闭
ctx.close();
}
}
}
ctx.channel().attr(USER_ID).set(userId);
此段代码,可以把当前的用户信息带到下一个handler
ws路径示例:
ws://xxx.xxx.xxx.xxx:13000/ws?eyJhbGciOiJIUzUxMiJ9.eyJ1c2VySnNvbiI6IntcImdyb3VwSWRzXCI6W1wiMVwiLFwiMlwiLFwiM1wiLFwiNFwiXSxcImlkXCI6XCIxXCIsXCJwYXNzd29yZFwiOlwiJDJhJDEwJGpZUzRxQUZnNGpvcy9DSkZzbmYxVWVXMUlGcGpxU1U2ZlNnc012RGtXR3VMbWhjSjRvM1BHXCIsXCJwaG9uZVwiOlwiMTM4MTIzNDEyMzRcIixcInVzZXJuYW1lXCI6XCJ6aGFuZ3NhblwifSIsImV4cCI6MTU5NjA5NDc5NX0.Ljy33RDVwZKrbUE86hAsi2jwzoOXwlg3xlA8XZEHRgMtGG8EsYFzgOR4qUSym7NqtHqbfvq62u2h7hCwNXUpVA
后面的token为了安全的
代码注释已经很详细了,不在多做解释了
MsgServerHandler 处理器
@Slf4j
public class MsgServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static AttributeKey<String> USER_ID = AttributeKey.valueOf("userId");
public static AttributeKey<List<String>> GROUP_IDS = AttributeKey.valueOf("groupIds");
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
String userId = ctx.channel().attr(USER_ID).get();
BaseMsg baseMsg1 = new BaseMsg();
baseMsg1.setContent(userId + " 上线了 ");
baseMsg1.setType(MsgType.ONLINE);
baseMsg1.setEvent(MsgEvent.SERVER);
channelGroup.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg1)));
ctx.pipeline().remove(AuthHandler.class);
} else {
super.userEventTriggered(ctx, evt);
}
}
/**
* 建立连接
* 每当从服务端收到新的客户端连接时,客户端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客户端 Channel
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channelGroup.add(ctx.channel());
log.info("[服务端] - " + incoming.remoteAddress() + " 加入");
log.info("[服务端] handlerAdded {} channelGroup 长度", channelGroup.size());
}
/**
* 断开连接
* 每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
log.info("[服务端] - " + incoming.remoteAddress() + " 离开");
unBindUserAndChannel(ctx);
log.info("[服务端] handlerRemoved {} channelGroup 长度", channelGroup.size());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//文本消息
String text = msg.text();
BaseMsg baseMsg = JSON.parseObject(text, BaseMsg.class);
switch (baseMsg.getEvent()) {
case P2P:
handlerSendP2PMsg(ctx, baseMsg);
break;
case GROUP:
handlerSendGroupMsg(baseMsg);
break;
}
}
/**
* 解绑 用户和 channel的关系
*
* @param ctx
*/
private void unBindUserAndChannel(ChannelHandlerContext ctx) {
// 清除离线用户
SessionSocketHolder.remove(ctx.channel());
String userId = ctx.channel().attr(USER_ID).get();
// 处理组中的人员
List<String> groupIds = ctx.channel().attr(GROUP_IDS).get();
if (CollUtil.isNotEmpty(groupIds)) {
for (String groupId : groupIds) {
// 得到组中有多少人
ChannelGroup curChannelGroup = SessionSocketHolder.getGroup(groupId);
if (curChannelGroup.size() == 0) {
SessionSocketHolder.remove(groupId);
}
}
}
BaseMsg baseMsg1 = new BaseMsg();
baseMsg1.setContent(userId + " 离线了 ");
baseMsg1.setType(MsgType.OFFLINE);
baseMsg1.setEvent(MsgEvent.SERVER);
channelGroup.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg1)));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
log.info("[客户端] {} 异常", incoming.remoteAddress());
cause.printStackTrace();
ctx.close();
}
/**
* 处理 私聊发送消息
*
* @param ctx
* @param baseMsg
*/
private void handlerSendP2PMsg(ChannelHandlerContext ctx, BaseMsg baseMsg) {
BaseMsg baseMsg2 = new BaseMsg();
baseMsg2.setUserId(baseMsg.getUserId());
baseMsg2.setContent(baseMsg.getContent());
baseMsg2.setReceiveUserId(baseMsg.getReceiveUserId());
baseMsg2.setType(MsgType.TEXT);
baseMsg2.setEvent(MsgEvent.P2P);
// 单机版
/*if (SessionSocketHolder.get(baseMsg.getReceiveUserId()) != null) {
SessionSocketHolder.get(baseMsg.getReceiveUserId()).writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg2)));
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(baseMsg2)));
} else {
// TODO 离线消息处理
}*/
// 集群版
MsgProducer producer = SpringUtil.getBean(MsgProducer.class);
producer.sendP2PMsg(JSON.toJSONString(baseMsg2));
}
/**
* 处理发送群组消息
*
* @param baseMsg
*/
private void handlerSendGroupMsg(BaseMsg baseMsg) {
BaseMsg groupMsg = new BaseMsg();
groupMsg.setUserId(baseMsg.getUserId());
groupMsg.setGroupId(baseMsg.getGroupId());
groupMsg.setType(MsgType.TEXT);
groupMsg.setEvent(MsgEvent.GROUP);
groupMsg.setContent(baseMsg.getContent());
// 单机版
/*ChannelGroup channelGroup1 = SessionSocketHolder.getGroup(baseMsg.getGroupId());
if (channelGroup1 != null) {
channelGroup1.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(groupMsg)));
}*/
// 集群版
MsgProducer producer = SpringUtil.getBean(MsgProducer.class);
producer.sendGroupMsg(JSON.toJSONString(groupMsg));
// TODO 离线消息处理 发送消息体缓存本地 + 远程DB
}
}
此demo 全局消息暂时没优化,后续会慢慢优化。
MsgConsumer 消费消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}", messageModel = MessageModel.BROADCASTING)
public class MsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("收到的消息:" + message);
BaseMsg baseMsg = JSON.parseObject(message, BaseMsg.class);
switch (baseMsg.getEvent()) {
case P2P:
// 接收人的
Channel receiveChannel = SessionSocketHolder.get(baseMsg.getReceiveUserId());
if (receiveChannel != null) {
receiveChannel.writeAndFlush(new TextWebSocketFrame(message));
log.info("接收人的 发送成功");
}
// 发送人的
Channel userChannel = SessionSocketHolder.get(baseMsg.getUserId());
if (userChannel != null) {
userChannel.writeAndFlush(new TextWebSocketFrame(message));
log.info("发送人的 发送成功");
}
break;
case GROUP:
// 收到消息,获取群组的人,然后推送消息
ChannelGroup channelGroup = SessionSocketHolder.getGroup(baseMsg.getGroupId());
if (channelGroup != null) {
channelGroup.writeAndFlush(new TextWebSocketFrame(message));
}
break;
}
}
}
messageModel = MessageModel.BROADCASTING
广播模式,默认集群模式onMessage
方法里面的逻辑很清晰,不做解释了。
5 前端代码
前端代码不做解释,自行看代码,vue基础,vue基础,vue基础
演示效果
登录
用户名:zhangsan或者lisi
密码:123456
好友列表
群组列表
发送单聊消息
群聊消息
点击群聊,点击测试群,即可发送消息,其他群暂时没有用户
控制台消息
6 运行
1、首先运行Nacos 注册中心
2、运行RocketMQ
3、运行RouterApplication 服务
4、运行ServerApplication 服务
可以多次启动ServerApplication 已测试集群模式。
7 最后寄语
第一次用netty编写聊天室,有哪里写的不好的,请大家多提意见,有时间我会慢慢优化此项目。离线消息和其他的业务暂时没处理,后续会慢慢完善。谢谢大家看完此帖子。
上一篇: java爬虫
下一篇: 中成药去火不宜超一周
推荐阅读
-
Spring Boot + Vue 前后端分离开发之前端网络请求封装与配置
-
Spring boot 和Vue开发中CORS跨域问题解决
-
学会Spring Boot+Vue前后端分离开发(1)之前后端搭建
-
基于netty实现rpc框架-spring boot客户端
-
Vue项目部署在Spring Boot出现页面空白问题的解决方案
-
Spring Boot+Spring Cloud+Vue+Element项目实战 手把手教你开发权限管理系统 徐丽健著 清华大学出版社
-
Spring-Boot快速集成netty-socketio(socket服务实现,支持认证)
-
Spring Boot + Vue前后端分离项目,Maven自动打包整合
-
vue-cli+spring boot前后端分离跨域及session丢失解决办法
-
Spring Boot + Vue前后端分离(三)实现登录功能