从零开始我的rpc第二篇之通信层Consumer端
首篇我提到rpc的关键是consumer和provider间进行socket通信,来进行如方法,参数等信息的传递.所以,通信层是非常的重要的,我们要保证通信的稳定,高效,正确.
我目前所了解和使用的通讯框架就是netty了,当然netty本就是一个非常著名的通讯框架,也值得深入研究.我自己也研究(就是看看)了一段时间,掌握了一些基本的使用.对用在rpc里面也就可以了.
首先,从client这边开始.来看看netty的client的基本代码:
Bootstrap boot = new Bootstrap();
boot.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipe = ch.pipeline();
pipe.addLast("lengthDecoder",
new LengthFieldBasedFrameDecoder(BeaconConstants.MAX_LEN, BeaconConstants.LEN_OFFSET,
BeaconConstants.INT_LEN))
.addLast("beaconDecoder", new NettyDecoder())
.addLast("beconEncoder", new NettyEncoder())
.addLast("beaconClientHandler", handler);
}
};);
ChannelFuture f =boot.connect(host, port).syncUninterruptibly();
这就是netty的client实现了.简单了解一下,netty在client通过Bootstrap进行引导,group函数定义了一个线程组,用来接收每一个请求.channel函数定义了具体的消息处理类,这里选择nio模式.option函数可以选择连接的一些配置定义.handler函数定义了业务处理的核心.这里通过ChannelInitializer类并重写了initChannel方法,来让用户自定义channelHandler.用户可以根据自己的业务需求,定义很多中handler,比如这里lengthDecoder是处理半包的,beaconDecoder和beconEncoder是处理编解码的,beaconClientHandler是处理具体的业务逻辑的.
然后我们看看这个beaconClientHandler的基本实现.因为consumer和provider是相互通信的.所以这里我们选择ChannelDuplexHandler
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (!ctx.channel().isActive()) {
NettyChannel.removeChannel(ctx.channel());
}
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
super.disconnect(ctx, promise);
if (!ctx.channel().isActive()) {
NettyChannel.removeChannel(ctx.channel());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
BeaconHandler handler = NettyChannel.getChannel(ctx.channel(), From.CLIENT);
try {
// 交给上层处理
handler.receive(msg);
} finally {
if (!ctx.channel().isActive()) {
NettyChannel.removeChannel(ctx.channel());
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
}
通过继承并重写这些方法来实现我们的业务逻辑,channelActive和channelInactive表示channel刚刚建立和失效的需要调用的.disconnect用于连接断开的时候调用.channelRead是接收provider的返回的信息,即我们rpc调用的结果.exceptionCaught表示遇到异常的处理.write用来我们给provider发送信息,比如方法,参数等.
再来看看编解码:
public class NettyEncoder extends MessageToByteEncoder<RpcMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception {
if (msg instanceof RpcRequest) {
msg.setFrom((byte) From.CLIENT.ordinal());
} else {
msg.setFrom((byte) From.SERVER.ordinal());
}
byte[] b = SpiManager.defaultSpiExtender(Serializer.class)
.serialize(msg);
// 写入请求端
out.writeByte(msg.getFrom());
// 写入长度
out.writeInt(b.length);
// 写入字节
out.writeBytes(b);
}
}
这就是netty的client端基本的代码.前面几乎是模版代码,具体代码含义可以看看netty官网的示例,这里就不细说了(强行略过是一门学问,哈哈.).我们需要明确知道的就是handler负责业务逻辑,channel负责实际的消息发送.
我们重新回到consumer上来,rpc的特点是需要进行扩展,那么netty肯定是其中一种通信框架,而不能成为我们的唯一依赖.所以我们来想想如何设计一个可扩展的通信基本结构.
不过netty很好,这里可以借用原生的netty的设计.我们也设计个channel层和handler层,channel用来封装实际发送的逻辑,而handler可以用来处理我们发送前的逻辑.思路也可以和netty一样保持清晰.
而java里面说到扩展无非是利用接口和抽象进行组合封装(即设计模式).通信层核心就是用来接收发送的,所以,我们首先抽象一个接口出来.
public interface Client extends BeaconSide {
public Object send(Object message) throws Exception;
}
client的核心工作就是发送,那么发送其实实际上是交给channel做的,所以,我们再抽象一个channel.
public interface BaseChannel {
public Object send(Object message) throws Exception;
public void receive(Object msg) throws Exception;
}
channel做什么了,发送和接收数据.
然后我们再写出我们的handler:
public interface BeaconHandler {
public void receive(Object message) throws Exception;
public Object send(Object message) throws Exception;
}
这里的handler其实和channel差不多,但是就如我们刚开始约定的一样,channel负责发送,handler负责逻辑.
好,这里的基本逻辑我们设定好了,那我们如何去实现里面的逻辑.先来看看我们clientChannel怎么去实现.
public class BeaconClientChannel extends AbstractBeaconChannel {
private static final Logger LOG = LoggerFactory.getLogger("BeaconClientChannel");
protected BaseChannel baseChannel;
public BeaconClientChannel(BaseChannel baseChannel) {
this.baseChannel = baseChannel;
}
}
基本的,我们clientChannel持有一个baseChannel,这个basechannel就是我们具体的负责发送接收操作,如果是netty的话就是channel了.
然后我们想想如何实现client的send方法,首先client有俩个方法send和receive,也就是说发送和接收结果是异步的,也就是send发送过后,是由receive方法进行监听收到server端发来的结果的.所以我们在send里面需要做的就是阻塞等待receive收到结果,那这个我们如何知道receive获取到结果和如何获取这个结果了?比如很多结果同时来到,我们怎么判断哪个结果对应到之前的哪个请求?这就是我们做send和receive需要考虑的.
首先我们这里设计俩个东西,一个装结果的map和一个执行处理的线程池:
private static final ConcurrentMap<String, CallbackListener> RESULT_MAP = new ConcurrentHashMap<>(32);
/**
* 请求端 server or client
*/
private String side;
/**
* 用于线程池中线程的计数
*/
private static final AtomicInteger COUNT = new AtomicInteger(0);
/**
* 线程池,每一个消费请求都会放入池中执行等待结果,相当于newCachedThreadPool
* coresize=处理器*3 maxsize=最大内存(mb)/2
*/
private static final ThreadPoolExecutor TASK_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() << 3, (int) (Runtime.getRuntime().maxMemory() >> 21),
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "BeaconTaskHandler-" + COUNT.getAndIncrement());
}
});
设计思想是给每个消息加一个唯一的id,然后将发送消息丢进线程池执行发送消息和等待结果:
final CallbackListener listener = new CallbackListener();
synchronized (listener) {
addListener(((RpcRequest) message).getId(), listener);
// wait次数达到一定限制后,默认超时.TODO
int retry = 1;
// 发送消息
baseChannel.send(message);
// 同步等待结果
long start = System.currentTimeMillis();
// 防止发生意外,导致一直阻塞;再等待一定时间后,以超时结束
try {
while ((result = listener.result()) == null && retry <= MAX_RETRY_NUM) {
listener.wait(SLEEP_TIME << retry);
retry++;
}
} catch (InterruptedException e) {
// 外围get超时,执行 taskFuture.cancel(true)进行中断
long cost = System.currentTimeMillis() - start;
LOG.info("Wait for {} times;cost {} ms", retry, cost);
return result;
}
long cost = System.currentTimeMillis() - start;
LOG.info("Wait for {} times;cost {} ms", retry, cost);
if (result == null) {
// 最大超时
result = new RpcResponse()
.setException(new TimeoutException("Request exceed limit time,cost time->" + cost))
.setId(((RpcRequest) message).getId());
}
}
// 已获取到结果
return result;
这里通过对listener进行同步,然后我们receive的时候收到消息,放入map中,同时notify到listener,这样我们就可以收到结果了,这里也对循环阻塞做了一些限制,防止一直阻塞,当超时的时候我们直接返回给上游超时错误.
那我们看看receive做了啥
protected void doReceive(Object message) {
// 对同一次的请求channel加锁,当收到结果时释放
setResult(((RpcResponse) message).getId(), message);
}
/**
* 提供给下层
*
* @return
*/
public void setResult(String requestId, Object result) {
CallbackListener listener;
synchronized ((listener = RESULT_MAP.get(requestId))) {
listener.onSuccess(result);
// 通知等待线程,这里只有一个线程在等待
listener.notify();
listener = null;
RESULT_MAP.remove(requestId);
}
}
baseChannel收到消息后,回调给clientChannel,这里就是上面进行说的notify操作,然后listener置空,并同时从map中删除,使命完成,非常的nice.
至此,client的功能完成了,哦,好像还有个handler
public class BeaconClientHandler extends AbstractBeaconHandler {
private BeaconClientChannel clientChannel;
public BeaconClientHandler(BeaconClientChannel clientChannel) {
this.clientChannel = clientChannel;
}
@Override
public void receive(Object message) throws Exception {
super.receive(message);
this.clientChannel.receive(message);
}
@Override
public Object send(Object message) throws Exception {
super.send(message);
return this.clientChannel.send(message);
}
}
这里的handler很简单,只做了简单的封装转发,因为我还没想好给handler到底做哪些事情,不过handler以后可以做些数据校验,请求的统计,以及一些其他的记录操作.不过现在我确实没做啥.
好了,小结一下,这里主要讲了rpc的client的一些自己的结构设计的方面,以及简单的代码实例,并不完整,不过我觉得代码实现倒是次要的,就像我第一篇说的,只要明确你要做什么,理解了整个流程的思想,代码实现只是一个过程.
具体的实现参照我的github:https://github.com/dressrosa/beacon
上一篇: php使用ob_start()实现图片存入变量的方法_PHP
下一篇: 初识RPC