从BIO和NIO到Netty入门
程序员文章站
2022-06-21 13:44:28
从BIO和NIO到Netty入门一:BIO1.1:网络编程1.2:TCP通信1.3:UDP通信二:NIO三:Netty入门一:BIO1.1:网络编程概述通过计算机网络将多台计算机连接。计算机在网络中进行连接和通信时必须遵守一定规则,这种规则称为网络协议。网络通信协议有很多种,常用的TCP/IP和UDP协议。InetAddress1:概述jdk提供该类用于封装一个ip,并提供和ip相关方法。2:常用APIpublic class InetAddressTest {...
从BIO和NIO到Netty入门
一:网络编程
1.1:概述
通过计算机网络将多台计算机连接。计算机在网络中进行连接和通信时必须遵守一定规则,这种规则称为网络协议。
网络通信协议有很多种,常用的TCP/IP和UDP协议。
二:BIO
2.1:概述
jdk提供很多api,用于完成Java语言进行网络通信。如BIO通信(阻塞网络通信)和NIO通信(非阻塞网络通信)。
2.2:InetAddress
1:概述
jdk提供该类用于封装一个ip,并提供和ip相关方法。
2:常用API
public class InetAddressTest {
public static void main(String[] args) throws UnknownHostException {
//获取本地的ip封装类
InetAddress inetAddress = InetAddress.getLocalHost();
//根据ip封装类获取主机名和ip
String hostName = inetAddress.getHostName();
String ip = inetAddress.getHostAddress();
System.out.println("ip:" + ip + "===" + "hostName:" +hostName);
System.out.println("============================");
//根据主机名获取ip封装类
InetAddress bean = InetAddress.getByName("WIN-DECE6MN505D");
System.out.println(bean.getHostName() + "===" + bean.getHostAddress());
}
}
2.2:TCP通信
- 概述
是连接通信(三次握手四次挥手)。在通信的过程有严格的客户端和服务端。服务端要先起,客户端主动连接服务端。
- java(API)TCP通信(阻塞的)
1:服务端
public class UdpServer {
public static void main(String[] args) throws IOException {
//创建服务端的Socket
ServerSocket serverSocket = new ServerSocket(10086);
while (true) {
//接收数据,获取客户端的socket
Socket socket = serverSocket.accept();
new Thread() {
@Override
public void run() {
try {
//获取客输入流
InputStream inputStream = socket.getInputStream();
//创建目的地
BufferedOutputStream fileOut = new BufferedOutputStream(new FileOutputStream("uplod.jpg"));
byte[] buff = new byte[1024];
int len = -1;
while ((len = inputStream.read(buff)) != -1) {
//将文件写到目的地
fileOut.write(buff, 0, len);
}
//响应给客户端数据
OutputStream outputStream = socket.getOutputStream();
outputStream.write("文件上传成功".getBytes());
//关闭资源
inputStream.close();
fileOut.close();
outputStream.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
2:客户端
public class UdpClient {
public static void main(String[] args) throws IOException {
//获取客户端Socket
Socket socket = new Socket("127.0.0.1", 10086);
//读取本地文件,并发送到服务端
BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("TCP图片上传案例.JPG"));
byte[] buff = new byte[1024];
OutputStream outputStream = socket.getOutputStream();
int len = -1;
while ((len = bufferedInputStream.read(buff)) != -1) {
//往客户端写入数据
outputStream.write(buff, 0, len);
}
//告诉服务端,客户端上传文件完毕
socket.shutdownOutput();
//获取响应数据
InputStream inputStream = socket.getInputStream();
byte[] info = new byte[1024];
inputStream.read(info);
System.out.println(new String(info, 0, buff.length));
//关闭资源
outputStream.close();
bufferedInputStream.close();
inputStream.close();
socket.close();
}
}
2.3:UDP通信
- 概述
是无连接通信。在数据传输时候,数据传输端和接收端不建立连接。
-
UDP传输协议图解
-
java(API)实现UDP通信(阻塞的)
1:接收端
public class UdpServer {
public static void main(String[] args) throws IOException {
//创建服务端港口
DatagramSocket serverSocket = new DatagramSocket(10086);
byte[] buff = new byte[1024];
//创建集装箱
DatagramPacket serverPacket = new DatagramPacket(buff, 1024);
//接收对象,并封装到集装箱中
serverSocket.receive(serverPacket);
//解析接受到数据
InetAddress inetAddress = serverPacket.getAddress();
byte[] data = serverPacket.getData();
int length = serverPacket.getLength();
//获取ip
String ip = inetAddress.getHostAddress();
String s = new String(data,0,length);
System.out.println("ip:" + ip + " ;" + "数据为:" + s);
//释放资源
serverSocket.close();
}
}
2:发送端
public class UdpClient {
public static void main(String[] args) throws IOException {
//创建港口
DatagramSocket clientSocket = new DatagramSocket();
byte[] buff = "hello udp".getBytes();
//创建集装箱
DatagramPacket clientPacket = new DatagramPacket(buff, buff.length
, InetAddress.getByName("127.0.0.1")
, 10086);
//发送集装箱
clientSocket.send(clientPacket);
//释放资源
clientSocket.close();
}
}
三:NIO
3.1: 概述
非阻塞网络通信,一个线程处理大量连接。jdk很多api用于网络通信。
NIO的三大核心:Channel(管道)、Buffer(缓冲区)、Selector(选择器)。NIO是基于Channel和Buffer对数据进行处理。
3.2: 常用API讲解(了解)
- Selector
监控通道是否有事件发生
public static Selector open(),得到一个选择器对象
public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
public Set<SelectionKey> selectedKeys(),从内部集合中得到所有的 SelectionKey
- SelectionKey
1:Selector和通道注册关系
int OP_ACCEPT:有新的网络连接可以 accept,值为 16
int OP_CONNECT:代表连接已经建立,值为 8
int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4
2:常用方法
public abstract Selector selector(),得到与之关联的 Selector 对象
public abstract SelectableChannel channel(),得到与之关联的通道
public final Object attachment(),得到与之关联的共享数据
public abstract SelectionKey interestOps(int ops),设置或改变监听事件
public final boolean isAcceptable(),是否可以 accept
public final boolean isReadable(),是否可以读
public final boolean isWritable(),是否可以写
- ServerSocketChannel
监听新客户端Socket连接
public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
- SocketChannel
网络IO通道,负责读写操作,且必须经过缓冲区。
public static SocketChannel open(),得到一个 SocketChannel 通道
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public boolean connect(SocketAddress remote),连接服务器
public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public int write(ByteBuffer src),往通道里写数据
public int read(ByteBuffer dst),从通道里读数据
public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close(),关闭通道
- Buffer(缓冲区,*父类)
是一个容器
ByteBuffer 类(二进制数据),方法如下:
public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区
public abstract byte[] get(); 从缓冲区获得字节数据
public final byte[] array(); 把缓冲区数据转换成字节数组
public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array); 把一个现成的数组放到缓冲区中使用
public final Buffer flip(); 翻转缓冲区,重置位置到初始位置
3.3: NIO通信案例
1:服务端
public class NioServer {
public static void main(String[] args) throws Exception {
//1.得到老大 ServerSocketChennal
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2.得到监控通道对象Selector
Selector selector = Selector.open();
//3.对老大进行配置(ip,非堵塞)
serverSocketChannel.bind(new InetSocketAddress(10086));
serverSocketChannel.configureBlocking(false);
//4.将老大注册给监听器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//5.处理业务
while (true) {
//监控客户端
if(selector.select(2000) == 0){
//没有客户端连接,异步处理别的事情
System.out.println("没有客户端连接,去处理别的事情了");
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if(selectionKey.isAcceptable()){
//客户端连接请求
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//注册到selector
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if(selectionKey.isReadable()){
//读取客户端数据
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
channel.read(buffer);
System.out.println("客户端发来数据:" + new String(buffer.array()));
}
//将key从集合中移除
iterator.remove();
}
}
}
}
2:客户端
public class NioClient {
public static void main(String[] args) throws IOException {
//获取管道
SocketChannel socketChannel = SocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 10086);
//管道设置非堵塞
socketChannel.configureBlocking(false);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("连接不成功处理别的事情");
}
}
//存到缓存区
ByteBuffer buffer = ByteBuffer.wrap("我是客户端,我向服务端发送数据".getBytes());
//发送数据
socketChannel.write(buffer);
System.in.read();
}
}
3.4: 网络聊天
- 服务端
public class ChatServer {
private Selector selector;
private ServerSocketChannel listenerChannel;
private static final int PORT = 9999; //服务器端口
public ChatServer() {
try {
// 得到选择器
selector = Selector.open();
// 打开监听通道
listenerChannel = ServerSocketChannel.open();
// 绑定端口
listenerChannel.bind(new InetSocketAddress(PORT));
// 设置为非阻塞模式
listenerChannel.configureBlocking(false);
// 将选择器绑定到监听通道并监听 accept 事件
listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
printInfo("Chat Server is ready.......");
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() {
try {
//不停轮询
while (true) {
//获取就绪 channel
if (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 监听到 accept
if (key.isAcceptable()) {
SocketChannel sc = listenerChannel.accept();
//非阻塞模式
sc.configureBlocking(false);
//注册到选择器上并监听 read
sc.register(selector, SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress().toString().substring(1) + "上线了...");
//将此对应的 channel 设置为 accept,接着准备接受其他客户端请求
key.interestOps(SelectionKey.OP_ACCEPT);
}
//监听到 read
if (key.isReadable()) {
readMsg(key); //读取客户端发来的数据
}
//一定要把当前 key 删掉,防止重复处理
iterator.remove();
}
} else {
System.out.println("独自在寒风中等候...");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void readMsg(SelectionKey key) {
SocketChannel channel = null;
try {
// 得到关联的通道
channel = (SocketChannel) key.channel();
//设置 buffer 缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//从通道中读取数据并存储到缓冲区中
int count = channel.read(buffer);
//如果读取到了数据
if (count > 0) {
//把缓冲区数据转换为字符串
String msg = new String(buffer.array());
printInfo(msg);
//将关联的 channel 设置为 read,继续准备接受数据
key.interestOps(SelectionKey.OP_READ);
BroadCast(channel, msg); //向所有客户端广播数据
}
buffer.clear();
} catch (IOException e) {
try {
//当客户端关闭 channel 时,进行异常如理
printInfo(channel.getRemoteAddress().toString().substring(1) + "下线了...");
key.cancel(); //取消注册
channel.close(); //关闭通道
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
public void BroadCast(SocketChannel except, String msg) throws IOException {
System.out.println("发送广播...");
//广播数据到所有的 SocketChannel 中
for (SelectionKey key : selector.keys()) {
Channel targetchannel = key.channel();
//排除自身
if (targetchannel instanceof SocketChannel && targetchannel != except) {
SocketChannel dest = (SocketChannel) targetchannel;
//把数据存储到缓冲区中
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//往通道中写数据
dest.write(buffer);
}
}
}
private void printInfo(String str) { //往控制台打印消息
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("[" + sdf.format(new Date()) + "] -> " + str);
}
public static void main(String[] args) {
ChatServer server = new ChatServer();
server.start();
}
}
- 客户端
public class ChatClient {
private final String HOST = "127.0.0.1"; //服务器地址
private int PORT = 9999; //服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String userName;
public ChatClient() throws IOException {
//得到选择器
selector = Selector.open();
//连接远程服务器
socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//注册选择器并设置为 read
socketChannel.register(selector, SelectionKey.OP_READ);
//得到客户端 IP 地址和端口信息,作为聊天用户名使用
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println("---------------Client(" + userName + ") is ready---------------");
}
//向服务器端发送数据
public void sendMsg(String msg) throws Exception {
//如果控制台输入 bye 就关闭通道,结束聊天
if (msg.equalsIgnoreCase("bye")) {
socketChannel.close();
socketChannel = null;
return;
}
msg = userName + "说: " + msg;
try {
//往通道中写数据
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
//从服务器端接收数据
public void receiveMsg() {
try {
int readyChannels = selector.select();
if (readyChannels > 0) { //有可用通道
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = (SelectionKey) keyIterator.next();
if (sk.isReadable()) {
//得到关联的通道
SocketChannel sc = (SocketChannel) sk.channel();
//得到一个缓冲区
ByteBuffer buff = ByteBuffer.allocate(1024);
//读取数据并存储到缓冲区
sc.read(buff);
//把缓冲区数据转换成字符串
String msg = new String(buff.array());
System.out.println(msg.trim());
}
keyIterator.remove(); //删除当前 SelectionKey,防止重复处理
}
} else {
System.out.println("人呢?都去哪儿了?没人聊天啊...");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 聊天端
public class TestChat {
public static void main(String[] args) throws Exception {
//创建一个聊天客户端对象
final ChatClient chatClient = new ChatClient();
new Thread() { //单独开一个线程不断的接收服务器端广播的数据
public void run() {
while (true) {
chatClient.receiveMsg();
try { //间隔 3 秒
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
Scanner scanner = new Scanner(System.in);
//在控制台输入数据并发送到服务器端
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
chatClient.sendMsg(msg);
}
}
}
四:Netty入门
4.1:Netty整体架构
4.1.1:概述
Netty框架是Java开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。
Netty是基于NIO网络编程框架,用于简化NIO编程。
4.1.2:线程模型
Netty 抽象出两组线程池:
BossGroup 专门负责接收客户端连接
WorkerGroup 专门负责网络读写操作。
NioEventLoop 表示一个不断循环执行处理任务的线程。
每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。
NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
4.1.3:异步模型
Netty的异步模型是建立在 future 和 callback 的之上的。Future在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程。
在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。
4.2:核心API
4.2.1:ChannelHandler和其实现类
定义很多事件处理方法。
public void channelActive(ChannelHandlerContext ctx),通道就绪事件
public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件
4.2.2:Pipeline和ChannelPipeline
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。
方法:
ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置
4.2.3:ChannelHandlerContext
事 件 处 理 器 上 下 文 对 象,ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler进行调用
方法:
ChannelFuture close(),关闭通道
ChannelOutboundInvoker flush(),刷新
ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
4.2.4:ChannelOption
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。
4.2.5:ChannelFuture
表示 Channel 中异步 I/O 操作的结果。I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作的处理状态。
常用方法
Channel channel(),返回当前正在进行 IO 操作的通道
ChannelFuture sync(),等待异步操作执行完毕
4.2.6:EventLoopGroup 和其实现类 NioEventLoopGroup
EventLoopGroup 是一组 EventLoop 的抽象。
4.2.7: ServerBootstrap和Bootstrap
ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置。
Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。
常用方法:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop
public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
public <T> B option(ChannelOption<T> option, T value),用来给 ServerChannel 添加配置
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value),用来给接收到的通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的 handler)
public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器端
4.2.8:Unpooled类
Netty 提供的一个专门用来操作缓冲区的工具类。
常用方法
public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
4.3:入门案例
4.3.1:NettyServerHandler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Server: " + ctx);
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发来的消息 : " + buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕事件
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("就是没钱", CharsetUtil.UTF_8));
}
/**
* 异常发生事件
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
4.3.2:NettyServer
public class NettyServer {
public static void main(String[] args) throws Exception {
//1.创建一个线程组:用来处理网络事件(接受客户端连接)
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.创建一个线程组:用来处理网络事件(处理通道 IO 操作)
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.创建服务器端启动助手来配置参数
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) //4.设置两个线程组 EventLoopGroup
.channel(NioServerSocketChannel.class)//5.使用 NioServerSocketChannel 作为服务器端通道实现
.option(ChannelOption.SO_BACKLOG, 128) //6.设置线程队列中等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //7.保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { //8.创建一个通道初始化对象
public void initChannel(SocketChannel sc) {
//9.往 Pipeline 链中添加自定义的业务处理 handler
sc.pipeline().addLast(new NettyServerHandler()); //服务器端业务处理类
System.out.println(".......Server is ready.......");
}
});
//10.启动服务器端并绑定端口,等待接受客户端连接(非阻塞)
ChannelFuture cf = b.bind(9999).sync();
System.out.println("......Server is Starting......");
//11.关闭通道,关闭线程池
cf.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
4.3.3:NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道就绪事件
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Client: " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("老板,还钱吧", CharsetUtil.UTF_8));
}
/**
* 通道读取数据事件
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("服务器端发来的消息 : " + in.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕事件
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 异常发生事件
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
4.3.4:NettyClient
public class NettyClient {
public static void main(String[] args) throws Exception {
//1.创建一个 EventLoopGroup 线程组
EventLoopGroup group = new NioEventLoopGroup();
//2.创建客户端启动助手
Bootstrap b = new Bootstrap();
b.group(group) //3.设置 EventLoopGroup 线程组
.channel(NioSocketChannel.class) //4.使用 NioSocketChannel 作为客户端通道实现
.handler(new ChannelInitializer<SocketChannel>() { //5.创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel sc) {
//6.往 Pipeline 链中添加自定义的业务处理 handler
sc.pipeline().addLast(new NettyClientHandler()); //客户端业务处理类
System.out.println("......Client is ready.......");
}
});
//7.启动客户端,等待连接上服务器端(非阻塞)
ChannelFuture cf = b.connect("127.0.0.1", 9999).sync();
//8.等待连接关闭(非阻塞)
cf.channel().closeFuture().sync();
}
}
4.4:网络聊天
4.4.1:ChatServerHandler
//自定义一个服务器端业务处理类
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
public static List<Channel> channels = new ArrayList<>();
@Override //通道就绪
public void channelActive(ChannelHandlerContext ctx) {
Channel incoming = ctx.channel();
channels.add(incoming);
System.out.println("[Server]:" + incoming.remoteAddress().toString().substring(1) + "在线");
}
@Override //通道未就绪
public void channelInactive(ChannelHandlerContext ctx) {
Channel incoming = ctx.channel();
channels.remove(incoming);
System.out.println("[Server]:" + incoming.remoteAddress().toString().substring(1) + "掉线");
}
@Override //读取数据
protected void channelRead0(ChannelHandlerContext ctx, String s) {
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming) { //排除当前通道
channel.writeAndFlush("[" + incoming.remoteAddress().toString().substring(1) + "]说: " + s + "\n");
}
}
}
@Override //发生异常
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel incoming = ctx.channel();
System.out.println("[Server]:" + incoming.remoteAddress().toString().substring(1) + "异常");
ctx.close();
}
}
4.4.2:ChatServer
public class ChatServer {
//服务器端端口号
private int port;
public ChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//得到 Pipeline 链
ChannelPipeline pipeline = ch.pipeline();
//往 Pipeline 链中添加一个解码器
pipeline.addLast("decoder", new StringDecoder());
//往 Pipeline 链中添加一个编码器
pipeline.addLast("encoder", new StringEncoder());
//往 Pipeline 链中添加一个自定义的业务处理对象
pipeline.addLast("handler", new ChatServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
System.out.println("Netty Chat Server 启动......");
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("Netty Chat Server 关闭......");
}
}
public static void main(String[] args) throws Exception {
new ChatServer(9999).run();
}
}
4.4.3:ChatClientHandler
//自定义一个客户端业务处理类
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s.trim());
}
}
4.4.4:ChatClient
//聊天程序客户端
public class ChatClient {
private final String host; //服务器端 IP 地址
private final int port; //服务器端端口号
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); //得到 Pipeline 链
//往 Pipeline 链中添加一个解码器
pipeline.addLast("decoder", new StringDecoder());
//往 Pipeline 链中添加一个编码器
pipeline.addLast("encoder", new StringEncoder());
//往 Pipeline 链中添加一个自定义的业务处理对象
pipeline.addLast("handler", new ChatClientHandler());
}
});
Channel channel = bootstrap.connect(host, port).sync().channel();
System.out.println("--------" + channel.localAddress().toString().substring(1) + "--------");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
channel.writeAndFlush(msg + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new ChatClient("127.0.0.1", 9999).run();
}
}
本文地址:https://blog.csdn.net/qq_33208912/article/details/107644949
推荐阅读
-
《Visual C# 从入门到精通》第一章使用变量、操作符和表达式——读书笔记
-
从BIO到Netty的演变
-
文件和异常——python从编程入门到实践
-
flask从入门到精通详细笔记,文末附带视频教程和示例代码
-
文件和异常练习2——python编程从入门到实践
-
iOS开发从入门到精通-- 警告对话框UIAlertView和等待提示器UIActivityIndicatorView
-
荐 javascript从入门到跑路-----小文的js学习笔记(17)------动态属性、复制变量值、传递参数和检测类型
-
Oracle从入门到精通 限定查询和排序查询的问题
-
《Python编程:从入门到实践》个人学习笔记/心得(菜鸟瞎扯淡) Chapter2 变量和简单数据类型
-
WPF MVVM从入门到精通7:关闭窗口和打开新窗口