欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

一起来学Netty吧——NIO简单实现群聊

程序员文章站 2022-06-28 16:23:33
基于NIO实现群聊系统先来简单阐述一下思路:服务器端package netty.GroupChat;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;import java.util.Set;/** * @ClassName GroupChatServ...

基于NIO实现群聊系统


先来简单阐述一下思路:

首先此系统是由 - 服务器端——客户端 组成。

  • 服务器端 将多个客户端 联系起来的简单 Demo。

服务器端 的 功能和作用:

  • 负责和每个客户端建立连接,并利用 监听 连接事件(SelectionKey.OP_ACCEPT)来实现用户上线提醒的功能。
  • 负责接收每个客户端发来的消息,输出到控制台 并 转发其他客户端,进而实现群聊。

客户端的实现:

  • 读取控制台的一行信息,按回车发送给服务器端。
  • 从服务器端接收其从别的客户端转发过来的信息并输出。

服务器端

package netty.GroupChat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * @ClassName GroupChatServer
 * @Description 基于 NIO 实现的群聊系统 服务器端
 * @Author SkySong
 * @Date 2021-01-01 17:38
 */
public class GroupChatServer {
    //定义相关的属性
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;

    //构造器
    public GroupChatServer() {
        // 初始化操作

        try {
            // 获得 选择器
            selector = Selector.open();
            // 获得 ServerSocketChannel
            listenChannel = ServerSocketChannel.open();
            // 绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            // 设置非阻塞模式
            listenChannel.configureBlocking(false);
            // 将 listenChannel 注册到 selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);// 连接事件

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 监听
    public void listen() {
        try {
            while (true) {
                int count = selector.select();
                if (count > 0) {// 有相关的事件处理
                    // 通过 selector 获得 SelectionKeys 集合
                    Iterator<SelectionKey> key = selector.selectedKeys().iterator();
                    // 遍历得到的 SelectionKeys
                    while (key.hasNext()) {
                        SelectionKey clientKey = key.next();
                        if (clientKey.isAcceptable()) {// 如果为 连接事件
                            SocketChannel socketChannel = listenChannel.accept();
                            // 将 该 socketChannel 设置为非阻塞模式
                            socketChannel.configureBlocking(false);
                            // 将 该 socketChannel 注册到 selector
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            // 提示某某上线了
                            System.out.println(socketChannel.getRemoteAddress() + "上线了");
                        }
                        if (clientKey.isReadable()) {// 如果为 读取事件
                            // 处理读事件
                            readData(clientKey);
                        }
                        // 删除当前 SelectionKey (clientKey), 防止重复处理
                        key.remove();
                    }

                } else {
                    System.out.println("等待中...");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }

    /**
     * 读取消息
     * @param key 当前客户端对应的 SelectionKey
     */
    private void readData(SelectionKey key) throws IOException {
        SocketChannel socketChannel = null;
        try {
            // 获得 socketChannel
            socketChannel = (SocketChannel) key.channel();
            // 读取其中的信息
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int read = socketChannel.read(buffer);
            if (read > 0) {// read 为读取到的长度
                String msg = new String(buffer.array());
                System.out.println(msg);
                // 向其他客户端转发该消息
                sendMsgToOthers(msg, socketChannel);
            }
        } catch (IOException e) {
            System.out.println(socketChannel.getRemoteAddress() + "离线了...");
            // 取消注册
            key.cancel();
            // 关闭通道
            socketChannel.close();
        }
    }

    /**
     * 向其他客户端转发该消息
     *
     * @param msg         消息内容
     * @param selfChannel 自身通道(为了排除自己————不要在给自己发送了)
     */
    private void sendMsgToOthers(String msg, SocketChannel selfChannel) throws IOException {
        Set<SelectionKey> selectionKeys = selector.keys();
        // 遍历所有注册到 selector 上的 selectionKey
        for (SelectionKey key : selectionKeys) {
            // 通过 key 取出对应的 socketChannel
            Channel targetChannel = key.channel();
            // 排除自己
            // ServerSocketChannel 也在 selector 里,所以这里要有类型判断,否则后边类型转换会报错。
            if (targetChannel instanceof SocketChannel && selfChannel != targetChannel) {
                SocketChannel target = (SocketChannel) targetChannel;
                target.configureBlocking(false);

                // 将 msg 存储到 Buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                target.write(buffer);
            }

        }
    }

    public static void main(String[] args) {
        GroupChatServer groupChatServer = new GroupChatServer();
        //服务器端开始监听(服务器启动了!)
        groupChatServer.listen();
    }
}

简单回溯一下:(有关服务器端的点)

  • 定义的初识参数:

    服务器的 Selector 选择器:是将群真正“拉”起来的核心元件,服务器 和 客户端 的通道都注册在此 选择器上的。

    • 所以在客户端借助服务器向其他客户端转发消息时,排除自我转发的过程中,有一步类型判断,这很重要。

    ServerSocketChannel 通道:是负责和客户端建立连接的,并完成与 客户端 信息交互的功能。

    • 这里提一下 serverSocketChannel.accept() ,这个方法是会阻塞的。

      - 而我们今天研究的 NIO 是非阻塞的,里面也多次配置了 通道的非阻塞。
      - 因为在我们执行 accept() 方法之前,我们已经知道是连接操作了,所以会立马执行不会阻塞。

客户端


package netty.GroupChat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * @ClassName GroupChatClient
 * @Description
 * @Author SkySong
 * @Date 2021-01-01 17:38
 */
@SuppressWarnings("all")
public class GroupChatClient {
    // 定义相关属性
    private final String HOST = "127.0.0.1";// 服务器的IP地址
    private final int PORT = 6667;//           服务器的端口
    private Selector selector;//               客户端自己的Selector
    private SocketChannel socketChannel;//     负责发消息的socketChannel
    private String username;

    //构造器(初始化操作)
    public GroupChatClient() {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
            //设置非阻塞模式
            socketChannel.configureBlocking(false);
            //注册通道
            socketChannel.register(selector, SelectionKey.OP_READ);// 写入事件
            //获取 username
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username + "is OK .. .");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //发送数据
    public void sendInfo(String info) {
        info = username + "说:" + info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //从服务器读取其他客户端回复的消息
    public void readInfo() {
        try {
            int read = selector.select();
            if (read > 0) {// 有可用通道
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {// 如果是可读的
                        //得到相关通道
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        //得到一个buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //读取
                        socketChannel.read(buffer);
                        //输出消息
                        System.out.println(new String(buffer.array()).trim());
                    }
                    iterator.remove();
                }
            } else {
                //System.out.println("没有可用通道");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        //启动客户端
        GroupChatClient chatClient = new GroupChatClient();

        //启动一个线程(用来同步服务器端的消息)
        new Thread(() -> {
            while (true) {
                chatClient.readInfo();
                try {// 每隔3秒从服务器端读取数据
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 发送数据到服务器端
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNextLine()) {//只要有下一行就读取
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }
    }
}

客户端没什么好说的,这里主要解释一点:

就是为什么客户端也有自己的 Selector ?

  • 客户端似乎不需要多少连接,起码这个 Demo 里没有体现。
  • 这是不是多此一举?!
  • OH God, Please don’t ! ! !
  • 当然不是!!!

我们这个 Demo 虽然只是一个最基础的架子,但基本的延展性还是得体现一丢丢的。

我们来浅读一个场景:

当你给你的好兄弟分享文件资源♂时 ????????????

  • 就是你们两个之间正在传文件。

文件的传输一定会占用一个 SocketChannel,此时我们不得不再多搞出点通道来满足我们的其他网络社交需求。譬如:

  • 聊天
  • 发表情
  • 发图片

例如 图片 和 视频 之类的大文件,不可能一下子传过去,会占用一个通道较长时间,所以,如果只有一个的话肯定不行。

  • 那么通道多了,自然就需要一个 Selector 来统一管理一下,这合情合理。

马老师:看来是有备而来。

ResultTest(测试结果)


  • 我先启动服务器端,再依次启动三个客户端:
    一起来学Netty吧——NIO简单实现群聊

    一下启动不了3个客户端的,注意看一下配置:
    一起来学Netty吧——NIO简单实现群聊
    next:
    一起来学Netty吧——NIO简单实现群聊
    我目前的 IDEA 版本为 2019年3月的,勾选上图中的“允许平行运行”选项就OK了;
    其他版本的设置可能略有不同,有的版本是让你取消勾选一个“single run”,就是取消单独运行。

  • 让其中一个客户端发送消息:
    一起来学Netty吧——NIO简单实现群聊
    看看服务器端的反应:(验证客户端与服务器端的通讯)
    一起来学Netty吧——NIO简单实现群聊
    再看看其他客户端:(验证服务器端的转发功能)
    一起来学Netty吧——NIO简单实现群聊

  • 我们换一个客户端重复上述操作:(严谨)
    一起来学Netty吧——NIO简单实现群聊
    为了一步到位 和 缩小图片尺寸,我清理了接收端的控制台。

    马老师:来,骗!来,偷袭!!
    一起来学Netty吧——NIO简单实现群聊

情况呢,就是这么个情况。Fine,that’s all,thank you.

本文地址:https://blog.csdn.net/weixin_43415201/article/details/112210905