欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty实现群聊系统

程序员文章站 2022-04-19 11:37:46
1 需求(1) 编写一个 Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)(2) 实现多人群聊(3) 服务器端:可以监测用户上线,离线,并实现消息转发功能(4) 客户端:通过 channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用(5) 户发送的消息(有服务器转发得到)目的:进一步理解Net非阻塞网络编程机制2 Code2.1 GroupChatConstspublic final class GroupChatConsts { private G...

1 需求

(1) 编写一个 Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
(2) 实现多人群聊
(3) 服务器端:可以监测用户上线,离线,并实现消息转发功能
(4) 客户端:通过 channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用
(5) 户发送的消息(有服务器转发得到)
目的:进一步理解Net非阻塞网络编程机制

2 Code

2.1 GroupChatConsts

public final class GroupChatConsts {

    private GroupChatConsts() {
    }


    public static final Integer port = 8888;

    public static final String host = "127.0.0.1";

}

2.2 GroupChatServer

(1) GroupChatServerHandler

package com.rosh.netty.chat.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description:
 * @Author: rosh
 * @Date: 2020/12/29 22:59
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {


    /**
     * 定义一个channel组,管理所有的channel
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * 当建立连接时,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //通知其他客户,有人加入该聊天室
        channelGroup.writeAndFlush("【客户端】 " + sdf.format(new Date()) + channel.remoteAddress() + " 加入聊天 \n");
        //加入聊天
        channelGroup.add(channel);
    }


    /**
     * 表示channel处于一个活跃的状态
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("【" + ctx.channel().remoteAddress() + "】 " + sdf.format(new Date()) + " 上线了 ~");
    }

    /**
     *  表示channel处于不活动的状态
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("【" + ctx.channel().remoteAddress() + "】 " + sdf.format(new Date()) + " 离线了 ~");
    }


    /**
     * channel断开连接会被触发
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("【客户端】 " + sdf.format(new Date()) + channel.remoteAddress() + " 离开了\n");
    }

    /**
     * 转发消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("【客户】" + sdf.format(new Date()) + channel.remoteAddress() + "发送了消息: " + msg + "\n");
            } else {
                ch.writeAndFlush("【自己】" + sdf.format(new Date()) + "发送了消息: " + msg + "\n");
            }
        });
    }

    /**
     * 发生异常时 关闭通道
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

(2) GroupChatServerChannelInitializer

public class GroupChatServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();

        //解码器
        pipeline.addLast("decoder", new StringDecoder());
        //编码器
        pipeline.addLast("encoder", new StringEncoder());
        //加入自己业务handler
        pipeline.addLast("serverHandler", new GroupChatServerHandler());


    }
}

(3) GroupChatServerHandler

public class GroupChatServer {


    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    //nio
                    .channel(NioServerSocketChannel.class)
                    //bossGroup,option,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //workerGroup,option,是否启动心跳包活机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new GroupChatServerChannelInitializer());
            //绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(GroupChatConsts.port).sync();
            //监听关闭事件
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


}

2.3 GroupChatClient

(1) GroupChatClientHandler

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

(2) GroupChatClientChannelInitializetr

public class GroupChatClientChannelInitializetr  extends ChannelInitializer<SocketChannel>{
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        //解码器
        pipeline.addLast("decoder", new StringDecoder());
        //编码器
        pipeline.addLast("encoder", new StringEncoder());
        //自定义handler
        pipeline.addLast("clientHandler", new GroupChatClientHandler());
    }
}

(3) GroupChatClient

public class GroupChatClient {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new GroupChatClientChannelInitializetr());
            ChannelFuture channelFuture = bootstrap.connect(GroupChatConsts.host, GroupChatConsts.port).sync();
            //输入信息
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg + "\n");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }

}


3 测试

(1) 启动服务端,启动3个客户端
Netty实现群聊系统
(2) 发送消息
Netty实现群聊系统
Netty实现群聊系统
Netty实现群聊系统

本文地址:https://blog.csdn.net/qq_34125999/article/details/111939897