一起来学Netty吧——NIO简单实现群聊
基于NIO实现群聊系统
先来简单阐述一下思路:
首先此系统是由 - 服务器端——客户端 组成。
- 由 服务器端 将多个客户端 联系起来的简单 Demo。
服务器端 的 功能和作用:
- 负责和每个客户端建立连接,并利用 监听 连接事件(SelectionKey.OP_ACCEPT)来实现用户上线提醒的功能。
- 负责接收每个客户端发来的消息,输出到控制台 并 转发 给 其他客户端,进而实现群聊。
客户端的实现:
- 读取控制台的一行信息,按回车发送给服务器端。
- 从服务器端接收其从别的客户端转发过来的信息并输出。
服务器端
package netty.GroupChat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* @ClassName GroupChatServer
* @Description 基于 NIO 实现的群聊系统 服务器端
* @Author SkySong
* @Date 2021-01-01 17:38
*/
public class GroupChatServer {
//定义相关的属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6667;
//构造器
public GroupChatServer() {
// 初始化操作
try {
// 获得 选择器
selector = Selector.open();
// 获得 ServerSocketChannel
listenChannel = ServerSocketChannel.open();
// 绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
// 设置非阻塞模式
listenChannel.configureBlocking(false);
// 将 listenChannel 注册到 selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);// 连接事件
} catch (IOException e) {
e.printStackTrace();
}
}
// 监听
public void listen() {
try {
while (true) {
int count = selector.select();
if (count > 0) {// 有相关的事件处理
// 通过 selector 获得 SelectionKeys 集合
Iterator<SelectionKey> key = selector.selectedKeys().iterator();
// 遍历得到的 SelectionKeys
while (key.hasNext()) {
SelectionKey clientKey = key.next();
if (clientKey.isAcceptable()) {// 如果为 连接事件
SocketChannel socketChannel = listenChannel.accept();
// 将 该 socketChannel 设置为非阻塞模式
socketChannel.configureBlocking(false);
// 将 该 socketChannel 注册到 selector
socketChannel.register(selector, SelectionKey.OP_READ);
// 提示某某上线了
System.out.println(socketChannel.getRemoteAddress() + "上线了");
}
if (clientKey.isReadable()) {// 如果为 读取事件
// 处理读事件
readData(clientKey);
}
// 删除当前 SelectionKey (clientKey), 防止重复处理
key.remove();
}
} else {
System.out.println("等待中...");
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
/**
* 读取消息
* @param key 当前客户端对应的 SelectionKey
*/
private void readData(SelectionKey key) throws IOException {
SocketChannel socketChannel = null;
try {
// 获得 socketChannel
socketChannel = (SocketChannel) key.channel();
// 读取其中的信息
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
if (read > 0) {// read 为读取到的长度
String msg = new String(buffer.array());
System.out.println(msg);
// 向其他客户端转发该消息
sendMsgToOthers(msg, socketChannel);
}
} catch (IOException e) {
System.out.println(socketChannel.getRemoteAddress() + "离线了...");
// 取消注册
key.cancel();
// 关闭通道
socketChannel.close();
}
}
/**
* 向其他客户端转发该消息
*
* @param msg 消息内容
* @param selfChannel 自身通道(为了排除自己————不要在给自己发送了)
*/
private void sendMsgToOthers(String msg, SocketChannel selfChannel) throws IOException {
Set<SelectionKey> selectionKeys = selector.keys();
// 遍历所有注册到 selector 上的 selectionKey
for (SelectionKey key : selectionKeys) {
// 通过 key 取出对应的 socketChannel
Channel targetChannel = key.channel();
// 排除自己
// ServerSocketChannel 也在 selector 里,所以这里要有类型判断,否则后边类型转换会报错。
if (targetChannel instanceof SocketChannel && selfChannel != targetChannel) {
SocketChannel target = (SocketChannel) targetChannel;
target.configureBlocking(false);
// 将 msg 存储到 Buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
target.write(buffer);
}
}
}
public static void main(String[] args) {
GroupChatServer groupChatServer = new GroupChatServer();
//服务器端开始监听(服务器启动了!)
groupChatServer.listen();
}
}
简单回溯一下:(有关服务器端的点)
-
定义的初识参数:
服务器的 Selector 选择器:是将群真正“拉”起来的核心元件,服务器 和 客户端 的通道都注册在此 选择器上的。
- 所以在客户端借助服务器向其他客户端转发消息时,排除自我转发的过程中,有一步类型判断,这很重要。
ServerSocketChannel 通道:是负责和客户端建立连接的,并完成与 客户端 信息交互的功能。
- 这里提一下 serverSocketChannel.accept() ,这个方法是会阻塞的。
- 而我们今天研究的 NIO 是非阻塞的,里面也多次配置了 通道的非阻塞。
- 因为在我们执行 accept() 方法之前,我们已经知道是连接操作了,所以会立马执行不会阻塞。
客户端
package netty.GroupChat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* @ClassName GroupChatClient
* @Description
* @Author SkySong
* @Date 2021-01-01 17:38
*/
@SuppressWarnings("all")
public class GroupChatClient {
// 定义相关属性
private final String HOST = "127.0.0.1";// 服务器的IP地址
private final int PORT = 6667;// 服务器的端口
private Selector selector;// 客户端自己的Selector
private SocketChannel socketChannel;// 负责发消息的socketChannel
private String username;
//构造器(初始化操作)
public GroupChatClient() {
try {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
//设置非阻塞模式
socketChannel.configureBlocking(false);
//注册通道
socketChannel.register(selector, SelectionKey.OP_READ);// 写入事件
//获取 username
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + "is OK .. .");
} catch (IOException e) {
e.printStackTrace();
}
}
//发送数据
public void sendInfo(String info) {
info = username + "说:" + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
//从服务器读取其他客户端回复的消息
public void readInfo() {
try {
int read = selector.select();
if (read > 0) {// 有可用通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {// 如果是可读的
//得到相关通道
SocketChannel socketChannel = (SocketChannel) key.channel();
//得到一个buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
socketChannel.read(buffer);
//输出消息
System.out.println(new String(buffer.array()).trim());
}
iterator.remove();
}
} else {
//System.out.println("没有可用通道");
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//启动客户端
GroupChatClient chatClient = new GroupChatClient();
//启动一个线程(用来同步服务器端的消息)
new Thread(() -> {
while (true) {
chatClient.readInfo();
try {// 每隔3秒从服务器端读取数据
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 发送数据到服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {//只要有下一行就读取
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
客户端没什么好说的,这里主要解释一点:
就是为什么客户端也有自己的 Selector ?
- 客户端似乎不需要多少连接,起码这个 Demo 里没有体现。
- 这是不是多此一举?!
- OH God, Please don’t ! ! !
- 当然不是!!!
我们这个 Demo 虽然只是一个最基础的架子,但基本的延展性还是得体现一丢丢的。
我们来浅读一个场景:
当你给你的好兄弟分享文件资源♂时 ????????????
- 就是你们两个之间正在传文件。
文件的传输一定会占用一个 SocketChannel,此时我们不得不再多搞出点通道来满足我们的其他网络社交需求。譬如:
- 聊天
- 发表情
- 发图片
例如 图片 和 视频 之类的大文件,不可能一下子传过去,会占用一个通道较长时间,所以,如果只有一个的话肯定不行。
- 那么通道多了,自然就需要一个 Selector 来统一管理一下,这合情合理。
马老师:看来是有备而来。
ResultTest(测试结果)
-
我先启动服务器端,再依次启动三个客户端:
一下启动不了3个客户端的,注意看一下配置:
next:
我目前的 IDEA 版本为 2019年3月的,勾选上图中的“允许平行运行”选项就OK了;
其他版本的设置可能略有不同,有的版本是让你取消勾选一个“single run”,就是取消单独运行。 -
让其中一个客户端发送消息:
看看服务器端的反应:(验证客户端与服务器端的通讯)
再看看其他客户端:(验证服务器端的转发功能) -
我们换一个客户端重复上述操作:(严谨)
为了一步到位 和 缩小图片尺寸,我清理了接收端的控制台。马老师:来,骗!来,偷袭!!
情况呢,就是这么个情况。Fine,that’s all,thank you.
本文地址:https://blog.csdn.net/weixin_43415201/article/details/112210905