NIO编程(netty的使用)
NIO编程(netty的使用)
1.传统io(使用socket网络编程测试)
服务端监听客户端发送的数据
public class IOServer {
public static void main(String[] args) throws IOException {
//
ServerSocket serverSocket = new ServerSocket(9002);
while (true){
//用阻塞方法获取一个新连接
final Socket accept = serverSocket.accept();
//有新连接的时候创建一个线程,获取该连接中的数据
new Thread(){
/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see #start()
* @see #stop()
* @see #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
//获取当前线程的名字
String currentName = Thread.currentThread().getName();
//开始读取该连接中的数据
try {
InputStream is = accept.getInputStream();
byte[] bytes = new byte[1024];
//此处有一个死循环,当没有数据的时候,循环仍在继续,浪费系统性能
while (true){
int len;
while ((len = is.read(bytes))!=-1){
System.out.println("线程"+currentName+":"+new String(bytes, "utf-8"));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
客户端向服务端发送数据(5个线程向服务端发送数据)
public class IOClient {
public static void main(String[] args) {
for (int i=0;i<5;i++){
WriteData writeData = new WriteData();
writeData.start();
}
}
public static class WriteData extends Thread{
/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see #start()
* @see #stop()
* @see #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
try {
Socket socket = new Socket("127.0.0.1", 9002);
OutputStream os = socket.getOutputStream();
while (true){
os.write(("测试数据").getBytes());
os.flush();
Thread.sleep(2000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环。如果在用户数量较少的情况下运行是没有问题的,但是对于用户数量比较多的业务来说,服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了
2.NIO
Netty框架包含如下的组件:
-
ServerBootstrap :用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端。
-
Bootstrap:不接受新的连接,并且是在父通道类完成一些操作,一般用于客户端的。
-
Channel:对网络套接字的I/O操作,例如读、写、连接、绑定等操作进行适配和封装的组件。
-
EventLoop:处理所有注册其上的channel的I/O操作。通常情况一个EventLoop可为多个channel提供服务。
-
EventLoopGroup:包含有多个EventLoop的实例,用来管理 event Loop 的组件,可以理解为一个线程池,内部维护了一组线程。
-
ChannelHandler和ChannelPipeline:例如一个流水线车间,当组件从流水线头部进入,穿越流水线,流水线上的工人按顺序对组件进行加工,到达流水线尾部时商品组装完成。流水线相当于
ChannelPipeline
,流水线工人相当于ChannelHandler
,源头的组件当做event。 -
ChannelInitializer:用于对刚创建的channel进行初始化,将ChannelHandler添加到channel的ChannelPipeline处理链路中。
-
ChannelFuture:与jdk中线程的Future接口类似,即实现并行处理的效果。可以在操作执行成功或失败时自动触发监听器中的事件处理方法。
public class NettyServer {
public static void main(String[] args) {
//用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* EventLoopGroup 包含有多个EventLoop的实例,用来管理 event Loop 的组件,可以理解为一个线程池,内部维护了一组线程
* EventLoop 处理所有注册其上的channel的I/O操作。通常情况一个EventLoop可为多个channel提供服务
*/
//接受新连接的线程(相当于selector)
NioEventLoopGroup boos = new NioEventLoopGroup();
//读取数据的线程
NioEventLoopGroup worker = new NioEventLoopGroup();
//服务端执行
serverBootstrap
.group(boos,worker)
//对网络套接字的I/O操作,例如读、写、连接、绑定等操作进行适配和封装的组件
.channel(NioServerSocketChannel.class)
//ChannelInitializer:用于对刚创建的channel进行初始化,将ChannelHandler添加到channel的ChannelPipeline处理链路中
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 组件从流水线头部进入,流水线上的工人按顺序对组件进行加工
// 流水线相当于ChannelPipeline
// 流水线工人相当于ChannelHandler
//pipeline代表流水线,addLast添加工人
//StringDecoder 解码器
nioSocketChannel.pipeline().addLast(new StringDecoder());
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
//自定义工人,需要我们告诉他干啥事
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("输出为:" +s);
}
});
}
}).bind(9002);
}
}
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//不接受新的连接,并且是在父通道类中完成一些操作,通常用于客户端中
Bootstrap bootstrap = new Bootstrap();
//EventLoopGroup中包含了多个EventLoop实例,用来管理EventLoop的组件
NioEventLoopGroup group = new NioEventLoopGroup();
//客户端执行
bootstrap
.group(group)
//对网络套接字的I/O操作,例如读、写、连接、绑定等操作进行适配和封装的组件
.channel(NioSocketChannel.class)
//ChannelInitializer 用于对刚创建的channel进行初始化,将ChannelHandler添加到channel的ChannelPipeline处理链路中
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 组件从流水线头部进入,流水线上的工人按顺序对组件进行加工
// 流水线相当于ChannelPipeline
// 流水线工人相当于ChannelHandler
//pipeline代表流水线,addLast添加工人
//StringEncoder 编码器
channel.pipeline().addLast(new StringEncoder());
}
});
//客户端连接服务端
Channel channel = bootstrap.connect("127.0.0.1", 9002).channel();
while (true){
channel.writeAndFlush("测试数据");
Thread.sleep(2000);
}
}
}
3.NIO结合websocket使用
自定义一个工人
//TextWebSocketFrame 为websocket服务的传输格式
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 存放websocket连接的map,key为用户id
*/
public static ConcurrentHashMap<String, Channel> userChannelMap=new ConcurrentHashMap<>();
/**
* 用户请求websocket服务端执行的方法(用户主动)
* @param channelHandlerContext
* @param textWebSocketFrame
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext,
TextWebSocketFrame textWebSocketFrame) throws Exception {
//约定用户第一次请求携带的数据:{"userId":"1"}
//获取通信的数据
String json = textWebSocketFrame.text();
Map<String,String> parseObject = JSON.parseObject(json, Map.class);
String userId = parseObject.get("userId");
//从map中取出连接
Channel channel = userChannelMap.get(userId);
if (channel==null){
//第一次请求的时候需要建立websocket连接
channel = channelHandlerContext.channel();
//放入map中
userChannelMap.put(userId,channel);
}
//不是第一次请求,已有websocket连接
//将消息推送到客户端
Map<String,Integer> data=new HashMap<>();
data.put("sysNoticeCount",2);
Result<Map<String, Integer>> result = new Result<>(true, StatusCode.OK, "测试消息成功", data);
channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(result)));
}
}
netty服务
public class NettyServer {
public void start(int port){
System.out.println("netty开始启动。。。。");
//创建一个用于接收新连接的对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//用于接收新连接
NioEventLoopGroup boos = new NioEventLoopGroup();
//用于操作连接的数据
NioEventLoopGroup worker = new NioEventLoopGroup();
//开始一个服务
serverBootstrap
.group(boos,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//请求消息解码器
channel.pipeline().addLast(new HttpServerCodec());
//将多个消息转换为单一的request或者response对象
channel.pipeline().addLast(new HttpObjectAggregator(65536));
//处理websocket的消息事件
channel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
//自定义一个工人
MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
channel.pipeline().addLast(myWebSocketHandler);
}
}).bind(port);
}
}
使用netty最主要的就是要知道怎么使用工人
下面是三个和websocket有关的netty已经写好的工人
HttpServerCodec
请求消息解码器
HttpObjectAggregator
将多个消息转换为单一的request或者response对象
WebSocketServerProtocolHandler
处理websocket的消息事件
使用spring自动创建一个实例
@Configuration
public class NettyConfig {
@Bean("nettyServer")
public NettyServer createNettyServer(){
NettyServer nettyServer = new NettyServer();
//开启一个线程为netty服务
new Thread(){
/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see #start()
* @see #stop()
* @see #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
//启动netty
nettyServer.start(1234);
}
}.start();
return nettyServer;
}
}
上一篇: 推荐与时间相关函数大全
下一篇: 淘宝ip地址查询类分享_PHP教程