Netty实现群聊系统
程序员文章站
2022-04-19 11:37:46
1 需求(1) 编写一个 Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)(2) 实现多人群聊(3) 服务器端:可以监测用户上线,离线,并实现消息转发功能(4) 客户端:通过 channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用(5) 户发送的消息(有服务器转发得到)目的:进一步理解Net非阻塞网络编程机制2 Code2.1 GroupChatConstspublic final class GroupChatConsts { private G...
1 需求
(1) 编写一个 Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
(2) 实现多人群聊
(3) 服务器端:可以监测用户上线,离线,并实现消息转发功能
(4) 客户端:通过 channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用
(5) 户发送的消息(有服务器转发得到)
目的:进一步理解Net非阻塞网络编程机制
2 Code
2.1 GroupChatConsts
public final class GroupChatConsts {
private GroupChatConsts() {
}
public static final Integer port = 8888;
public static final String host = "127.0.0.1";
}
2.2 GroupChatServer
(1) GroupChatServerHandler
package com.rosh.netty.chat.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Description:
* @Author: rosh
* @Date: 2020/12/29 22:59
*/
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个channel组,管理所有的channel
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 当建立连接时,第一个被执行
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//通知其他客户,有人加入该聊天室
channelGroup.writeAndFlush("【客户端】 " + sdf.format(new Date()) + channel.remoteAddress() + " 加入聊天 \n");
//加入聊天
channelGroup.add(channel);
}
/**
* 表示channel处于一个活跃的状态
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【" + ctx.channel().remoteAddress() + "】 " + sdf.format(new Date()) + " 上线了 ~");
}
/**
* 表示channel处于不活动的状态
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【" + ctx.channel().remoteAddress() + "】 " + sdf.format(new Date()) + " 离线了 ~");
}
/**
* channel断开连接会被触发
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("【客户端】 " + sdf.format(new Date()) + channel.remoteAddress() + " 离开了\n");
}
/**
* 转发消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(ch -> {
if (channel != ch) {
ch.writeAndFlush("【客户】" + sdf.format(new Date()) + channel.remoteAddress() + "发送了消息: " + msg + "\n");
} else {
ch.writeAndFlush("【自己】" + sdf.format(new Date()) + "发送了消息: " + msg + "\n");
}
});
}
/**
* 发生异常时 关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
(2) GroupChatServerChannelInitializer
public class GroupChatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//解码器
pipeline.addLast("decoder", new StringDecoder());
//编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己业务handler
pipeline.addLast("serverHandler", new GroupChatServerHandler());
}
}
(3) GroupChatServerHandler
public class GroupChatServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
//nio
.channel(NioServerSocketChannel.class)
//bossGroup,option,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。
.option(ChannelOption.SO_BACKLOG, 128)
//workerGroup,option,是否启动心跳包活机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new GroupChatServerChannelInitializer());
//绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(GroupChatConsts.port).sync();
//监听关闭事件
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2.3 GroupChatClient
(1) GroupChatClientHandler
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
(2) GroupChatClientChannelInitializetr
public class GroupChatClientChannelInitializetr extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//解码器
pipeline.addLast("decoder", new StringDecoder());
//编码器
pipeline.addLast("encoder", new StringEncoder());
//自定义handler
pipeline.addLast("clientHandler", new GroupChatClientHandler());
}
}
(3) GroupChatClient
public class GroupChatClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new GroupChatClientChannelInitializetr());
ChannelFuture channelFuture = bootstrap.connect(GroupChatConsts.host, GroupChatConsts.port).sync();
//输入信息
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.nextLine();
channel.writeAndFlush(msg + "\n");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
3 测试
(1) 启动服务端,启动3个客户端
(2) 发送消息
本文地址:https://blog.csdn.net/qq_34125999/article/details/111939897