欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

从零开始我的rpc第二篇之通信层Consumer端

程序员文章站 2022-06-01 21:31:58
...

首篇我提到rpc的关键是consumer和provider间进行socket通信,来进行如方法,参数等信息的传递.所以,通信层是非常的重要的,我们要保证通信的稳定,高效,正确.
我目前所了解和使用的通讯框架就是netty了,当然netty本就是一个非常著名的通讯框架,也值得深入研究.我自己也研究(就是看看)了一段时间,掌握了一些基本的使用.对用在rpc里面也就可以了.
首先,从client这边开始.来看看netty的client的基本代码:

Bootstrap boot = new Bootstrap();
        boot.group(worker)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            final ChannelPipeline pipe = ch.pipeline();
            pipe.addLast("lengthDecoder",
                    new LengthFieldBasedFrameDecoder(BeaconConstants.MAX_LEN, BeaconConstants.LEN_OFFSET,
                            BeaconConstants.INT_LEN))
                    .addLast("beaconDecoder", new NettyDecoder())
                    .addLast("beconEncoder", new NettyEncoder())
                    .addLast("beaconClientHandler", handler);
        }
    };);
ChannelFuture f =boot.connect(host, port).syncUninterruptibly();

这就是netty的client实现了.简单了解一下,netty在client通过Bootstrap进行引导,group函数定义了一个线程组,用来接收每一个请求.channel函数定义了具体的消息处理类,这里选择nio模式.option函数可以选择连接的一些配置定义.handler函数定义了业务处理的核心.这里通过ChannelInitializer类并重写了initChannel方法,来让用户自定义channelHandler.用户可以根据自己的业务需求,定义很多中handler,比如这里lengthDecoder是处理半包的,beaconDecoder和beconEncoder是处理编解码的,beaconClientHandler是处理具体的业务逻辑的.
然后我们看看这个beaconClientHandler的基本实现.因为consumer和provider是相互通信的.所以这里我们选择ChannelDuplexHandler

 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (!ctx.channel().isActive()) {
            NettyChannel.removeChannel(ctx.channel());
        }
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        super.disconnect(ctx, promise);
        if (!ctx.channel().isActive()) {
            NettyChannel.removeChannel(ctx.channel());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        BeaconHandler handler = NettyChannel.getChannel(ctx.channel(), From.CLIENT);
        try {
            // 交给上层处理
            handler.receive(msg);
        } finally {
            if (!ctx.channel().isActive()) {
                NettyChannel.removeChannel(ctx.channel());
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
    }

通过继承并重写这些方法来实现我们的业务逻辑,channelActive和channelInactive表示channel刚刚建立和失效的需要调用的.disconnect用于连接断开的时候调用.channelRead是接收provider的返回的信息,即我们rpc调用的结果.exceptionCaught表示遇到异常的处理.write用来我们给provider发送信息,比如方法,参数等.
再来看看编解码:

public class NettyEncoder extends MessageToByteEncoder<RpcMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception {
        if (msg instanceof RpcRequest) {
            msg.setFrom((byte) From.CLIENT.ordinal());
        } else {
            msg.setFrom((byte) From.SERVER.ordinal());
        }
        byte[] b = SpiManager.defaultSpiExtender(Serializer.class)
                .serialize(msg);
        // 写入请求端
        out.writeByte(msg.getFrom());
        // 写入长度
        out.writeInt(b.length);
        // 写入字节
        out.writeBytes(b);

    }

}

这就是netty的client端基本的代码.前面几乎是模版代码,具体代码含义可以看看netty官网的示例,这里就不细说了(强行略过是一门学问,哈哈.).我们需要明确知道的就是handler负责业务逻辑,channel负责实际的消息发送.
我们重新回到consumer上来,rpc的特点是需要进行扩展,那么netty肯定是其中一种通信框架,而不能成为我们的唯一依赖.所以我们来想想如何设计一个可扩展的通信基本结构.
不过netty很好,这里可以借用原生的netty的设计.我们也设计个channel层和handler层,channel用来封装实际发送的逻辑,而handler可以用来处理我们发送前的逻辑.思路也可以和netty一样保持清晰.
而java里面说到扩展无非是利用接口和抽象进行组合封装(即设计模式).通信层核心就是用来接收发送的,所以,我们首先抽象一个接口出来.

public interface Client extends BeaconSide {
    public Object send(Object message) throws Exception;
}

client的核心工作就是发送,那么发送其实实际上是交给channel做的,所以,我们再抽象一个channel.

public interface BaseChannel {


    public Object send(Object message) throws Exception;

    public void receive(Object msg) throws Exception;
}

channel做什么了,发送和接收数据.
然后我们再写出我们的handler:

public interface BeaconHandler {

    public void receive(Object message) throws Exception;

    public Object send(Object message) throws Exception;

}

这里的handler其实和channel差不多,但是就如我们刚开始约定的一样,channel负责发送,handler负责逻辑.
好,这里的基本逻辑我们设定好了,那我们如何去实现里面的逻辑.先来看看我们clientChannel怎么去实现.

public class BeaconClientChannel extends AbstractBeaconChannel {

    private static final Logger LOG = LoggerFactory.getLogger("BeaconClientChannel");

    protected BaseChannel baseChannel;

    public BeaconClientChannel(BaseChannel baseChannel) {
        this.baseChannel = baseChannel;
    }
}

基本的,我们clientChannel持有一个baseChannel,这个basechannel就是我们具体的负责发送接收操作,如果是netty的话就是channel了.
然后我们想想如何实现client的send方法,首先client有俩个方法send和receive,也就是说发送和接收结果是异步的,也就是send发送过后,是由receive方法进行监听收到server端发来的结果的.所以我们在send里面需要做的就是阻塞等待receive收到结果,那这个我们如何知道receive获取到结果和如何获取这个结果了?比如很多结果同时来到,我们怎么判断哪个结果对应到之前的哪个请求?这就是我们做send和receive需要考虑的.
首先我们这里设计俩个东西,一个装结果的map和一个执行处理的线程池:

 private static final ConcurrentMap<String, CallbackListener> RESULT_MAP = new ConcurrentHashMap<>(32);

    /**
     * 请求端 server or client
     */
    private String side;

    /**
     * 用于线程池中线程的计数
     */
    private static final AtomicInteger COUNT = new AtomicInteger(0);

    /**
     * 线程池,每一个消费请求都会放入池中执行等待结果,相当于newCachedThreadPool
     * coresize=处理器*3 maxsize=最大内存(mb)/2
     */
    private static final ThreadPoolExecutor TASK_POOL = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors() << 3, (int) (Runtime.getRuntime().maxMemory() >> 21),
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "BeaconTaskHandler-" + COUNT.getAndIncrement());
                }
            });

设计思想是给每个消息加一个唯一的id,然后将发送消息丢进线程池执行发送消息和等待结果:

final CallbackListener listener = new CallbackListener();
                synchronized (listener) {
                    addListener(((RpcRequest) message).getId(), listener);
                    // wait次数达到一定限制后,默认超时.TODO
                    int retry = 1;
                    // 发送消息
                    baseChannel.send(message);
                    // 同步等待结果
                    long start = System.currentTimeMillis();
                    // 防止发生意外,导致一直阻塞;再等待一定时间后,以超时结束
                    try {
                        while ((result = listener.result()) == null && retry <= MAX_RETRY_NUM) {
                            listener.wait(SLEEP_TIME << retry);
                            retry++;
                        }
                    } catch (InterruptedException e) {
                        // 外围get超时,执行 taskFuture.cancel(true)进行中断
                        long cost = System.currentTimeMillis() - start;
                        LOG.info("Wait for {} times;cost {} ms", retry, cost);
                        return result;
                    }
                    long cost = System.currentTimeMillis() - start;
                    LOG.info("Wait for {} times;cost {} ms", retry, cost);
                    if (result == null) {
                        // 最大超时
                        result = new RpcResponse()
                                .setException(new TimeoutException("Request exceed limit time,cost time->" + cost))
                                .setId(((RpcRequest) message).getId());
                    }
                }
                // 已获取到结果
                return result;

这里通过对listener进行同步,然后我们receive的时候收到消息,放入map中,同时notify到listener,这样我们就可以收到结果了,这里也对循环阻塞做了一些限制,防止一直阻塞,当超时的时候我们直接返回给上游超时错误.
那我们看看receive做了啥

 protected void doReceive(Object message) {
        // 对同一次的请求channel加锁,当收到结果时释放
        setResult(((RpcResponse) message).getId(), message);
    }
/**
     * 提供给下层
     * 
     * @return
     */
    public void setResult(String requestId, Object result) {
        CallbackListener listener;
        synchronized ((listener = RESULT_MAP.get(requestId))) {
            listener.onSuccess(result);
            // 通知等待线程,这里只有一个线程在等待
            listener.notify();
            listener = null;
            RESULT_MAP.remove(requestId);
        }
    }

baseChannel收到消息后,回调给clientChannel,这里就是上面进行说的notify操作,然后listener置空,并同时从map中删除,使命完成,非常的nice.
至此,client的功能完成了,哦,好像还有个handler

public class BeaconClientHandler extends AbstractBeaconHandler {

    private BeaconClientChannel clientChannel;

    public BeaconClientHandler(BeaconClientChannel clientChannel) {
        this.clientChannel = clientChannel;
    }

    @Override
    public void receive(Object message) throws Exception {
        super.receive(message);
        this.clientChannel.receive(message);
    }

    @Override
    public Object send(Object message) throws Exception {
        super.send(message);
        return this.clientChannel.send(message);
    }

}

这里的handler很简单,只做了简单的封装转发,因为我还没想好给handler到底做哪些事情,不过handler以后可以做些数据校验,请求的统计,以及一些其他的记录操作.不过现在我确实没做啥.
好了,小结一下,这里主要讲了rpc的client的一些自己的结构设计的方面,以及简单的代码实例,并不完整,不过我觉得代码实现倒是次要的,就像我第一篇说的,只要明确你要做什么,理解了整个流程的思想,代码实现只是一个过程.
具体的实现参照我的github:https://github.com/dressrosa/beacon

相关标签: rpc