服务端与多客户端的通信案例(思路与实现)
题目:
使用基于tcp协议的编程模型实现多人同时在线聊天和传输文件,
要求每个客户端将发送的聊天内容和文件发送到服务器,
服务器接收到后转发给当前所有在线的客户端。
(文末附完整代码下载链接)
文章目录
一、分解题目(需求分析阶段)
- 要使用TCP协议的编程模型
- 要实现多人同时聊天
- 要支持发送消息和传输文件
- 客户端要把聊天内容发送到服务器
- 服务器要接收到客户端聊天内容
- 服务器要将聊天内容群发给所有在线客户端
二、确定实现方式(概要设计阶段)
- 要使用TCP协议的编程模型
ServerSocket与Socket通信实现
- 要实现多人同时聊天
创建多个Socket并启动,同时与服务器通信
- 要支持发送消息和传输文件
方式1:每次发送一个消息对象,该对象包括消息类型、消息内容(我选择的)
方式2:每次发送经过俩次通信,先发消息类型,再发消息内容(这个相对简单一些,遇到的问题可能少一些)
- 客户端要把聊天内容发送到服务器
客户端用Socket提供的输出流发送聊天内容
- 服务器要接收到客户端聊天内容
服务端用Socket提供的输入流接收聊天内容
- 服务器要将聊天内容群发给所有在线客户端
ServerSocket没有提供监控所有在线客户端的方法,我选择用一个集合保存所有连接成功后的客户端,客户端下线后及时更新该集合,然后遍历该集合所有客户端给他们群发消息
三、确定实现细节(详细设计阶段)
- ServerSocket与Socket通信模型
TCP协议的的通信模型,就是一边创建服务器等待客户端连接,另一边创建客户端并进行连接,这里都是基本固定的写法
- 创建多个Socket并启动,同时与服务器通信
理论上应该是在一个局域网内多个不同IP的计算机分别启动客户端来访问我本地的服务器,但是由于条件有限,只能自己本地模拟多台客户端访问本地服务器,也就是直接拷贝多份客户端代码,分别启动
- 每次发送一个消息对象,该对象包括消息类型、消息内容
创建一个用户消息类,消息类型、消息内容作为成员变量
- 客户端用Socket提供的输出流发送聊天内容
这里也是基本固定的写法,创建一个对象处理流ObjectOutputStream对象oos,参数用每个socket提供的唯一的outputStream流;
然后使用oos的writeObject方法,将客户端在控制台输入的聊天内容封装成一个用户消息对象,当作参数写入到输出流中。
(客户端接收消息与下方服务端接收消息一样的实现方式)
- 服务端用Socket提供的输入流接收聊天内容
创建对象处理流ObjectInputStream对象ois,参数为socket的inputStream流;
然后使用ois的readObject方法,将客户端发送来的消息读取出来。
(服务器发送消息与上方客户端发送消息一样的实现方式)
- 用集合保存所有连接成功后的客户端,客户端下线后及时更新该集合,然后遍历该集合所有客户端给他们群发消息
在服务端创建一个List集合clients,用于存放所有连接成功的socket对象;
有客户端上线,clients.add(新的客户端);
当有客户端下线后,在该客户端关闭程序之前,clients.remove(下线的客户端);
有客户端发来消息,则遍历clients,给当前所有在线客户端发送该消息内容,达到消息共享的目的。
- 发送文件的处理方式
因为题目没有要求发送的文件要如何处理,所以此处选择将客户端输入的要发送的文件路径下的文件,拷贝到一个固定的服务器路径下(模拟服务器存储文件),群发消息的时候只发文件路径名
- 如何真正实现服务器与多台客户端的并行通信呢
这里出现了并行任务,也给意味着,这道题目里面肯定要用到多线程,接下来分别从服务端和客户端的角度考虑一下,哪些地方应该采用多线程实现?
服务端:
首先服务器要模拟始终运行,并且支持客户端随时连接,那么服务端必须要用一个无限循环
.
假设服务端只有一个线程,也就是main函数启动后的主线程,此时的服务器运行情况大概如下图:
根据上图分析,服务端只有一个主线程是行不通的。
.
同时也得到一个思路:
accept是一个阻塞方法,readObject也是一个阻塞方法,俩者不能同时执行,在一个主线程中有先后的执行顺序。但是现在的需求是既要支持客户端随时连接,又要保证服务端与每一个客户端能够正常通信,通信与等待这俩个是并行任务,而且服务端收发消息和服务端等待其他客户端连接是完全不相关的俩个任务。
.
此时便可得出结论:
(1) 服务器端需要1个单独的线程无限循环,等待客户端连接,连接成功后该线程重新进入等待状态
(2) 同时还需要为每个连接成功的客户端都创建一个专门的服务线程,负责与每个客户端进行通信
客户端:
因为客户端要模拟不断与服务器通信,而不是发送一次就结束程序,所以客户端连接成功后,与服务器通信的代码要用无限循环;
.
假设客户端此时只有一个主线程负责与服务器通信,此时客户端与服务器通信的大概如下图:
根据上图分析:
客户端采用单线程实现多客户端聊天,并立即群发给其他客户端这个功能,会出现延迟情况。
.
同时也得到一个思路:
客户端的输出属于阻塞方法,一旦客户端不给服务器发送消息,程序就无法向下执行,客户端也就无法读取到服务器的信息,只有当客户端发送一条消息后,程序才能执行到readObject那块,但此时读取到的消息不一定是客户端刚发过去的消息,而可能是其他客户端在他没有聊天的时间段向服务器发送的消息,服务器转发给了他,这出现了明显的延时现象,没法达到真正的多客户端即时共享消息的目的。
.
这样的单线程实现用在一个客户端的情况下,完全OK,因为一个客户端的时候,如果客户端不发消息,服务器不会主动向客户端响应消息。
.
但是多客户端的时候,考虑到客户端既要支持随时发送消息,还要支持随时读取消息,而且客户端发送消息和客户端接收其他客户端共享的消息是互不影响的俩个事情,这明显又是俩个可以并行的任务,于是乎,客户端这里,也应该使用多线程来实现
.
此时便可得出结论:
(1) 每个客户端都需要一个线程用来连接服务器,并且无限循环向服务器发送消息;
(2) 同时还需要为每个客户端都创建一个接收消息的线程,负责即时接收到所有客户端发送的消息
- 功能模块划分
上面罗列的都是每一个功能点的具体实现方式,但是实际开发的时候需要将各个功能整合到各个不同的模块,才能更好的进行代码开发
如下是我自己整理的这个需求的功能模块图
四、代码实现(编码阶段)
编码阶段,按照详细设计中总结的模块进行开发,每个模块直接上代码,代码中有注释
注意:
代码中除了中文注释以外,对代码的注释都是一些我遇到的报错的地方,是因为调试过程中些现那段代码有问题,就给注释掉了,换了一种新的方式去实现。
1、消息对象类的设计与开发
import java.io.Serializable;
/**
* @program:
* @description: 消息对象类
* @author:
* @create: 2020-11-24 15:12
**/
public class UserMessage implements Serializable {
//实现serializable接口后必须指定序列化版本号,用于序列化与反序列化时验证对象
private static final long serialVersionUID = -7815896088464512553L;
private String msgType; //消息类型: File|文件;String|普通字符串文本
private String msgBody; //消息内容: 文件路径/普通文本内容
public UserMessage() {
}
public UserMessage(String msgType, String msgBody) {
this.msgType = msgType;
this.msgBody = msgBody;
}
public String getMsgType() {
return msgType;
}
public void setMsgType(String msgType) {
this.msgType = msgType;
}
public String getMsgBody() {
return msgBody;
}
public void setMsgBody(String msgBody) {
this.msgBody = msgBody;
}
@Override
public String toString() {
return "UserMessage{" +
"msgType='" + msgType + '\'' +
", msgBody='" + msgBody + '\'' +
'}';
}
}
2、服务端等待线程(主线程)
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* @program:
* @description: 服务器主线程
* @author:
* @create: 2020-11-24 14:59
**/
public class ChatServer {
public static List<Socket> clients = new ArrayList<>(); //声明一个共用的客户端集合,存储所有在线客户端
public static void main(String[] args) {
startService();
}
public static void startService(){
ServerSocket ss = null;
int threadNum = 0;
try {
//1、创建一个服务器对象,并指名服务器的端口
ss = new ServerSocket(8889);
//2、不断等待客户端连接(服务端看门线程)
while(true) {
System.out.println("等待客户端连接...");
Socket client = ss.accept();
threadNum++;
//将新连接的客户端保存到list中,用于后续群发给所有客户端消息
clients.add(client);
//只要有一个客户端连接成功,就新创建一个线程接收客户端消息
new ServerThread(client,"线程" + threadNum).start();
System.out.println("【连接成功】客户端" + client.getPort() + ",已分配[线程" + threadNum + "]与其通信");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
ss.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3、服务端通信线程
import UserMessage;
import java.io.*;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
/**
* @program:
* @description: 服务器通信线程
* @author:
* @create: 2020-11-24 17:22
**/
public class ServerThread extends Thread{
//存放每个客户端与对应的唯一Object输出流对象(因为每个socket的输出流只能获取一次,所以需要建个映射关系去取每个socket对应的输出流)
private static Map<Socket,ObjectOutputStream> oosMap = new HashMap<>();
private Socket client;
private boolean startFlag; //后加,用于处理客户端下线时的通信报错问题
public ServerThread(Socket client, String threadName) {
super(threadName);
this.client = client;
this.startFlag = true;
}
public void setStartFlag(boolean startFlag) {
this.startFlag = startFlag;
}
/**
* 接收客户端的消息线程,并在收到任何客户端消息后群发消息给所有客户端
*/
@Override
public void run() {
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
String str = "";
UserMessage message = null;
try {
ois = new ObjectInputStream(client.getInputStream());
oos = new ObjectOutputStream(client.getOutputStream());
oosMap.put(client,oos);
while(startFlag) {
//读取客户端发送的消息
UserMessage msg = (UserMessage) ois.readObject();
if ("File".equals(msg.getMsgType())) {
str = "客户端" + client.getPort() + "发送文件:" + msg.getMsgBody();
message = new UserMessage("File", str);
//将文件存储在本地固定的服务器路径下
saveClientFileToLocal(new File(msg.getMsgBody()));
} else{
str = "客户端" + client.getPort() + "发送消息:" + msg.getMsgBody();
message = new UserMessage("String", str);
}
//群发给所有客户端
sendClientGroups(message);
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
if(ois != null){
try {
ois.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//此时不能关闭客户端,因为服务器群发完该客户端要下线的'bye'的消息后,客户端还没有立即接收到,此时关闭客户端读取时会报Socket Closed异常,对客户端的关闭可以放在客户端接收线程接收到'bye'的消息之后,由客户端接收线程来负责关闭client
// try {
// client.close();
// System.out.println("client已关闭" + client.getPort());
// } catch (IOException e) {
// e.printStackTrace();
// }
}
}
/**
* 向所有客户端群发消息
* @param message
*/
public void sendClientGroups(UserMessage message){
ObjectOutputStream oos = null;
try {
System.out.println("------ " + Thread.currentThread().getName() + "群发客户端start ------");
//遍历在线客户端列表,群发该message消息
// for (Socket client : ChatServer.clients) {
// oosMap.get(client).writeObject(message);
// System.out.println("【消息转发成功】转发端口" + client.getPort() + "的客户端: " + message );
// }
//改为普通for循环,解决迭代器或者增强for循环时报的ConcurrentModificationException异常,不允许循环中修改集合的元素
for (int i = 0; i < ChatServer.clients.size(); i++) {
oosMap.get(ChatServer.clients.get(i)).writeObject(message);
System.out.println("【消息转发成功】转发端口" + ChatServer.clients.get(i).getPort() + "的客户端: " + message );
}
System.out.println("------ " + Thread.currentThread().getName() + "群发客户端end ------");
String str = "客户端" + client.getPort() +"发送消息:bye";
if(str.equals(message.getMsgBody())){
startFlag = false;
ChatServer.clients.remove(client);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(null != oos){
try {
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 根据客户端发来的文件路径,将对应文件保存到本地的固定路径下(模拟服务器端存储文件)
*/
public void saveClientFileToLocal(File sourceFile){
//指名服务器文件的本地存储目录: d:/homework_server/客户端端口号
File serverDir = new File("d:/homework_server/" + client.getPort());
if(!serverDir.exists()) serverDir.mkdirs();
//拼接得到服务器下存储的目标文件路径
File desFile = new File(serverDir.getPath() + "/" + sourceFile.getName());
//写入文件
BufferedInputStream bis = null;
BufferedOutputStream bos = null;
try {
bis = new BufferedInputStream(new FileInputStream(sourceFile));
bos = new BufferedOutputStream(new FileOutputStream(desFile));
byte[] bArr = new byte[1024];
int res = 0;
while((res = bis.read(bArr)) != -1){
bos.write(bArr,0,res);
}
System.out.println("【File保存成功】文件名:" + sourceFile.getName() + " | 保存路径:" + desFile);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(null != bis){
try {
bis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(null != bos){
try {
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Override
public String toString() {
return "ServerThread{" +
"client=" + client.getPort() +
", startFlag=" + startFlag +
'}';
}
}
4、客户端发送线程(主线程)
import UserMessage;
import ChatServer;
import java.io.File;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Scanner;
/**
* @program:
* @description: 客户端发送线程(主线程)
* @author:
* @create: 2020-11-24 14:59
**/
public class ChatClient {
private static boolean startFlag = true; //后加,用于处理客户端下线时的通信报错问题
public static void main(String[] args) {
lunchClient();
}
public static void lunchClient(){
Socket client = null;
Scanner sc = new Scanner(System.in);
ObjectOutputStream oos = null;
UserMessage message = null;
try {
//1、创建一个客户端连接,并指定服务器IP和端口
client = new Socket("127.0.0.1", 8889);
System.out.println(client.getLocalPort() + "连接服务器成功!");
//2、为该客户端启动一个消息接收线程
ClientReceiveThread receiveThread = new ClientReceiveThread(client);
receiveThread.start();
//3、主线程无限循环,来做该客户端的消息发送线程
oos = new ObjectOutputStream(client.getOutputStream());
while (startFlag) {
System.out.println("********* 请输入要发送的消息类型...(0:文本内容 1:文件) *********");
String type = sc.next();
if("0".equals(type)){
System.out.println("********* 请输入要发送的消息内容... *********");
String msgBody = sc.next();
if("bye".equals(msgBody)){
//receiveThread.stop();//终止该客户端的接收线程,该方式不安全,已被弃用,不再使用该方式关闭接收线程,改为用startFlag标识关闭
startFlag = false; //执行完本次消息发送后,退出循环,关闭客户端
}
message = new UserMessage("String", msgBody);
}else if("1".equals(type)){
System.out.println("请输入要发送的本地文件路径:");
String filePath = new Scanner(System.in).nextLine();
//校验文件路径是否正确,是否为文件
File file = new File(filePath);
if(!file.exists() || !file.isFile()){
System.out.println("输入路径不存在或者该路径不是文件!");
continue;
}
message = new UserMessage("File", filePath);
}else{
System.out.println("输入消息类型不存在!请重新输入...");
continue;
}
oos.writeObject(message); //向服务器发送消息
System.out.println("消息发送成功!");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//释放资源
if(null != oos){
try {
//oos.close();
client.shutdownOutput(); //此种关闭流的方式,是单方面关闭输出流,client的输入流可以继续使用,也不会导致client关闭
} catch (IOException e) {
e.printStackTrace();
}
}
sc.close();
}
}
}
5、客户端接收线程
import UserMessage;
import ChatServer;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.Socket;
/**
* @program:
* @description:客户端接收线程
* @author:
* @create: 2020-11-26 17:06
**/
public class ClientReceiveThread extends Thread{
private Socket client;
private boolean startFlag = true; //后加,用于正常关闭线程,而不是像线程的stop方法一样强制关闭
public ClientReceiveThread() {
}
public ClientReceiveThread(Socket client) {
this.client = client;
}
/**
* 接收服务器发送的消息
*/
@Override
public void run(){
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(client.getInputStream());
while(startFlag){
UserMessage message = (UserMessage) ois.readObject();
System.out.println("---【服务器消息】" + message.getMsgBody());
String str = "客户端" + client.getLocalPort() +"发送消息:bye";
//如果是客户端下线的消息,则修改标识,正常结束线程
if(str.equals(message.getMsgBody())){
startFlag = false;
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
if(null != ois){
try {
//此时关闭ois相当于关闭了客户端的oos、ois、socket,因为这是客户端退出之前的最后一步,所以在这里关闭oos、ois、socket都不会报错,而在程序执行到这里之前提前关闭oos是不允许的(也就是在服务端通信线程那里去关闭),因为那时关闭的话,这里的ois就没法读取内容了
ois.close(); //关闭客户端输入流,此种关闭方式会导致客户端的输入输出流都不再使用。也会自动关闭掉client
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
五、调试程序(测试阶段)
启动服务端,再拷贝并启动多个客户端,不断测试各种聊天情况下程序是否正常,出现问题就及时定位与调整代码。
这里记录一下我调试过程中遇到的一些难缠的问题及解决办法
-
消息发送过程中报EOFException
- 这个异常是Object处理流报的异常,就是在读取对象的时候,如果流中没有对象就会报错
问题原因:个别代码位置未放对,导致客户端/服务端重复读取了ObjectInputStream流
解决办法:调整错误代码位置,保证读取流时不会重复(我这里的改动是将客户端发送线程中新建客户端接收线程的代码移到while循环之上,避免重复聊天,重复创建接收线程,而导致多个相同客户端的接收线程并发读取,从而报了EOF异常)
- 这个异常是Object处理流报的异常,就是在读取对象的时候,如果流中没有对象就会报错
-
有客户端下线后,服务端报ConcurrentModificationException
- 问题原因:最开始都是用增强for循环去转发消息,希望实现有客户端下线后,服务端通信线程接收到’bye’的消息后并转发所有客户端后,将该客户端从clients移除。但是实际上这样的操作会导致程序报ConcurrentModificationException异常,因为增强for循环底层是采用的iterator实现,而iterator本身是不支持在循环过程中对集合元素进行修改的。所以这里会爆了错
- 解决办法:既然增强for循环,迭代器循环都不行,就换成了普通的for循环去遍历群发消息
-
有客户端下线后,服务端/客户端会报错,要么报EOFException,要么报Socket Closed异常
- 问题原因:因为客户端的输入输出流、以及客户端没有按照正确的顺序关闭,导致报错
- 解决办法:
客户端发送bye之后,
服务器接收并转发给所有客户端,转发完之后通信线程将循环标识改为false,禁止该客户端对应的通信线程下次继续运行,但此时不在finally中关闭客户端输出流oos,以及客户端socket本身
客户端接收线程接收到服务器发来的bye之后,他负责把客户端的输入流ois关掉,至于是否显式关闭socket,也不用太关注,因为关闭客户端ois后会触发socket关闭
六、提交给出题人(交付阶段)
将最终开发完的程序代码及运行视频提供给出题人
七、出题人验证程序(验收阶段)
出题人自己运行程序验证,或者看我发的完整运行视频,判断程序是否实现全部需求中功能
八、程序的维护(维护阶段)
因为只是一个练习题目,所以这里不考虑维护问题
文章内容输出来源:拉勾教育_大数据开发高新训练营第3期
本文地址:https://blog.csdn.net/qq_19314763/article/details/110261054
上一篇: 大数据--Flink入门
下一篇: SENT协议学习