欢迎您访问程序员文章站！本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

Netty 开发入门

程序员文章站 2022-06-21 21:08:38
...
1.准备jar

<!-- Build properties: source file encoding and the logging library versions
     that the slf4j/logback dependencies reference through ${...} placeholders. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j-api.version>1.7.5</slf4j-api.version>
<logback.version>1.0.13</logback.version>
</properties>


<!-- Logging facade used by the handler code (org.slf4j.Logger / LoggerFactory). -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<!-- Logback "classic" module; version comes from the logback.version property. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- Logback "core" module, pinned to the same logback.version as logback-classic. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>



<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<!-- Netty itself, pulled in as the single netty-all artifact at a fixed version. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.14.Final</version>
</dependency>


2. 编写 Netty 服务端

package com.boce.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* Discards any incoming data.
*/
public class DiscardServer {

private int port;

public DiscardServer(int port) {
     this.port = port;
}

public void run() throws Exception {
     EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
     EventLoopGroup workerGroup = new NioEventLoopGroup(80);
     try {
         ServerBootstrap b = new ServerBootstrap(); // (2)
         b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class) // (3)
          .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
              @Override
              public void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline().addLast(new ChildHandler());
              }
          })
          .option(ChannelOption.SO_BACKLOG, 128)       // (5)队列长度
          .option(ChannelOption.SO_REUSEADDR, true) //端口可以重用
          .option(ChannelOption.TCP_NODELAY, true) //实时发送
          .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

         // Bind and start to accept incoming connections.
         ChannelFuture f = b.bind(port).sync(); // (7)

         // Wait until the server socket is closed.
         // In this example, this does not happen, but you can do that to gracefully
         // shut down your server.
         f.channel().closeFuture().sync();
     } finally {
         workerGroup.shutdownGracefully();
         bossGroup.shutdownGracefully();
     }
}

public static void main(String[] args) throws Exception {
System.out.println("start ....");
     int port;
     if (args.length > 0) {
         port = Integer.parseInt(args[0]);
     } else {
         port = 8080;
     }
     new DiscardServer(port).run();
}
}



package com.boce.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Channel initializer that assembles the pipeline for a {@link SocketChannel}:
 * line-based framing, string encoding/decoding, and the business handler.
 */
public class ChildHandler extends ChannelInitializer<SocketChannel> {

    /**
     * Appends the handlers to the channel's pipeline, in this order:
     * a {@link LineBasedFrameDecoder} constructed with 1024, a
     * {@link StringEncoder}, a {@link StringDecoder}, and finally a new
     * {@link DiscardServerHandler}.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new LineBasedFrameDecoder(1024))
                .addLast(new StringEncoder())
                .addLast(new StringDecoder())
                .addLast(new DiscardServerHandler());
    }

}



package com.boce.netty.server;

import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;

/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)


static final Logger log = LoggerFactory.getLogger("DiscardServerHandler");

private int count;

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("------------channelRegistered "+ctx.channel().remoteAddress().toString());
super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
log.info("------------channelUnregistered======= ");
}

@Override
public void channelRead(ChannelHandlerContext ctx,  Object msg) { // (2)

log.info("---------------------channelRead");
try {
String backMsg = (String) msg;

System.out.println("read::" + backMsg + ";this is count=" + ++count);
int wortTime = 0;

Random random = new Random();
wortTime = random.nextInt(80);
try {
Thread.sleep(wortTime);
} catch (InterruptedException e) {
e.printStackTrace();
}

String result = "back===================="+backMsg+System.getProperty("line.separator");
ctx.writeAndFlush(result);

} finally {
ReferenceCountUtil.release(msg); // (2)
}

}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("===================channelreadComplete");
super.channelReadComplete(ctx);
ctx.close();
}

public void channelActive(final ChannelHandlerContext ctx) throws Exception {

log.info("------channelActive");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
log.error("------exceptionCaught");

cause.printStackTrace();
ctx.close();
}
}



3.编写客户端
package com.boce.netty.client;

import java.util.concurrent.atomic.AtomicInteger;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Simple Netty client: connects to the server, lets {@link TimeClientHandler}
 * exchange one line with it, and returns once the connection is closed.
 */
public class TimeClient implements Runnable {

    /** Address of the server to connect to. */
    private static final String HOST = "192.168.1.201";

    /** Port of the server to connect to. */
    private static final int PORT = 8080;

    /** Number of client threads started by {@link #main(String[])}. */
    private static final int CLIENT_THREADS = 1;

    /** Number of sequential connections each thread makes in {@link #run()}. */
    private static final int CONNECTIONS_PER_THREAD = 1;

    /** Counter shared by all clients; its next value is passed to each new handler. */
    private static final AtomicInteger aci = new AtomicInteger(0);

    /**
     * Opens one connection, blocks until it is closed, then shuts down the
     * event-loop group. Failures are printed and not rethrown.
     */
    public void client() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringEncoder()); // encodes outgoing strings
                    ch.pipeline().addLast(new StringDecoder()); // decodes incoming data to strings
                    ch.pipeline().addLast(new TimeClientHandler(aci.incrementAndGet()));
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Other failures (for example a refused connection) are reported
            // here as well; keep the broad catch.
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    /** Runs {@link #client()} the configured number of times, one after another. */
    @Override
    public void run() {
        for (int j = 0; j < CONNECTIONS_PER_THREAD; j++) {
            client();
        }
    }

    /**
     * Starts the client threads and waits for all of them to finish before
     * printing the end marker, so the marker really means the clients are done.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[CLIENT_THREADS];
        for (int s = 0; s < threads.length; s++) {
            threads[s] = new Thread(new TimeClient());
            threads[s].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("client end =======================================");
    }

}


package com.boce.netty.client;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
 
public TimeClientHandler(int count) {
}



@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("client ============channelread -=========");
String body = (String)msg;
System.out.println("client:::"+body);
    }



public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("client==============channelReadComplete=================");
if(null !=ctx){
ctx.close();
}
};

   
   
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
System.out.println("client---client---channelActive");
    String message ="channelactive=======================================send----->"+System.getProperty("line.separator");
ctx.writeAndFlush(message);
   

}
   
   
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}