netty 开发入门
程序员文章站
2022-06-21 21:08:38
...
1.准备jar
<!-- Build properties; the two *.version entries are referenced below through ${...} placeholders. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j-api.version>1.7.5</slf4j-api.version>
<logback.version>1.0.13</logback.version>
</properties>
<!-- SLF4J API: provides the org.slf4j.Logger / LoggerFactory types imported by DiscardServerHandler. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<!-- Logback artifacts (classic + core), both pinned to ${logback.version}. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- Netty (all-in-one artifact) used by the server and client classes of this article. -->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.14.Final</version>
</dependency>
2.编写netty 服务端
package com.boce.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Bootstraps a Netty TCP server on a given port.
 *
 * <p>The class name is historical: incoming data is not discarded. Every accepted connection is
 * configured by {@link ChildHandler}, whose last handler answers each received text line.
 */
public class DiscardServer {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public DiscardServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and blocks until the server channel is closed.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void run() throws Exception {
        // The boss group accepts connections; the worker group (80 threads) serves them.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup(80);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // ChildHandler already is a ChannelInitializer, so it is installed directly
                // instead of being wrapped in a second, anonymous initializer.
                .childHandler(new ChildHandler())
                // Options of the listening socket.
                .option(ChannelOption.SO_BACKLOG, 128) // accept queue length
                .option(ChannelOption.SO_REUSEADDR, true) // allow re-binding the port
                // Options of every accepted connection. TCP_NODELAY is a per-connection option,
                // so it has to be a child option; it does not apply to the listening channel.
                .childOption(ChannelOption.TCP_NODELAY, true) // send without delay
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed. Nothing in this example closes it,
            // so this call blocks for the lifetime of the process.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the port to listen on (default 8080)
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        System.out.println("start ....");
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
package com.boce.netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Channel initializer that assembles the handler pipeline of a channel.
 */
public class ChildHandler extends ChannelInitializer<SocketChannel> {

    /**
     * Appends, in this order: a line-based frame decoder (max frame length 1024), a string encoder,
     * a string decoder and a new {@link DiscardServerHandler}.
     *
     * @param ch the channel whose pipeline is populated
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
            .addLast(new LineBasedFrameDecoder(1024))
            .addLast(new StringEncoder())
            .addLast(new StringDecoder())
            .addLast(new DiscardServerHandler());
    }
}
package com.boce.netty.server;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
/**
 * Server-side inbound handler: replies to every received text line with
 * {@code "back====================" + line} and closes the connection once the read batch that
 * produced at least one reply has completed.
 *
 * <p>Messages are expected to arrive as {@link String}s (the pipeline built by
 * {@code ChildHandler} places a string decoder in front of this handler).
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
    static final Logger log = LoggerFactory.getLogger("DiscardServerHandler");

    /** Source of the simulated processing delay; reused instead of allocating one per message. */
    private final Random random = new Random();

    /** Number of messages read by this handler instance. */
    private int count;

    /** Set once a reply has been written; gates the close in {@link #channelReadComplete}. */
    private boolean responded;

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Plain concatenation tolerates a null remote address; calling toString() on it would not.
        log.info("------------channelRegistered " + ctx.channel().remoteAddress());
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        log.info("------------channelUnregistered======= ");
    }

    /**
     * Logs the received line, simulates some work and writes the reply back to the peer.
     *
     * @param ctx handler context used to write the reply
     * @param msg the decoded message; must be a {@link String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("---------------------channelRead");
        try {
            String backMsg = (String) msg;
            System.out.println("read::" + backMsg + ";this is count=" + ++count);
            simulateWork();
            String result = "back====================" + backMsg + System.lineSeparator();
            ctx.writeAndFlush(result);
            responded = true;
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Closes the connection after a read batch, but only if a reply was sent.
     *
     * <p>This event is also fired for read batches in which the frame decoder has not yet emitted
     * a complete line; closing unconditionally would drop a request whose line is split across
     * several TCP reads before it could be answered.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("===================channelreadComplete");
        super.channelReadComplete(ctx);
        if (responded) {
            ctx.close();
        }
    }

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        log.info("------channelActive");
    }

    /** Logs the failure (including its stack trace) and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("------exceptionCaught", cause);
        ctx.close();
    }

    /**
     * Sleeps for a random time below 80 ms to imitate request processing.
     * NOTE(review): this blocks the calling thread; it is demo code only.
     */
    private void simulateWork() {
        try {
            Thread.sleep(random.nextInt(80));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("simulated work interrupted", e);
        }
    }
}
3.编写客户端
package com.boce.netty.client;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Simple Netty client: connects to a server, lets {@link TimeClientHandler} exchange one text
 * line and returns when the connection has been closed.
 */
public class TimeClient implements Runnable {
    /** Server address used when none is supplied (the address of the original example). */
    private static final String DEFAULT_HOST = "192.168.1.201";
    /** Server port used when none is supplied. */
    private static final int DEFAULT_PORT = 8080;
    /** Number of client threads started by {@link #main}. */
    private static final int CLIENT_THREADS = 1;
    /** Number of sequential connections opened by each {@link #run()}. */
    private static final int CONNECTIONS_PER_RUN = 1;

    /** Running number handed to every newly created {@link TimeClientHandler}. */
    private static final AtomicInteger aci = new AtomicInteger(0);

    private final String host;
    private final int port;

    /** Creates a client for the default host and port. */
    public TimeClient() {
        this(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public TimeClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Opens one connection and blocks until it is closed. Failures are printed, not rethrown;
     * the event loop group is always shut down.
     */
    public void client() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringEncoder()); // encodes outgoing strings
                    ch.pipeline().addLast(new StringDecoder()); // decodes incoming bytes to strings
                    ch.pipeline().addLast(new TimeClientHandler(aci.incrementAndGet()));
                }
            });
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void run() {
        for (int j = 0; j < CONNECTIONS_PER_RUN; j++) {
            client();
        }
    }

    /**
     * Starts the client thread(s) and waits for them to finish.
     *
     * @param args optional; {@code args[0]} is the server host, {@code args[1]} the server port
     * @throws Exception if the port is not a number or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        Thread[] threads = new Thread[CLIENT_THREADS];
        for (int s = 0; s < threads.length; s++) {
            threads[s] = new Thread(new TimeClient(host, port));
            threads[s].start();
        }
        // Wait for the clients so that the message below really marks the end of the run.
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("client end =======================================");
    }
}
package com.boce.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler: sends one text line when the channel becomes active, prints the
 * lines received in response and then closes the connection.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    /** Set once a message has been read; gates the close in {@link #channelReadComplete}. */
    private boolean received;

    /**
     * @param count running number of the client connection; currently not used by the handler
     */
    public TimeClientHandler(int count) {
    }

    /**
     * Prints the received message.
     *
     * @param ctx handler context
     * @param msg the decoded message; must be a {@link String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("client ============channelread -=========");
        String body = (String) msg;
        System.out.println("client:::" + body);
        received = true;
    }

    /**
     * Closes the connection after a read batch, but only if a message was received.
     *
     * <p>This event is also fired for read batches in which the frame decoder has not yet emitted
     * a complete line; closing unconditionally would lose a reply that is split across several
     * TCP reads.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client==============channelReadComplete=================");
        if (received) {
            ctx.close();
        }
    }

    /** Sends the request line as soon as the connection is established. */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        System.out.println("client---client---channelActive");
        String message = "channelactive=======================================send----->"
            + System.lineSeparator();
        ctx.writeAndFlush(message);
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
<!-- Build properties; the two *.version entries are referenced below through ${...} placeholders. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j-api.version>1.7.5</slf4j-api.version>
<logback.version>1.0.13</logback.version>
</properties>
<!-- SLF4J API: provides the org.slf4j.Logger / LoggerFactory types imported by DiscardServerHandler. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<!-- Logback artifacts (classic + core), both pinned to ${logback.version}. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- Netty (all-in-one artifact) used by the server and client classes of this article. -->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.14.Final</version>
</dependency>
2.编写netty 服务端
package com.boce.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Bootstraps a Netty TCP server on a given port.
 *
 * <p>The class name is historical: incoming data is not discarded. Every accepted connection is
 * configured by {@link ChildHandler}, whose last handler answers each received text line.
 */
public class DiscardServer {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public DiscardServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and blocks until the server channel is closed.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void run() throws Exception {
        // The boss group accepts connections; the worker group (80 threads) serves them.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup(80);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // ChildHandler already is a ChannelInitializer, so it is installed directly
                // instead of being wrapped in a second, anonymous initializer.
                .childHandler(new ChildHandler())
                // Options of the listening socket.
                .option(ChannelOption.SO_BACKLOG, 128) // accept queue length
                .option(ChannelOption.SO_REUSEADDR, true) // allow re-binding the port
                // Options of every accepted connection. TCP_NODELAY is a per-connection option,
                // so it has to be a child option; it does not apply to the listening channel.
                .childOption(ChannelOption.TCP_NODELAY, true) // send without delay
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed. Nothing in this example closes it,
            // so this call blocks for the lifetime of the process.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the port to listen on (default 8080)
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        System.out.println("start ....");
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
package com.boce.netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Channel initializer that assembles the handler pipeline of a channel.
 */
public class ChildHandler extends ChannelInitializer<SocketChannel> {

    /**
     * Appends, in this order: a line-based frame decoder (max frame length 1024), a string encoder,
     * a string decoder and a new {@link DiscardServerHandler}.
     *
     * @param ch the channel whose pipeline is populated
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
            .addLast(new LineBasedFrameDecoder(1024))
            .addLast(new StringEncoder())
            .addLast(new StringDecoder())
            .addLast(new DiscardServerHandler());
    }
}
package com.boce.netty.server;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
/**
 * Server-side inbound handler: replies to every received text line with
 * {@code "back====================" + line} and closes the connection once the read batch that
 * produced at least one reply has completed.
 *
 * <p>Messages are expected to arrive as {@link String}s (the pipeline built by
 * {@code ChildHandler} places a string decoder in front of this handler).
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
    static final Logger log = LoggerFactory.getLogger("DiscardServerHandler");

    /** Source of the simulated processing delay; reused instead of allocating one per message. */
    private final Random random = new Random();

    /** Number of messages read by this handler instance. */
    private int count;

    /** Set once a reply has been written; gates the close in {@link #channelReadComplete}. */
    private boolean responded;

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Plain concatenation tolerates a null remote address; calling toString() on it would not.
        log.info("------------channelRegistered " + ctx.channel().remoteAddress());
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        log.info("------------channelUnregistered======= ");
    }

    /**
     * Logs the received line, simulates some work and writes the reply back to the peer.
     *
     * @param ctx handler context used to write the reply
     * @param msg the decoded message; must be a {@link String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("---------------------channelRead");
        try {
            String backMsg = (String) msg;
            System.out.println("read::" + backMsg + ";this is count=" + ++count);
            simulateWork();
            String result = "back====================" + backMsg + System.lineSeparator();
            ctx.writeAndFlush(result);
            responded = true;
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Closes the connection after a read batch, but only if a reply was sent.
     *
     * <p>This event is also fired for read batches in which the frame decoder has not yet emitted
     * a complete line; closing unconditionally would drop a request whose line is split across
     * several TCP reads before it could be answered.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("===================channelreadComplete");
        super.channelReadComplete(ctx);
        if (responded) {
            ctx.close();
        }
    }

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        log.info("------channelActive");
    }

    /** Logs the failure (including its stack trace) and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("------exceptionCaught", cause);
        ctx.close();
    }

    /**
     * Sleeps for a random time below 80 ms to imitate request processing.
     * NOTE(review): this blocks the calling thread; it is demo code only.
     */
    private void simulateWork() {
        try {
            Thread.sleep(random.nextInt(80));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("simulated work interrupted", e);
        }
    }
}
3.编写客户端
package com.boce.netty.client;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Simple Netty client: connects to a server, lets {@link TimeClientHandler} exchange one text
 * line and returns when the connection has been closed.
 */
public class TimeClient implements Runnable {
    /** Server address used when none is supplied (the address of the original example). */
    private static final String DEFAULT_HOST = "192.168.1.201";
    /** Server port used when none is supplied. */
    private static final int DEFAULT_PORT = 8080;
    /** Number of client threads started by {@link #main}. */
    private static final int CLIENT_THREADS = 1;
    /** Number of sequential connections opened by each {@link #run()}. */
    private static final int CONNECTIONS_PER_RUN = 1;

    /** Running number handed to every newly created {@link TimeClientHandler}. */
    private static final AtomicInteger aci = new AtomicInteger(0);

    private final String host;
    private final int port;

    /** Creates a client for the default host and port. */
    public TimeClient() {
        this(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public TimeClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Opens one connection and blocks until it is closed. Failures are printed, not rethrown;
     * the event loop group is always shut down.
     */
    public void client() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringEncoder()); // encodes outgoing strings
                    ch.pipeline().addLast(new StringDecoder()); // decodes incoming bytes to strings
                    ch.pipeline().addLast(new TimeClientHandler(aci.incrementAndGet()));
                }
            });
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void run() {
        for (int j = 0; j < CONNECTIONS_PER_RUN; j++) {
            client();
        }
    }

    /**
     * Starts the client thread(s) and waits for them to finish.
     *
     * @param args optional; {@code args[0]} is the server host, {@code args[1]} the server port
     * @throws Exception if the port is not a number or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        Thread[] threads = new Thread[CLIENT_THREADS];
        for (int s = 0; s < threads.length; s++) {
            threads[s] = new Thread(new TimeClient(host, port));
            threads[s].start();
        }
        // Wait for the clients so that the message below really marks the end of the run.
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("client end =======================================");
    }
}
package com.boce.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler: sends one text line when the channel becomes active, prints the
 * lines received in response and then closes the connection.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    /** Set once a message has been read; gates the close in {@link #channelReadComplete}. */
    private boolean received;

    /**
     * @param count running number of the client connection; currently not used by the handler
     */
    public TimeClientHandler(int count) {
    }

    /**
     * Prints the received message.
     *
     * @param ctx handler context
     * @param msg the decoded message; must be a {@link String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("client ============channelread -=========");
        String body = (String) msg;
        System.out.println("client:::" + body);
        received = true;
    }

    /**
     * Closes the connection after a read batch, but only if a message was received.
     *
     * <p>This event is also fired for read batches in which the frame decoder has not yet emitted
     * a complete line; closing unconditionally would lose a reply that is split across several
     * TCP reads.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client==============channelReadComplete=================");
        if (received) {
            ctx.close();
        }
    }

    /** Sends the request line as soon as the connection is established. */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        System.out.println("client---client---channelActive");
        String message = "channelactive=======================================send----->"
            + System.lineSeparator();
        ctx.writeAndFlush(message);
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}