欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【netty in action】学习笔记-第四章

程序员文章站 2024-02-16 09:27:10
...

netty提供了统一的API进行传输数据,这个相比于JDK的方式方便很多。比如下面是一个不用netty而使用原生的阻塞IO进行传输的例子。

public class PlainOioServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        try {
            for(;;) {
                final Socket clientSocket = socket.accept();
                System.out.println(
                        "Accepted connection from " + clientSocket);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(
                                    Charset.forName("UTF-8")));
                            out.flush();
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

代码很好理解,为每一个新来的连接创建一个线程处理。这种方式有个比较大的问题是,客户端连接数受限于服务器所能承受的线程数。为了改进这个问题我们可以使用异步模式来重写这段代码,但是你会发现,几乎所有的代码都要重写。原生的OIO和NIO的API几乎完全不能复用。不信你看看下面这段NIO的代码,

public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;){
            try {
                selector.select();
            } catch (IOException ex) {
                ex.printStackTrace();
                //handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server =
                                (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {
                        SocketChannel client =
                                (SocketChannel) key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // ignore on close
                    }
                }
            }
        }
    }
}

这个代码不做过多解释了,毕竟我们的重点是netty不是JDK NIO。

用netty实现一个OIO的程序是下面这样的姿势:

public class NettyOioServer {
    public void server(int port)
            throws Exception {
        final ByteBuf buf =
                Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                                ch.pipeline().addLast(
                                    new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(
                                                ChannelHandlerContext ctx)
                                                throws Exception {
                                            ctx.writeAndFlush(buf.duplicate())
                                                    .addListener(
                                                            ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

然后如果要改成异步非阻塞的模式,只需要把OioEventLoopGroup改成NioEventLoopGroup,把OioServerSocketChannel改成NioServerSocketChannel,简单到令人发指。

下面是Channel的类关系图,

【netty in action】学习笔记-第四章

从这幅图看出ChannelConfigChannelPipeline都属于Channel,在代码中体现为类的成员。ChannelPipeline其实前面我们也讲过了,它实现了责任链模式,把ChannelHandler一个个串起来。通过后者我们可以拥有包括但不限于如下的功能:

  • 数据的格式转换
  • 异常通知
  • active或者inactive通知
  • EventLoop注册或者注销事件通知
  • 用户自定义事件通知

下面列举了一些Channle本身提供的重要方法。

方法名 解释
eventLoop() 返回分配到channel上的eventloop
pipeline() 返回分配到channel上的channelpipeline
isActive() 返回到channel是否连接到一个远程服务
localAddress() 返回本地绑定的socketAddress
remoteAddress() 返回远程绑定的socketAddress
write() 写入数据到远程(客户端或者服务端),数据会经过channelpipeline

有些方法我们已经在前面的示例中见过了。来看下write()方法的使用示例:

Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
        ChannelFuture cf = channel.writeAndFlush(buf);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    System.out.println("Write successful");
                } else {
                    System.err.println("Write error");
                    future.cause().printStackTrace();
                }
            }
        });

简单解释下,

buf里是要写的数据,然后调用write方法写入数据,返回一个写入的future结果。前面已经说过这个future了,我们给future添加一个监听器,以便写入成功后可以通过回调得到通知。

另外write这个方法也是线程安全的,下面是一个用多线程操作write方法的示例,

final Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        final ByteBuf buf = Unpooled.copiedBuffer("your data",
                CharsetUtil.UTF_8);
        Runnable writer = new Runnable() {
            @Override
            public void run() {
                channel.write(buf.duplicate());
            }
        };
        Executor executor = Executors.newCachedThreadPool();

        // write in one thread
        executor.execute(writer);

        // write in another thread
        executor.execute(writer);
        //...
    }

netty保证write方法线程安全的原理,是将用户线程的操作封装成Task放入消息队列中,底层由同一个I/O线程负责执行,这样就实现了局部无锁化。

这部分要解释清楚需要深入到源码底层,因为本篇系列是netty in action的笔记系列就不多说了。后面可能考虑写一个源码解析系列在深入这一块。

支持的四种传输方式

NIO

这是netty最常见的使用场景。当channel状态变更时用户可以收到通知,有以下几个状态:

  • 新的channel被accept
  • channel连接成功
  • channel收到数据
  • channel发送数据

【netty in action】学习笔记-第四章

如上图所示,netty内部其实也是封装了JDK的NIO,使用selector来管理IO状态的变更。在前面的章节里我们其实给过JDK NIO的代码示例,这里就不贴出来了。

netty NIO模型里有一个不得不说的特性叫zero-file-copy,很多地方翻译成零拷贝。这种特性可以让我们直接在文件系统和网卡传输数据,避免了数据从内核空间到用户空间的拷贝。

OIO

OIO是在netty里是一种折中的存在,阻塞的方式尽管应用场景很少,但是不代表不存在。比如通过jdbc调用数据库,如果是异步的方案是不太合适的。

netty的OIO模型底层也是调用JDK,前面的笔记我们也给过示例。这种模型就是用一个线程处理监听(accetp),然后为每个成功的连接创建一个处理线程。这样做的目的是防止对于某个连接的处理阻塞影响其它连接,毕竟I/O操作是很容易引起阻塞的。

既然是阻塞的模型,netty的封装能做的工作也有限。netty只是给socket上加了SO_TIMEOUT,这样如果一个操作在超时时间内没有完成,就会抛出SocketTimeoutException,netty会捕获这个异常,然后继续后面的流程。然后就是下一个EventLoop执行,循环往复。这种处理方案弊端在于抛出异常的开销,因为异常会占用堆栈。

【netty in action】学习笔记-第四章

这个图就是对上面的概括,分配一个线程给socket,socket连接服务器然后读数据,读数据可能阻塞也可能成功。如果是前者捕获异常后再次重试。

Local In VM transport

netty包含对本地传输的支持,这个传输实现使用相同的API用于虚拟机之间的通信,传输是完全异步的。

每个Channel使用唯一的SocketAddress,客户端通过使用SocketAddress进行连接,在服务器会被注册为长期运行,一旦通道关闭,它会自动注销,客户端无法再使用它。

使用本地传输服务器的行为与其他的传输实现几乎是相同的,需要注意的一个重点是只能在本地的服务器和客户端上使用它们。

Embedded transport

Embedded transport可以让你更容易的在不同的ChannelHandler之间的交互,更多的时候它像是一个工具类。一般用于测试的场景。它自带了一个具体的Channel实现,EmbeddedChannel。比如下面是一个使用示例:

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }
}
@Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
        Assert.assertTrue(channel.writeInbound(input.retain()));
        Assert.assertTrue(channel.finish());

        ByteBuf read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        Assert.assertNull(channel.readInbound());
        buf.release();
    }

用到的几个方法解释下,

  • writeInbound 将入站消息写到EmbeddedChannel中。如果可以通过readInbound方法从EmbeddedChannel中读取数据,则返回true
  • readInbound 从EmbeddedChannel中读取入站消息。任何返回东西都经过整个ChannelPipeline。如果没有任何可供读取的,则返回null
  • finish 将EmbeddedChannel标记为完成,如果有可读取的入站或出站数据,则返回true。这个方法还将会调用EmbeddedChannel上的close方法

更多的使用细节可以去网上了解下。