Spring Boot 项目中使用 Netty + WebSocket 实现消息推送(带用户登录校验功能)
程序员文章站
2022-07-03 17:06:31
...
maven 引入包:
<!-- websocket -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
netty 启动server类:
package com.minivision.user.manage.websocket;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Starts the Netty WebSocket server once the Spring bean has been constructed
 * and releases its event loop groups when the bean is destroyed.
 *
 * @author yangxiaodong
 * @version 1.0
 * @CreateDate 2019-07-22
 */
@Component
public class NettyServer {
    /**
     * Class logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);

    /**
     * Sub-protocol name passed to the WebSocket protocol handler.
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * Port the server listens on (defaults to 58080).
     */
    @Value("${websocket.netty.port:58080}")
    private int port;

    /**
     * Path that WebSocket handshakes are accepted on (defaults to /websocket).
     */
    @Value("${websocket.netty.path:/websocket}")
    private String websocketPath;

    /**
     * Business handler for WebSocket frames.
     */
    @Autowired
    private WebSocketHandler webSocketHandler;

    /**
     * Handler that extracts the login token from the handshake request.
     */
    @Autowired
    private PermissionWebSocketHandler permissionWebSocketHandler;

    /**
     * Event loop group passed as the second argument of
     * {@code ServerBootstrap.group(parent, child)}, i.e. the one serving the
     * accepted client channels (the field name is historical).
     */
    private EventLoopGroup bossGroup;

    /**
     * Event loop group passed as the first argument of
     * {@code ServerBootstrap.group(parent, child)}, i.e. the one accepting
     * incoming connections.
     */
    private EventLoopGroup group;

    /**
     * Builds the channel pipeline, binds the configured port and blocks until
     * the bind has completed.
     *
     * <p>If the bind fails or is interrupted, both event loop groups are shut
     * down before the failure propagates, because {@code @PreDestroy} is not
     * invoked for a bean whose initialization failed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    @PostConstruct
    public void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        group = new NioEventLoopGroup();
        ServerBootstrap sb = new ServerBootstrap();
        // Maximum length of the queue of pending (not yet accepted) connections.
        sb.option(ChannelOption.SO_BACKLOG, 1024);
        sb.group(group, bossGroup) // group accepts connections, bossGroup serves the accepted channels
                .channel(NioServerSocketChannel.class) // NIO based server socket
                .localAddress(this.port) // listening port
                .childHandler(new ChannelInitializer<SocketChannel>() { // invoked for every accepted connection
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // The WebSocket handshake is an HTTP request, so an HTTP codec is required.
                        ch.pipeline().addLast(new HttpServerCodec());
                        // NOTE(review): Java object serialization encoder; it looks unnecessary for
                        // WebSocket frames - confirm before removing.
                        ch.pipeline().addLast(new ObjectEncoder());
                        // Support for writing large payloads in chunks.
                        ch.pipeline().addLast(new ChunkedWriteHandler());
                        // Aggregates HTTP message parts into a single full message (max 8192 bytes).
                        ch.pipeline().addLast(new HttpObjectAggregator(8192));
                        // Must sit before the protocol handler so it sees the handshake request first.
                        ch.pipeline().addLast(permissionWebSocketHandler);
                        // path, sub-protocol, allow extensions, max frame payload length
                        ch.pipeline().addLast(new WebSocketServerProtocolHandler(websocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                        // NOTE(review): no IdleStateHandler is installed here, so the IdleStateEvent
                        // branch in WebSocketHandler#userEventTriggered is presumably never hit - confirm.
                        ch.pipeline().addLast(webSocketHandler);
                    }
                });
        boolean started = false;
        try {
            // Bind asynchronously and wait for the result.
            sb.bind().sync();
            started = true;
        } finally {
            if (!started) {
                // Do not wait here so the original failure is the one that propagates.
                bossGroup.shutdownGracefully();
                group.shutdownGracefully();
            }
        }
        LOGGER.info("websocket server start");
    }

    /**
     * Shuts both event loop groups down and waits for them to terminate.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the shutdown
     */
    @PreDestroy
    private void destory() throws InterruptedException {
        if (null != bossGroup) {
            bossGroup.shutdownGracefully().sync();
        }
        if (null != group) {
            group.shutdownGracefully().sync();
        }
    }
}
用户登录校验处理器:
通过 ws 地址后面携带的参数(token)进行用户登录校验。
思路:获取到 ws 参数后,需要处理连接地址,把携带的参数去掉;否则请求路径与配置的 websocket 路径不一致,握手无法完成,channel 会自动断开。
package com.minivision.user.manage.websocket;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.minivision.user.manage.util.UrlUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
/**
 * Inbound handler that reads the login token from the query string of the
 * WebSocket handshake request and stores it on the channel for later
 * verification.
 *
 * @author yangxiaodong
 * @version 1.0
 * @CreateDate 2019-08-01
 */
@Component
@Sharable
public class PermissionWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * Class logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(PermissionWebSocketHandler.class);

    /**
     * Configured WebSocket path; the request URI is rewritten to this value.
     */
    @Value("${websocket.netty.path:/websocket}")
    private String websocketPath;

    /**
     * Intercepts the HTTP handshake request, remembers the {@code token} URL
     * parameter in the channel attribute {@code "perssion"} and rewrites the
     * request URI to the bare WebSocket path before passing the message on.
     *
     * <p>Every message is forwarded exactly once. Because this override does
     * not delegate to {@code super.channelRead}, the message is not released
     * here, so no additional {@code retain()} is needed.
     *
     * @param ctx handler context
     * @param msg inbound message
     * @throws Exception on failure
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            String uri = request.uri();
            if (StringUtils.isNotEmpty(uri)) {
                Map<String, String> params = UrlUtil.parseUrl(uri);
                // A missing token is stored as null and rejected after the handshake.
                String token = null == params ? null : params.get("token");
                AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
                if (null == ctx.channel().attr(key).get()) {
                    WebSocketPerssionVerify webSocketPerssionVerify = new WebSocketPerssionVerify();
                    webSocketPerssionVerify.setToken(token);
                    ctx.channel().attr(key).setIfAbsent(webSocketPerssionVerify);
                }
                // Strip the query string so the URI equals the configured WebSocket path.
                request.setUri(websocketPath);
            }
        }
        ctx.fireChannelRead(msg);
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx handler context
     * @param cause the failure
     * @throws Exception on failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("websocket exception", cause);
        ctx.close();
    }

    /**
     * Passes text frames on unchanged. Not reached through
     * {@link #channelRead(ChannelHandlerContext, Object)}, which is overridden
     * above without delegating to the superclass.
     *
     * @param ctx handler context
     * @param msg text frame
     * @throws Exception on failure
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // Retained because the superclass would release the frame after this method returns.
        ctx.fireChannelRead(msg.retain());
    }
}
websocket 处理器:
负责处理消息的收发;ws 握手成功后,判断用户是否已登录,未登录则断开 channel。
package com.minivision.user.manage.websocket;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.minivision.user.client.TokenVerifyReq;
import com.minivision.user.client.TokenVerifyResp;
import com.minivision.user.client.TokenVerifyResp.ReplyStatus;
import com.minivision.user.client.api.TokenVerifyServiceFacade;
import com.minivision.user.manage.api.dto.MarkInMailMsgReadReqDTO;
import com.minivision.user.manage.inmail.model.InMailMsgResp;
import com.minivision.user.manage.inmail.model.MarkInMailMsgRead;
import com.minivision.user.manage.inmail.model.QueryUnreadInMailMsg;
import com.minivision.user.manage.inmail.model.SendInMailMsg;
/**
 * WebSocket business handler: tracks connected channels, verifies the login
 * token once the handshake has completed and dispatches incoming text frames
 * to the in-mail message service.
 *
 * <p>NOTE(review): the token verification call and the Redis operations are
 * invoked directly from handler callbacks; confirm that blocking here is
 * acceptable for the expected load.
 *
 * @author yangxiaodong
 * @version 1.0
 * @CreateDate 2019-07-22
 */
@Component
@Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * Class logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketHandler.class);

    /**
     * Redis access used to map a user id to the ids of its channels.
     */
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Service that sends, marks and queries in-mail messages.
     */
    @Autowired
    private SendInMailMsgService sendInMailMsgService;

    /**
     * Remote facade used to verify login tokens.
     */
    @Reference
    private TokenVerifyServiceFacade tokenVerifyServiceFacade;

    /**
     * Registers a newly active channel in the global channel group.
     *
     * @param ctx handler context
     * @throws Exception on failure
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("connect to client");
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * Removes a disconnected channel from the channel group and from Redis.
     *
     * @param ctx handler context
     * @throws Exception on failure
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("disconnect to client");
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeChannelId(ctx);
    }

    /**
     * Removes this channel's id from the Redis set of the user bound to the
     * channel; does nothing when no user id has been bound yet.
     *
     * @param ctx handler context
     */
    private void removeChannelId(ChannelHandlerContext ctx) {
        String userId = getUserId(ctx);
        if (StringUtils.isNotEmpty(userId)) {
            redisTemplate.opsForSet().remove(WebSocketConstant.BOSS_MSG_CHANNELID + userId, ctx.channel().id());
        }
    }

    /**
     * Logs the failure, drops the channel's Redis mapping and closes the channel.
     *
     * @param ctx handler context
     * @param cause the failure
     * @throws Exception on failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("websocket exception", cause);
        removeChannelId(ctx);
        ctx.close();
    }

    /**
     * Reacts to pipeline events: verifies the user once the WebSocket
     * handshake has completed and closes channels reported as fully idle.
     *
     * @param ctx handler context
     * @param evt the event
     * @throws Exception on failure
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            onHandshakeComplete(ctx);
        } else if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                Channel channel = ctx.channel();
                removeChannelId(ctx);
                // Close idle channels so they do not hold resources.
                channel.close();
            }
        }
    }

    /**
     * Verifies the token stored on the channel. On success the channel id is
     * registered under the user in Redis (the key expires after one day) and
     * the user's unread messages are pushed; otherwise the connection is
     * closed with a "not logged in" close frame.
     *
     * @param ctx handler context
     */
    private void onHandshakeComplete(ChannelHandlerContext ctx) {
        AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
        WebSocketPerssionVerify webSocketPerssionVerify = ctx.channel().attr(key).get();
        if (null == webSocketPerssionVerify || StringUtils.isEmpty(webSocketPerssionVerify.getToken())) {
            sendUserNotLoginMsg(ctx);
            return;
        }
        TokenVerifyReq tokenVerifyReq = new TokenVerifyReq();
        tokenVerifyReq.setToken(webSocketPerssionVerify.getToken());
        TokenVerifyResp resp = tokenVerifyServiceFacade.verifyToken(tokenVerifyReq);
        // A missing response or status is treated as "not logged in" instead of failing with an NPE.
        if (null == resp || !ReplyStatus.OK.equals(resp.getStatus())) {
            sendUserNotLoginMsg(ctx);
            return;
        }
        String userId = resp.getUserId();
        redisTemplate.opsForSet().add(WebSocketConstant.BOSS_MSG_CHANNELID + userId, ctx.channel().id());
        redisTemplate.expire(WebSocketConstant.BOSS_MSG_CHANNELID + userId, 1, TimeUnit.DAYS);
        // The instance is the one already stored in the channel attribute, so updating it is enough.
        webSocketPerssionVerify.setUserId(userId);
        // Push the unread messages.
        QueryUnreadInMailMsg queryUnreadInMailMsg = new QueryUnreadInMailMsg();
        queryUnreadInMailMsg.setUserId(userId);
        queryUnreadInMailMsg(queryUnreadInMailMsg);
    }

    /**
     * Sends a "user not logged in" close frame and closes the channel once the
     * frame has been written.
     *
     * @param ctx handler context
     */
    public void sendUserNotLoginMsg(ChannelHandlerContext ctx) {
        removeChannelId(ctx);
        ChannelFuture future = ctx.writeAndFlush(new CloseWebSocketFrame(InMailMsgResp.USER_NOT_LOGIN, InMailMsgResp.USER_NOT_LOGIN_MSG));
        future.addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Returns the user id bound to the channel.
     *
     * @param ctx handler context
     * @return the user id, or {@code null} when the channel has no verification data
     */
    private String getUserId(ChannelHandlerContext ctx) {
        AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
        WebSocketPerssionVerify webSocketPerssionVerify = ctx.channel().attr(key).get();
        if (null != webSocketPerssionVerify) {
            return webSocketPerssionVerify.getUserId();
        }
        return null;
    }

    /**
     * Parses a text frame as JSON and dispatches it by its {@code actionType}
     * field (send message / mark as read). Other action types are ignored.
     *
     * <p>NOTE(review): the user and sender ids are taken from the client
     * payload and are not compared with the user verified for this channel -
     * confirm whether that is intended.
     *
     * @param ctx handler context
     * @param msg text frame received from the client
     * @throws Exception on failure
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String text = msg.text();
        LOGGER.info("server receiver client msg:{}", text);
        if (StringUtils.isEmpty(text)) {
            return;
        }
        JSONObject parseObject = JSONObject.parseObject(text);
        if (null == parseObject) {
            return;
        }
        String actionType = parseObject.getString("actionType");
        if (WebSocketConstant.INMAIL_ACTION_TYPE_SEND.equals(actionType)) {
            // Send a message.
            SendInMailMsg sendInMailMsg = JSONObject.parseObject(text, SendInMailMsg.class);
            sendMsg(sendInMailMsg);
        } else if (WebSocketConstant.INMAIL_ACTION_TYPE_MARK.equals(actionType)) {
            // Mark messages as read.
            MarkInMailMsgRead markInMailMsgRead = JSONObject.parseObject(text, MarkInMailMsgRead.class);
            markMsg(markInMailMsgRead);
        }
    }

    /**
     * Delegates sending to the message service.
     *
     * @param sendInMailMsg message to send
     */
    private void sendMsg(SendInMailMsg sendInMailMsg) {
        sendInMailMsgService.sendMsg(sendInMailMsg);
    }

    /**
     * Delegates marking messages as read to the message service.
     *
     * @param markInMailMsgRead messages to mark
     */
    private void markMsg(MarkInMailMsgRead markInMailMsgRead) {
        sendInMailMsgService.markMsg(markInMailMsgRead);
    }

    /**
     * Delegates the unread-message query (and push) to the message service.
     *
     * @param queryUnreadInMailMsg query parameters
     */
    private void queryUnreadInMailMsg(QueryUnreadInMailMsg queryUnreadInMailMsg) {
        sendInMailMsgService.queryUnreadInMailMsg(queryUnreadInMailMsg);
    }
}
websocket权限校验实体类:
package com.minivision.user.manage.websocket;
/**
 * Per-channel holder for the data used to verify a WebSocket user: the login
 * token sent with the handshake and the user id resolved from it.
 *
 * @author yangxiaodong
 * @version 1.0
 * @CreateDate 2019-08-01
 */
public class WebSocketPerssionVerify {
    /** Login token supplied by the client. */
    private String token;

    /** Id of the user the token belongs to. */
    private String userId;

    /**
     * Stores the login token.
     *
     * @param token the token to set
     */
    public void setToken(String token) {
        this.token = token;
    }

    /**
     * Returns the login token.
     *
     * @return the token, or {@code null} if none has been set
     */
    public String getToken() {
        return this.token;
    }

    /**
     * Stores the user id.
     *
     * @param userId the user id to set
     */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /**
     * Returns the user id.
     *
     * @return the user id, or {@code null} if none has been set
     */
    public String getUserId() {
        return this.userId;
    }
}
发送消息逻辑代码:
package com.minivision.user.manage.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.minivision.boss.common.util.ModelMapperUtil;
import com.minivision.constants.DigitConst;
import com.minivision.user.manage.api.InMailMessageServiceFacade;
import com.minivision.user.manage.api.dto.AddInMailMsgReqDTO;
import com.minivision.user.manage.api.dto.AddInMailMsgReqDTO.AddInMailMsg;
import com.minivision.user.manage.api.dto.MarkInMailMsgReadReqDTO;
import com.minivision.user.manage.api.dto.QueryUnReadInMailMsgReqDTO;
import com.minivision.user.manage.api.dto.QueryUnReadInMailMsgRespDTO;
import com.minivision.user.manage.api.dto.QueryUnReadInMailMsgRespDTO.InMailMsgInfo;
import com.minivision.user.manage.inmail.model.InMailMsgBody;
import com.minivision.user.manage.inmail.model.MarkInMailMsgRead;
import com.minivision.user.manage.inmail.model.QueryUnreadInMailMsg;
import com.minivision.user.manage.inmail.model.SendInMailMsg;
import com.minivision.user.manage.inmail.model.InMailMsgData;
/**
 * Service that pushes in-mail messages to connected WebSocket channels and
 * records / queries / marks them through the in-mail message facade.
 *
 * @author yangxiaodong
 * @version 1.0
 * @CreateDate 2019-07-27
 */
@Service
public class SendInMailMsgService {
    /**
     * Redis access used to look up the channel ids registered for a user.
     */
    @SuppressWarnings("rawtypes")
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Remote facade that stores, queries and marks in-mail messages.
     */
    @Reference
    private InMailMessageServiceFacade inMailMessageServiceFacade;

    /**
     * Prepares a message before sending by prefixing every message id with
     * {@link WebSocketConstant#INMAIL_MSG_PREFIX}.
     *
     * @param sendMessage message to prepare
     */
    private void beforeSend(SendInMailMsg sendMessage) {
        List<InMailMsgBody> contentList = sendMessage.getContentList();
        if (CollectionUtils.isNotEmpty(contentList)) {
            contentList.forEach(item -> item.setMsgId(WebSocketConstant.INMAIL_MSG_PREFIX + item.getMsgId()));
        }
    }

    /**
     * Stores one record per message body through the facade; does nothing
     * when the message has no bodies.
     *
     * @param sendMessage message whose bodies are recorded
     * @param receivers receivers written to every record; passed explicitly so
     *        the shared {@code sendMessage} does not have to be modified
     */
    private void saveMsgRecord(SendInMailMsg sendMessage, List<String> receivers) {
        List<InMailMsgBody> contentList = sendMessage.getContentList();
        if (CollectionUtils.isEmpty(contentList)) {
            return;
        }
        List<AddInMailMsg> msgList = new ArrayList<>();
        for (InMailMsgBody item : contentList) {
            AddInMailMsg addInMailMsg = ModelMapperUtil.strictMap(item, AddInMailMsg.class);
            addInMailMsg.setMsgType(sendMessage.getSendType());
            addInMailMsg.setSenderId(sendMessage.getSenderId());
            addInMailMsg.setSendTime(sendMessage.getSendTime());
            addInMailMsg.setReceivers(receivers);
            msgList.add(addInMailMsg);
        }
        AddInMailMsgReqDTO req = new AddInMailMsgReqDTO();
        req.setMsgList(msgList);
        inMailMessageServiceFacade.addInMailMsg(req);
    }

    /**
     * Sends a message according to its send type: one-to-one and one-to-many
     * messages go to the listed receivers, group messages go to every
     * connected channel. Other send types and {@code null} are ignored.
     *
     * @param sendInMailMsg message to send
     */
    public void sendMsg(SendInMailMsg sendInMailMsg) {
        if (null == sendInMailMsg) {
            return;
        }
        beforeSend(sendInMailMsg);
        String sendType = sendInMailMsg.getSendType();
        if (WebSocketConstant.SEND_TYPE_ONE_TO_ONE.equals(sendType)
                || WebSocketConstant.SEND_TYPE_ONE_TO_MANY.equals(sendType)) {
            send(sendInMailMsg);
        } else if (WebSocketConstant.SEND_TYPE_GROUP.equals(sendType)) {
            sendToAll(sendInMailMsg);
        }
    }

    /**
     * Queries the unread messages of a user and, when the query succeeds,
     * pushes them to the user's channels.
     *
     * @param queryUnreadInMailMsg query parameters (user id)
     */
    public void queryUnreadInMailMsg(QueryUnreadInMailMsg queryUnreadInMailMsg) {
        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
        if (null == channelGroup) {
            return;
        }
        QueryUnReadInMailMsgReqDTO req = new QueryUnReadInMailMsgReqDTO();
        req.setUserId(queryUnreadInMailMsg.getUserId());
        QueryUnReadInMailMsgRespDTO resp = inMailMessageServiceFacade.queryUnReadInMailMsg(req);
        if (null != resp && resp.getSuccess()) {
            sendUnreadInMailMsg(channelGroup, resp, queryUnreadInMailMsg);
        }
    }

    /**
     * Writes the unread messages as one JSON text frame to every channel that
     * is registered in Redis for the user and still present in the group.
     *
     * @param channelGroup group holding the connected channels
     * @param resp unread-message query result
     * @param queryUnreadInMailMsg query parameters (user id)
     */
    @SuppressWarnings("unchecked")
    private void sendUnreadInMailMsg(ChannelGroup channelGroup, QueryUnReadInMailMsgRespDTO resp, QueryUnreadInMailMsg queryUnreadInMailMsg) {
        Set<ChannelId> members = redisTemplate.opsForSet().members(getChannelId(queryUnreadInMailMsg.getUserId()));
        if (CollectionUtils.isEmpty(members)) {
            return;
        }
        InMailMsgData unReadInMailMsg = new InMailMsgData();
        unReadInMailMsg.setCount(resp.getCount());
        unReadInMailMsg.setMsgList(resp.getMsgList());
        String jsonString = JSONObject.toJSONString(unReadInMailMsg);
        for (ChannelId channelId : members) {
            Channel channel = channelGroup.find(channelId);
            if (null != channel) {
                channel.writeAndFlush(new TextWebSocketFrame(jsonString));
            }
        }
    }

    /**
     * Marks messages as read through the facade.
     *
     * @param markInMailMsgRead message ids and user id
     */
    public void markMsg(MarkInMailMsgRead markInMailMsgRead) {
        MarkInMailMsgReadReqDTO req = new MarkInMailMsgReadReqDTO();
        req.setMsgIds(markInMailMsgRead.getMsgIds());
        req.setUserId(markInMailMsgRead.getUserId());
        inMailMessageServiceFacade.markInMailMsgRead(req);
    }

    /**
     * Sends a message to every channel registered in Redis for each receiver.
     *
     * @param sendMessage message to send
     */
    @SuppressWarnings("unchecked")
    private void send(SendInMailMsg sendMessage) {
        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
        if (null == channelGroup) {
            return;
        }
        List<String> receivers = sendMessage.getReceivers();
        if (CollectionUtils.isEmpty(receivers)) {
            return;
        }
        for (String receive : receivers) {
            Set<ChannelId> members = redisTemplate.opsForSet().members(getChannelId(receive));
            if (CollectionUtils.isEmpty(members)) {
                continue;
            }
            for (ChannelId channelId : members) {
                if (null != channelId) {
                    sendToUser(channelId, channelGroup, sendMessage);
                }
            }
        }
    }

    /**
     * Writes the message to one channel and stores the send record once, after
     * the write has succeeded.
     *
     * <p>The record's receiver is the user bound to the channel. Channels
     * without verification data are skipped. The receiver list is captured
     * before the write so the listener does not depend on later changes to
     * {@code sendMessage}.
     *
     * <p>NOTE(review): a user with several open channels still gets one
     * record per channel - confirm whether that is intended.
     *
     * @param channelId id of the target channel
     * @param channelGroup group holding the connected channels
     * @param sendMessage message to send
     */
    private void sendToUser(ChannelId channelId, ChannelGroup channelGroup, SendInMailMsg sendMessage) {
        if (null == channelId) {
            return;
        }
        Channel channel = channelGroup.find(channelId);
        if (null == channel) {
            return;
        }
        // Resolve the user bound to the channel.
        AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
        WebSocketPerssionVerify webSocketPerssionVerify = channel.attr(key).get();
        if (null == webSocketPerssionVerify) {
            return;
        }
        List<String> recordReceivers = Arrays.asList(webSocketPerssionVerify.getUserId());
        InMailMsgData inMailMsgData = new InMailMsgData();
        inMailMsgData.setCount(DigitConst.ONE);
        List<InMailMsgInfo> inMailMsgInfoList = ModelMapperUtil.strictMapList(sendMessage.getContentList(), InMailMsgInfo.class);
        inMailMsgData.setMsgList(inMailMsgInfoList);
        ChannelFuture channelFuture = channel
                .writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(inMailMsgData)));
        // Store the send record only after the message has been written successfully.
        channelFuture.addListener(future -> {
            if (future.isSuccess()) {
                saveMsgRecord(sendMessage, recordReceivers);
            }
        });
    }

    /**
     * Broadcasts the message's content string to every connected channel and
     * stores the send record.
     *
     * @param sendMessage message to broadcast
     */
    private void sendToAll(SendInMailMsg sendMessage) {
        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
        if (null != channelGroup) {
            channelGroup.writeAndFlush(new TextWebSocketFrame(sendMessage.getContentString()));
            saveMsgRecord(sendMessage, sendMessage.getReceivers());
        }
    }

    /**
     * Builds the Redis key under which a user's channel ids are stored.
     *
     * @param userId user id
     * @return {@link WebSocketConstant#BOSS_MSG_CHANNELID} followed by the user id
     */
    private String getChannelId(String userId) {
        return WebSocketConstant.BOSS_MSG_CHANNELID + userId;
    }
}
全局的 ChannelGroup,用来存放 channel,并通过它发送消息:
package com.minivision.user.manage.websocket;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Static holder for the application-wide Netty channel group.
 *
 * @author yangxiaodong
 * @version 1.0
 * @CreateDate 2019-07-22
 */
public final class NettyConfig {
    /**
     * Channel group shared by all handlers; created once and never replaced.
     */
    private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Not instantiable.
     */
    private NettyConfig() {
    }

    /**
     * Returns the shared channel group.
     *
     * @return the channel group, never {@code null}
     */
    public static ChannelGroup getChannelGroup() {
        return CHANNEL_GROUP;
    }
}
常量类:
package com.minivision.user.manage.websocket;
/**
 * Constants shared by the WebSocket messaging classes.
 *
 * @author yangxiaodong
 * @version 1.0
 * @CreateDate 2019-07-27
 */
public class WebSocketConstant {

    // ---- send types ----

    /** Send type 1: one-to-one. */
    public static final String SEND_TYPE_ONE_TO_ONE = "1";

    /** Send type 2: one-to-many. */
    public static final String SEND_TYPE_ONE_TO_MANY = "2";

    /** Send type 3: broadcast to everyone. */
    public static final String SEND_TYPE_GROUP = "3";

    // ---- key and id prefixes ----

    /** Redis prefix for WebSocket user ids. */
    public static final String BOSS_SOCKET_USER_ID_PREFIX = "websocket:userid:";

    /** Redis key prefix for message channel ids. */
    public static final String BOSS_MSG_CHANNELID = "msg:channel:";

    /** Prefix prepended to in-mail message ids. */
    public static final String INMAIL_MSG_PREFIX = "msg_";

    // ---- exec methods ----

    /** Exec method 1: register. */
    public static final String EXEC_METHOD_LOGIN = "1";

    /** Exec method 2: send message. */
    public static final String EXEC_METHOD_SEND_MSG = "2";

    /** Exec method 3: log out. */
    public static final String EXEC_METHOD_LOGOUT = "3";

    // ---- in-mail action types ----

    /** Action type 1: log in. */
    public static final String INMAIL_ACTION_TYPE_LOGIN = "1";

    /** Action type 2: send message. */
    public static final String INMAIL_ACTION_TYPE_SEND = "2";

    /** Action type 3: mark messages as read. */
    public static final String INMAIL_ACTION_TYPE_MARK = "3";

    /** Action type 4: query unread in-mail messages. */
    public static final String INMAIL_ACTION_TYPE_QUERY_UNREAD_MSG = "4";

    /**
     * Not instantiable.
     */
    private WebSocketConstant() {
    }
}
上一篇: Web Worker理解