Netty学习笔记之用NIO实现一个echo服务器
需求分析
了解了NIO以及其组件,下面我要用NIO编程知识来实现一个echo服务器。
所谓echo服务器,及客户端像给服务器发送了什么消息,服务器就发回什么消息。
下面我们来尝试实现这个服务器。
代码实战
话不多说,先直接上代码。
public class MainDemo1 {
//处理拿到可读事件的socket
static class clientProcessor implements Runnable{
private Selector selector;
public clientProcessor(Selector selector){
this.selector = selector;
}
@Override
public void run() {
while (true){
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if (key.isValid() == false)
{
continue;
}
if (key.isReadable())
{//代码①
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
SocketChannel clientChannel = (SocketChannel) key.channel();
int read = clientChannel.read(byteBuffer);
if (read == -1){
key.cancel();
clientChannel.close();
}else {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
System.out.println(bytes);
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException {
//新建服务端管道对象并设置为非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
//基于机器性能创建选择器数组
final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
//初始化数组,依次启动线程
for (int i = 0; i < selectors.length; i++) {
final Selector selector = Selector.open();
selectors[i] = selector;
new Thread(new clientProcessor(selector)).start();
}
AtomicInteger id = new AtomicInteger();
Selector selector = Selector.open();
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while (true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
iterator.next();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.register(selectors[id.getAndIncrement()%selectors.length], SelectionKey.OP_READ);
iterator.remove();
}
}
}
}
如上代码,已经可以初步实现我们所需要的服务器,分析一下代码。
先创建一个服务端管道,服务端管道注册选择器的接入事件,然后循环select等待客户端的接入。
客户端接入后,在自己的线程内,selector返回调用后产生的选择键集合,之后遍历集合,判断事件,当是接入事件时,拿到客户端通道,再将客户端通道注册为可读事件到选择器上,再次select,等待数据的准备。
当数据准备好后,selector再次返回调用后产生的选择键集合,遍历集合,这次是可读事件,判断为可读事件后,读取客户端通道数据,并回写给客户端。
这样我们就实现了一个简单的echo服务器,但是我们现在需要给这个服务器加点功能,让它能更好的实现对客户端信息的回写。
增加需求
我们规定我们的echo服务器的实现需要有以下特点:
- 服务器原样返回客户端发送的信息。
- 客户端发送的信息以’\r’作为一个消息的结尾,一个消息的最大长度不超过128。
- 客户端会一次发送多个消息,服务端需按顺序原样返回。
我们先来分析一下需求。
针对需求第二三,客户端发送的消息需要以“/r”作为消息的结尾。所以为了把服务器的消息区分开来,我们不能以定长来处理数据了。同时我们需要考虑一下tcp的拆包和粘包。
什么是tcp的拆包和粘包呢?tcp是面向流的协议,我们无法知道一个数据包的边界,所以在接收数据时,可能会因为一次数据包过大而分次填充到socket缓冲区,这就是tcp的拆包;而多个数据包一起读取并填充到socket的缓冲区中,便称为tcp的粘包。
有了以上分析,我们可以得到以下代码:
public class MainDemo2 {
//处理拿到可读事件的socket
static class clientProcessor implements Runnable {
private Selector selector;
public clientProcessor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isValid() == false) {
continue;
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = (ByteBuffer) key.attachment();
int read = clientChannel.read(readBuffer);
if (read == -1) {
key.cancel();
clientChannel.close();
} else {
readBuffer.flip();
int position = readBuffer.position();
int limit = readBuffer.limit();
List<ByteBuffer> buffers = new ArrayList<>();
for (int i = position; i < limit; i++) {
if (readBuffer.get(i) == '\r') ;
{
ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position() + 1);
readBuffer.limit(i + 1);
message.put(readBuffer);
readBuffer.limit(limit);
message.flip();
buffers.add(message);
// byte[] bytes = new byte[message.limit()];
// message.get(bytes);
// System.out.println(bytes);
}
}
for (ByteBuffer message : buffers) {
//判断message是否有效
while (message.hasRemaining()) {
clientChannel.write(message);
}
}
readBuffer.compact();
}
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException {
//新建服务端管道对象并设置为非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9999));
//基于机器性能创建选择器数组
final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
//初始化数组,依次启动线程
for (int i = 0; i < selectors.length; i++) {
final Selector selector = Selector.open();
selectors[i] = selector;
new Thread(new clientProcessor(selector)).start();
}
AtomicInteger id = new AtomicInteger();
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
iterator.next();
SocketChannel socketChannel = serverSocketChannel.accept();
Selector selectorChild = selectors[id.getAndIncrement() % selectors.length];
socketChannel.register(selectorChild, SelectionKey.OP_READ, ByteBuffer.allocate(256));
selectorChild.wakeup();
iterator.remove();
}
}
}
}
如上代码,我们比之前的实现做了一些改动。
首先在注册可读事件到选择器时,我们带上了一个256长度的ByteBuffer。这个ByteBuffer专门服务于这个这个选择器。因为粘包和拆包的存在,一次读取可能的数据中可能有多个消息,也可能不足一个消息,所以我们选择了用一个ByteBuffer去累计,这样每次读取也会考虑到上一次读取剩下的数据。
同时我们在实现消息读取的时候,不再是定长读取,而是循环检查字节,判断消息是否结尾。每得到一个消息就存入集合中,最后遍历集合依次返回客户端。
上面的代码已经实现了我们的需求,但是却是有缺点的。客户端在写出消息时,如果客户端缓冲区已满,消息无法写出,程序会一直循环等待,很消耗性能。下面我们再来改进一下代码。
代码改进
按照上述改进思路,我们回写消息的时候,再给消息注册写入事件,优化后的代码如下:
public class MainDemo3 {
static class ChannelBuffer {
ByteBuffer readBuffer;
ByteBuffer[] writeBuffers;
List<ByteBuffer> list = new LinkedList<>();
}
//处理拿到可读事件的socket
static class clientProcessor implements Runnable {
private Selector selector;
public clientProcessor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isValid() == false) {
continue;
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ChannelBuffer channelBuffer = (ChannelBuffer) key.attachment();
ByteBuffer readBuffer = channelBuffer.readBuffer;
int read = clientChannel.read(readBuffer);
if (read == -1) {
key.cancel();
clientChannel.close();
} else {
readBuffer.flip();
int position = readBuffer.position();
int limit = readBuffer.limit();
List<ByteBuffer> buffers = new ArrayList<>();
for (int i = position; i < limit; i++) {
if (readBuffer.get(i) == '\r') ;
{
ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position() + 1);
readBuffer.limit(i + 1);
message.put(readBuffer);
readBuffer.limit(limit);
message.flip();
buffers.add(message);
// byte[] bytes = new byte[message.limit()];
// message.get(bytes);
// System.out.println(bytes);
}
}
if (channelBuffer.writeBuffers == null) {
ByteBuffer[] byteBuffers = buffers.toArray(new ByteBuffer[buffers.size()]);
clientChannel.write(byteBuffers);
boolean hasRemaining = hasRemaining(byteBuffers);
if (hasRemaining) {
channelBuffer.writeBuffers = byteBuffers;
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
} else {
//还有尚未发送完全的数据,新产生的数据需要放入队列
channelBuffer.list.addAll(buffers);
}
readBuffer.compact();
}
}
if (key.isWritable()){
SocketChannel clientChannel = (SocketChannel) key.channel();
ChannelBuffer channelBuffer = (ChannelBuffer) key.attachment();
ByteBuffer[] writeBuffers = channelBuffer.writeBuffers;
clientChannel.write(writeBuffers);
boolean hasRemaining = hasRemaining(writeBuffers);
if (hasRemaining==false){
channelBuffer.writeBuffers = null;
List<ByteBuffer> list = channelBuffer.list;
if (!list.isEmpty()){
writeBuffers = list.toArray(new ByteBuffer[list.size()]);
list.clear();
clientChannel.write(writeBuffers);
if (hasRemaining(writeBuffers))
{
//仍然有数据没有完全写出,保留对可写事件的关注
}
else
{
//没有数据要写出了,取消对可写事件的关注
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
}
else
{
//没有数据要写出了,取消对可写事件的关注
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
}
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private boolean hasRemaining(ByteBuffer[] byteBuffers) {
boolean hasRemaining = false;
for (ByteBuffer byteBuffer : byteBuffers) {
if (byteBuffer.hasRemaining()) {
hasRemaining = true;
break;
}
}
return hasRemaining;
}
}
public static void main(String[] args) throws IOException {
//新建服务端管道对象并设置为非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9999));
//基于机器性能创建选择器数组
final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
//初始化数组,依次启动线程
for (int i = 0; i < selectors.length; i++) {
final Selector selector = Selector.open();
selectors[i] = selector;
new Thread(new clientProcessor(selector)).start();
}
AtomicInteger id = new AtomicInteger();
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
iterator.next();
SocketChannel socketChannel = serverSocketChannel.accept();
Selector selectorChild = selectors[id.getAndIncrement() % selectors.length];
ChannelBuffer channelBuffer = new ChannelBuffer();
socketChannel.register(selectorChild, SelectionKey.OP_READ, channelBuffer);
selectorChild.wakeup();
iterator.remove();
}
}
}
}
如上代码,如果在回写消息的时候,如果一次没有写出,不再循环判断等待,而是注册写入事件到选择器。我们用一个list来维护消息回写的有序性,如果上次的数据还没回写完成,则把此次数据添加的list中等待下一次写入。
在判断为写入事件的代码中,我们先对消息进行一次回写,如果回写消息的writeBuffers已经回写完,则开始回写list中的消息,否则结束事件,等待下一次写入。
本文地址:https://blog.csdn.net/sen_sen97/article/details/109587985