WebSocket和Socket实现聊天群发
介绍:
前面写过一篇简单的websocke实现服务端。这一篇就不在说什么基础的东西主要是来用实例说话,主要是讲一下实现单聊和群组聊天和所有群发的思路设计。
直接不懂的可以看一下上一篇简单版本再来看也行:
实现效果:
本示例主要实现了个什么东西哪,我们都使用qq或者其他的聊天工具,所有下面我说的大家也都懂。就不啰嗦废话了。
首先说实现6个主要的功能:
- 单聊:可以指定人进行聊天。
- 群发:这个的意思就是当前服务器内的所有人包含自己,这个就跟一个推送效果一样。
- 开启连接(客户端):通知除自己以外的所有用户
- 关闭连接(客户端):通知除自己以外的所有用户
- 群组A:实现一个群组名字为A
- 群组B:实现一个群组名字为B
好了基本就是这个大致功能。下面看下最终效果吧:
以上是第一个图先进入了A群组,后面两个在B群组。然后A有进入了B群组,所有第一张图可以收到所有聊天,但是后面两张只能收到B群组的聊天。
开始撸代码(socket版)
因为是在上面说道的文章改造的,所有基本的三连击(开启服务,开启监听,接受事件)我就不介绍了。
思路分析
我们既然实现的是聊天,那么跟谁聊天当然是其他人,所以我们应该有其他人,可是问题又来了我们登录了如何确认记录状态哪,我登录之后我可以跟服务器通讯,怎么找到其他人进行通讯哪?我就是想到的是使用字典Dictionary来进行存储,为什么用字典而不用list是因为,字典中是键值储存,我们把键当作人,然后值存储这个人的通讯连接,这样我只要知道这个人就在里面找到这个人,然后就取到这个人的连接就可以通讯了。
//建立登录用户记录信息 public static Dictionary<string, Socket> ListUser = new Dictionary<string, Socket>();
注:写完这个之后我们老大看了下我的代码说你这个存在一个问题:线程安全,确实的Dictionary不是线程安全,当时写的时候没多想,他说完我就想起来了,以前用Paralle时候用到的线程安全类ConcurrentBag和ConcurrentDictionary,在这了当然可以改成:
//建立登录用户记录信息 public static ConcurrentDictionary<string, Socket> ListUser = new ConcurrentDictionary<string, Socket>();
好了我们可以进行通讯了,可以找到指定的人进行通讯了,那当然所有人的通讯也可以解决了。所有我就直接说下开启连接和关闭连接的通知。我在消息接受和消息发送的时候定义了自己的规则:
开启连接:我在发送的时候最前面带:login字符串告诉消息接受我现在是登录,你告诉别人吧。
关闭连接:退出的时候没有发送字符串所以为空
ws.send("login,我已经连接上了!!!");
ws.close();
alert("关闭了通讯")
然后我在消息处理增加了判断处理:
if (string.IsNullOrEmpty(resultList[0])) { //退出 SignOut(myClientSocket.RemoteEndPoint.ToString()); ListUser.Remove(myClientSocket.RemoteEndPoint.ToString()); myClientSocket.Shutdown(SocketShutdown.Both); myClientSocket.Close(); Debug.WriteLine("当前退出用户:" + myClientSocket.RemoteEndPoint.ToString()); } else if (resultList[0] == "login") { //登录 Login(myClientSocket.RemoteEndPoint.ToString()); ListUser.Add(myClientSocket.RemoteEndPoint.ToString(), myClientSocket); Debug.WriteLine("当前登录用户:" + myClientSocket.RemoteEndPoint.ToString()); }
大致其他的思路也是这个样子:单聊,群发,群组都是定义相应的规则来进行判断然后进行单独的业务。
全部判断逻辑代码
这里是写在了服务端的消息接受ReceiveMessage方法内,这个方法是一个统一的发送接受方法。想看原方法的请看上一篇:
我这里只是写了我要做的效果,当然可以自己随便修改的。
var resultStr = AnalyzeClientData(result, receiveNumber); string[] resultList = resultStr.Split(','); //string sendMsg = $"你({myClientSocket.RemoteEndPoint.ToString()}):" + resultList[1] + "【服务端回复】"; //myClientSocket.Send(SendMsg(sendMsg));//取消对自己提示发送给别人 if (string.IsNullOrEmpty(resultList[0])) { //退出 SignOut(myClientSocket.RemoteEndPoint.ToString()); ListUser.Remove(myClientSocket.RemoteEndPoint.ToString()); myClientSocket.Shutdown(SocketShutdown.Both); myClientSocket.Close(); Debug.WriteLine("当前退出用户:" + myClientSocket.RemoteEndPoint.ToString()); } else if (resultList[0] == "login") { //登录 Login(myClientSocket.RemoteEndPoint.ToString()); ListUser.Add(myClientSocket.RemoteEndPoint.ToString(), myClientSocket); Debug.WriteLine("当前登录用户:" + myClientSocket.RemoteEndPoint.ToString()); } else if (resultList[0] == "all") { //群发所有用户 GroupChat(myClientSocket.RemoteEndPoint.ToString(), resultList[1]); } else if (resultList[0] == "groupA") { //群组发送 GroupChatA("groupA", myClientSocket.RemoteEndPoint.ToString(), resultList[1]); } else if (resultList[0] == "groupB") { //群组发送 GroupChatA("groupB", myClientSocket.RemoteEndPoint.ToString(), resultList[1]); } else { //单聊 SingleChat(myClientSocket.RemoteEndPoint.ToString(), resultList[0], resultList[1]); }
逻辑判断完成就进入相应的业务方法了,下面我把每一个业务方法放上来。
开启连接
#region 登录提示别人 public void Login(string userId) { if (ListUser.Count() > 0) { foreach (var item in ListUser) { if (item.Key != userId) { Socket socket = item.Value; try { socket.Send(SendMsg($"用户({userId})登录了")); } catch (Exception e) { Debug.WriteLine("该用户已掉线:" + item.Key); //用户已掉线就删除掉 ListUser.Remove(item.Key); } } } } } #endregion
关闭连接
#region 退出提示别人 public void SignOut(string userId) { if (ListUser.Count() > 0) { foreach (var item in ListUser) { if (item.Key != userId) { Socket socket = item.Value; try { socket.Send(SendMsg($"用户({userId})退出了")); } catch (Exception e) { Debug.WriteLine("该用户已掉线:" + item.Key); //用户已掉线就删除掉 ListUser.Remove(item.Key); } } } } } #endregion
单聊
#region 单聊 public void SingleChat(string userIdA, string userIdB, string msg) { Socket socket = ListUser[userIdB]; if (socket != null) { try { socket.Send(SendMsg($"用户({userIdA}=>{userIdB}):{msg}")); } catch (Exception e) { Debug.WriteLine("该用户已掉线:" + userIdB); //用户已掉线就删除掉 ListUser.Remove(userIdB); } } } #endregion
群发所有人
#region 群发 public void GroupChat(string userId, string msg) { if (ListUser.Count() > 0) { foreach (var item in ListUser) { if (item.Key != userId) { Socket socket = item.Value; try { socket.Send(SendMsg($"用户({userId}=>{item.Key}):{msg}")); } catch (Exception e) { Debug.WriteLine("该用户已掉线:" + item.Key); //用户已掉线就删除掉 ListUser.Remove(item.Key); } } } } } #endregion
群组实现
#region 实现群组 //群组记录分类 List<GroupHelp> groupList = new List<GroupHelp>(); public void GroupChatA(string groupName, string userId, string msg) { if (string.IsNullOrEmpty(groupName)) { return; } //判断自己是否在群组 GroupHelp isEisx = groupList.Where(b => b.userId == userId && b.Name == groupName).FirstOrDefault(); if (isEisx == null) { groupList.Add(new GroupHelp() { Name = groupName, userId = userId }); } //根据群组名称判断是否存在群组 var nowGroupList = groupList.Where(b => b.Name == groupName).ToList(); foreach (var itemG in nowGroupList) { Socket socket = ListUser[itemG.userId]; try { socket.Send(SendMsg($"用户({userId}=>{itemG.userId}):{msg}")); } catch (Exception e) { Debug.WriteLine("该用户已掉线:" + itemG.userId); //用户已掉线就删除掉 ListUser.Remove(itemG.userId); } } } #endregion
数据处理方法
#region 打包请求连接数据 /// <summary> /// 打包请求连接数据 /// </summary> /// <param name="handShakeBytes"></param> /// <param name="length"></param> /// <returns></returns> private byte[] PackageHandShakeData(byte[] handShakeBytes, int length) { string handShakeText = Encoding.UTF8.GetString(handShakeBytes, 0, length); string key = string.Empty; Regex reg = new Regex(@"Sec\-WebSocket\-Key:(.*?)\r\n"); Match m = reg.Match(handShakeText); if (m.Value != "") { key = Regex.Replace(m.Value, @"Sec\-WebSocket\-Key:(.*?)\r\n", "$1").Trim(); } byte[] secKeyBytes = SHA1.Create().ComputeHash(Encoding.ASCII.GetBytes(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); string secKey = Convert.ToBase64String(secKeyBytes); var responseBuilder = new StringBuilder(); responseBuilder.Append("HTTP/1.1 101 Switching Protocols" + "\r\n"); responseBuilder.Append("Upgrade: websocket" + "\r\n"); responseBuilder.Append("Connection: Upgrade" + "\r\n"); responseBuilder.Append("Sec-WebSocket-Accept: " + secKey + "\r\n\r\n"); return Encoding.UTF8.GetBytes(responseBuilder.ToString()); } #endregion #region 处理接收的数据 /// <summary> /// 处理接收的数据 /// 参考 http://www.cnblogs.com/smark/archive/2012/11/26/2789812.html /// </summary> /// <param name="recBytes"></param> /// <param name="length"></param> /// <returns></returns> private string AnalyzeClientData(byte[] recBytes, int length) { int start = 0; // 如果有数据则至少包括3位 if (length < 2) return ""; // 判断是否为结束针 bool IsEof = (recBytes[start] >> 7) > 0; // 暂不处理超过一帧的数据 if (!IsEof) return ""; start++; // 是否包含掩码 bool hasMask = (recBytes[start] >> 7) > 0; // 不包含掩码的暂不处理 if (!hasMask) return ""; // 获取数据长度 UInt64 mPackageLength = (UInt64)recBytes[start] & 0x7F; start++; // 存储4位掩码值 byte[] Masking_key = new byte[4]; // 存储数据 byte[] mDataPackage; if (mPackageLength == 126) { // 等于126 随后的两个字节16位表示数据长度 mPackageLength = (UInt64)(recBytes[start] << 8 | recBytes[start + 1]); start += 2; } if (mPackageLength == 127) { // 等于127 随后的八个字节64位表示数据长度 mPackageLength = (UInt64)(recBytes[start] << (8 * 7) | recBytes[start] << (8 * 6) | recBytes[start] << (8 * 5) | recBytes[start] << (8 * 4) | recBytes[start] << (8 * 3) | recBytes[start] << (8 * 2) | recBytes[start] << 8 | recBytes[start + 1]); start += 8; } mDataPackage = new byte[mPackageLength]; for (UInt64 i = 0; i < mPackageLength; i++) { mDataPackage[i] = recBytes[i + (UInt64)start + 4]; } Buffer.BlockCopy(recBytes, start, Masking_key, 0, 4); for (UInt64 i = 0; i < mPackageLength; i++) { mDataPackage[i] = (byte)(mDataPackage[i] ^ Masking_key[i % 4]); } return Encoding.UTF8.GetString(mDataPackage); } #endregion #region 发送数据 /// <summary> /// 把发送给客户端消息打包处理(拼接上谁什么时候发的什么消息) /// </summary> /// <returns>The data.</returns> /// <param name="message">Message.</param> private byte[] SendMsg(string msg) { byte[] content = null; byte[] temp = Encoding.UTF8.GetBytes(msg); if (temp.Length < 126) { content = new byte[temp.Length + 2]; content[0] = 0x81; content[1] = (byte)temp.Length; Buffer.BlockCopy(temp, 0, content, 2, temp.Length); } else if (temp.Length < 0xFFFF) { content = new byte[temp.Length + 4]; content[0] = 0x81; content[1] = 126; content[2] = (byte)(temp.Length & 0xFF); content[3] = (byte)(temp.Length >> 8 & 0xFF); Buffer.BlockCopy(temp, 0, content, 4, temp.Length); } return content; } #endregion
javascript代码
function webSocketClose() { ws.close(); alert("关闭了通讯") } //单聊 function send() { var msg = document.getElementById("message").value; var data = ""+document.getElementById("userId").value +","+ msg if (msg == "" || msg == undefined) { alert("请填写发送内容!") return; } ws.send(data); } //群发(所有用户) function sendGroup() { var msg = document.getElementById("message").value; var data = "all," + msg if (msg == "" || msg == undefined) { alert("请填写发送内容!") return; } ws.send(data); } //群组发送A function sendGroupA() { var msg = document.getElementById("message").value; var data = "groupA," + msg if (msg == "" || msg == undefined) { alert("请填写发送内容!") return; } ws.send(data); } //群组发送A function sendGroupB() { var msg = document.getElementById("message").value; var data = "groupB," + msg if (msg == "" || msg == undefined) { alert("请填写发送内容!") return; } ws.send(data); }
写在最后
这个就是我不是根据seesion来进行判断用户的,所有每当刷新了页面也就相当于退出了当前用户,还是需要重新开启连接的,这就是一个基本思路实现。还有待完善和不足。还请见谅。代码基本就差不多了。
源码放在了gitHub: