欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

[HBase]RPC框架之client实现 hbaserpcclient 

程序员文章站 2022-06-14 16:09:46
...

HBase RPC的client主要工作:

 

1.JDK动态代理获取代理类

2.调用对应服务方法,逻辑包装在Invoker里,新建连接,发送数据包,等待server端响应,默认超时60s

3.超时使用wait+loop检查实现

其类图如下

 [HBase]RPC框架之client实现
            
    
    
        hbaserpcclient 

0.94实现如下

HBaseRPC getProxy入口

 

  public static VersionedProtocol getProxy(
      Class<? extends VersionedProtocol> protocol,
      long clientVersion, InetSocketAddress addr, User ticket,
      Configuration conf, SocketFactory factory, int rpcTimeout)
  throws IOException {
	 //获取代理类
    VersionedProtocol proxy =
        getProtocolEngine(protocol,conf)
            .getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
	 //发起rpc调用
    long serverVersion = proxy.getProtocolVersion(protocol.getName(),
                                                  clientVersion);
    if (serverVersion == clientVersion) {
      return proxy;
    }
    throw new VersionMismatch(protocol.getName(), clientVersion,
                              serverVersion);
  }

 

  public VersionedProtocol getProxy(
      Class<? extends VersionedProtocol> protocol, long clientVersion,
      InetSocketAddress addr, User ticket,
      Configuration conf, SocketFactory factory, int rpcTimeout)
    throws IOException {
	 //jdk动态代理
      VersionedProtocol proxy =
          (VersionedProtocol) Proxy.newProxyInstance(
              protocol.getClassLoader(), new Class[] { protocol },
              new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
    if (proxy instanceof VersionedProtocol) {
      long serverVersion = ((VersionedProtocol)proxy)
        .getProtocolVersion(protocol.getName(), clientVersion);
      if (serverVersion != clientVersion) {
        throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
                                      serverVersion);
      }
    }
    return proxy;
  }

 调用发起 invoker

 

 

public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }
	 //调用
      HbaseObjectWritable value = (HbaseObjectWritable)
        client.call(new Invocation(method, args), address,
                    protocol, ticket, rpcTimeout);
      if (logDebug) {
        // FIGURE HOW TO TURN THIS OFF!
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }

 Invocation组装

 

 

  public Invocation(Method method, Object[] parameters) {
    this.methodName = method.getName();
    this.parameterClasses = method.getParameterTypes();
    this.parameters = parameters;
    if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
      //VersionedProtocol is exempted from version check.
      clientVersion = 0;
      clientMethodsHash = 0;
    } else {
      try {
        Field versionField = method.getDeclaringClass().getField("VERSION");
        versionField.setAccessible(true);
        this.clientVersion = versionField.getLong(method.getDeclaringClass());
      } catch (NoSuchFieldException ex) {
        throw new RuntimeException("The " + method.getDeclaringClass(), ex);
      } catch (IllegalAccessException ex) {
        throw new RuntimeException(ex);
      }
      this.clientMethodsHash = ProtocolSignature.getFingerprint(method
          .getDeclaringClass().getMethods());
    }
  }

 调用过程

 

 

  public Writable call(Writable param, InetSocketAddress addr,
                       Class<? extends VersionedProtocol> protocol,
                       User ticket, int rpcTimeout)
      throws InterruptedException, IOException {、
	//Call代表一次请求,具有唯一id
    Call call = new Call(param);
	//获取连接,此处会创建连接,初始化IO操作
    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
	//发送请求
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    //noinspection SynchronizationOnLocalVariableOrMethodParameter
	//业务线程在此等待server端响应
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ignored) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }
	//如果有异常,则抛出
      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        }
        // local exception
        throw wrapException(addr, call.error);
      }
	//否则,返回调用值
      return call.value;
    }
  }

 Call初始化

 

 

    protected Call(Writable param) {
      this.param = param;
      this.startTime = System.currentTimeMillis();
	//唯一id
      synchronized (HBaseClient.this) {
        this.id = counter++;
      }
    }

 getConnection操作

 

 

 protected Connection getConnection(InetSocketAddress addr,
                                   Class<? extends VersionedProtocol> protocol,
                                   User ticket,
                                   int rpcTimeout,
                                   Call call)
                                   throws IOException, InterruptedException {
    if (!running.get()) {
      // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;
    /* we could avoid this allocation for each RPC by having a
     * connectionsId object and with set() method. We need to manage the
     * refs for keys in HashMap properly. For now its ok.
     */
	//每个连接唯一标示,不同标示会建立不同连接
    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } 
	//将请求添加到内部Map,方便后续server返回数据时处理
    while (!connection.addCall(call));

    //we don't invoke the method below inside "synchronized (connections)"
    //block above. The reason for that is if the server happens to be slow,
    //it will take longer to establish a connection and that will slow the
    //entire system down.
	//新建连接,初始化IO
    connection.setupIOstreams();
    return connection;
  }

 新建连接过程

 

 

    protected synchronized void setupIOstreams()
        throws IOException, InterruptedException {

      if (socket != null || shouldCloseConnection.get()) {
        return;
      }

      try {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to "+remoteId);
        }
	//创建连接
        setupConnection();
	//初始化IO
        this.in = new DataInputStream(new BufferedInputStream
            (new PingInputStream(NetUtils.getInputStream(socket))));
        this.out = new DataOutputStream
            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
	//发送header,还在buffer中
        writeHeader();

        // update last activity time
	//最新活动时间
        touch();
	//启动读线程,该线程监听OS的READ事件,负责从server端读取数据
        // start the receiver thread after the socket connection has been set up
        start();
      } catch (IOException e) {
        markClosed(e);
        close();

        throw e;
      }
    }

 创建连接

 

 

    protected synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
		//默认是socketFactory是StandardSocketFactory
          this.socket = socketFactory.createSocket();
          this.socket.setTcpNoDelay(tcpNoDelay);
          this.socket.setKeepAlive(tcpKeepAlive);
          // connection time out is 20s
		//连接,直到连上,默认超时20s
          NetUtils.connect(this.socket, remoteId.getAddress(),
              getSocketTimeout(conf));
          if (remoteId.rpcTimeout > 0) {
            pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
          }
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* The max number of retries is 45,
           * which amounts to 20s*45 = 15 minutes retries.
           */
          handleConnectionFailure(timeoutFailures++, maxRetries, toe);
        } catch (IOException ie) {
          handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
      }
    }

 StandardSocketFactory创建socket

 

 

  public Socket createSocket() throws IOException {
    /*
     * NOTE: This returns an NIO socket so that it has an associated 
     * SocketChannel. As of now, this unfortunately makes streams returned
     * by Socket.getInputStream() and Socket.getOutputStream() unusable
     * (because a blocking read on input stream blocks write on output stream
     * and vice versa).
     * 
     * So users of these socket factories should use 
     * NetUtils.getInputStream(socket) and 
     * NetUtils.getOutputStream(socket) instead.
     * 
     * A solution for hiding from this from user is to write a 
     * 'FilterSocket' on the lines of FilterInputStream and extend it by
     * overriding getInputStream() and getOutputStream().
     */
	//打开一个channel
    return SocketChannel.open().socket();
  }

 连接过程

 

 

 public static void connect(Socket socket, 
                             SocketAddress endpoint, 
                             int timeout) throws IOException {
    if (socket == null || endpoint == null || timeout < 0) {
      throw new IllegalArgumentException("Illegal argument for connect()");
    }
    
    SocketChannel ch = socket.getChannel();
    
    if (ch == null) {
      // let the default implementation handle it.
      socket.connect(endpoint, timeout);
    } else {
      SocketIOWithTimeout.connect(ch, endpoint, timeout);
    }
......
  }

 

static void connect(SocketChannel channel, 
                      SocketAddress endpoint, int timeout) throws IOException {
    
	//使用NIO
    boolean blockingOn = channel.isBlocking();
    if (blockingOn) {
      channel.configureBlocking(false);
    }
    
	//尝试连接
    try { 
      if (channel.connect(endpoint)) {
        return;
      }
	//如果还没连上,则开启一个selector,监听CONNECT事件
      long timeoutLeft = timeout;
      long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
      
      while (true) {
        // we might have to call finishConnect() more than once
        // for some channels (with user level protocols)
        //CONNECT是否就位
        int ret = selector.select((SelectableChannel)channel, 
                                  SelectionKey.OP_CONNECT, timeoutLeft);
        //如果就位,再看看channel是否已连接
        if (ret > 0 && channel.finishConnect()) {
          return;
        }
        //如果没就位,或者超时了,抛出异常
        if (ret == 0 ||
            (timeout > 0 &&  
              (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
          throw new SocketTimeoutException(
                    timeoutExceptionString(channel, timeout, 
                                           SelectionKey.OP_CONNECT));
        }
      }
    } catch (IOException e) {
      // javadoc for SocketChannel.connect() says channel should be closed.
      try {
        channel.close();
      } catch (IOException ignored) {}
      throw e;
    } finally {
      if (blockingOn && channel.isOpen()) {
        channel.configureBlocking(true);
      }
    }
  }

 再看SelectorPool的select过程

 

 

int select(SelectableChannel channel, int ops, long timeout) 
                                                   throws IOException {
     //从pool中拿一个selector,SelectorPool维护着一个Provider列表,每个provider都有一个selector队列
      SelectorInfo info = get(channel);
      
      SelectionKey key = null;
      int ret = 0;
      
      try {
        while (true) {
          long start = (timeout == 0) ? 0 : System.currentTimeMillis();
		//CONNECT就位事件
          key = channel.register(info.selector, ops);
		//阻塞select,直到超时
          ret = info.selector.select(timeout);
          
          if (ret != 0) {
            return ret;
          }
          
          /* Sometimes select() returns 0 much before timeout for 
           * unknown reasons. So select again if required.
           */
	//如果超时了,返回0,没超时则继续select
          if (timeout > 0) {
            timeout -= System.currentTimeMillis() - start;
            if (timeout <= 0) {
              return 0;
            }
          }
          
          if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedIOException("Interruped while waiting for " +
                                             "IO on channel " + channel +
                                             ". " + timeout + 
                                             " millis timeout left.");
          }
        }
      } finally {
	//处理完后,清理key
        if (key != null) {
          key.cancel();
        }
        
        //clear the canceled key.
        try {
          info.selector.selectNow();
        } catch (IOException e) {
          LOG.info("Unexpected Exception while clearing selector : " +
                   StringUtils.stringifyException(e));
          // don't put the selector back.
          info.close();
          return ret; 
        }
        
        release(info);
      }
    }

 连上之后,初始化IO

 

 

  public SocketInputStream(ReadableByteChannel channel, long timeout)
                                                        throws IOException {
    SocketIOWithTimeout.checkChannelValidity(channel);
    reader = new Reader(channel, timeout);
  }

  SocketIOWithTimeout(SelectableChannel channel, long timeout) 
                                                 throws IOException {
    checkChannelValidity(channel);
    
    this.channel = channel;
    this.timeout = timeout;
    // Set non-blocking
    channel.configureBlocking(false);
  }

  public SocketOutputStream(WritableByteChannel channel, long timeout) 
                                                         throws IOException {
    SocketIOWithTimeout.checkChannelValidity(channel);
    writer = new Writer(channel, timeout);
  }

写header

 

 

    private void writeHeader() throws IOException {
	 //固定4个字节
      out.write(HBaseServer.HEADER.array());
	 //版本信息,一个字节
      out.write(HBaseServer.CURRENT_VERSION);
      //When there are more fields we can have ConnectionHeader Writable.
      DataOutputBuffer buf = new DataOutputBuffer();
	 //header序列化byte
      header.write(buf);
	 //写长度,4个字节,写内容
      int bufLen = buf.getLength();
      out.writeInt(bufLen);
      out.write(buf.getData(), 0, bufLen);
    }

 之后IO READ线程Connection启动

 

 

    public void run() {
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": starting, having connections "
            + connections.size());

      try {
	  //如果没有请求,则等一段时间
        while (waitForWork()) {//wait here for work - read or close connection
          receiveResponse();
        }
      } catch (Throwable t) {
        LOG.warn("Unexpected exception receiving call responses", t);
        markClosed(new IOException("Unexpected exception receiving call responses", t));
      }

      close();

      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": stopped, remaining connections "
            + connections.size());
    }

 数据读取

 

 

protected void receiveResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
	//活动时间
      touch();

      try {
        // See HBaseServer.Call.setResponse for where we write out the response.
        // It writes the call.id (int), a flag byte, then optionally the length
        // of the response (int) followed by data.

        // Read the call id.
	//请求的id
        int id = in.readInt();

        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + id);
	//从map中获取请求
        Call call = calls.get(id);
	//成功还是失败
        // Read the flag byte
        byte flag = in.readByte();
        boolean isError = ResponseFlag.isError(flag);
        if (ResponseFlag.isLength(flag)) {
          // Currently length if present is unused.
          in.readInt();
        }
	//RPC状态
        int state = in.readInt(); // Read the state.  Currently unused.
	//如果请求异常,则获取异常信息,包装成异常
        if (isError) {
          if (call != null) {
            //noinspection ThrowableInstanceNeverThrown
            call.setException(new RemoteException(WritableUtils.readString(in),
                WritableUtils.readString(in)));
          }
        } else {
	//反序列化,HbaseObjectWritable,和server保持一致
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          // it's possible that this call may have been cleaned up due to a RPC
          // timeout, so check if it still exists before setting the value.
		//call还没被删掉,则设置返回值
          if (call != null) {
            call.setValue(value);
          }
        }
	//完事后,从map中删除
        calls.remove(id);
      } catch (IOException e) {
        if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
          // Clean up open calls but don't treat this as a fatal condition,
          // since we expect certain responses to not make it by the specified
          // {@link ConnectionId#rpcTimeout}.
          closeException = e;
        } else {
          // Since the server did not respond within the default ping interval
          // time, treat this as a fatal condition and close this connection
          markClosed(e);
        }
      } catch (Exception e) {
        markClosed(new IOException(e.getMessage(), e));
      } finally {
	//检查超时请求并处理
        if (remoteId.rpcTimeout > 0) {
          cleanupCalls(remoteId.rpcTimeout);
        }
      }
    }

 超时请求检查和处理

 

 

    protected void cleanupCalls(long rpcTimeout) {
      Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
      while (itor.hasNext()) {
        Call c = itor.next().getValue();
        long waitTime = System.currentTimeMillis() - c.getStartTime();
	//超时了。写回一个超时异常
        if (waitTime >= rpcTimeout) {
          if (this.closeException == null) {
            // There may be no exception in the case that there are many calls
            // being multiplexed over this connection and these are succeeding
            // fine while this Call object is taking a long time to finish
            // over on the server; e.g. I just asked the regionserver to bulk
            // open 3k regions or its a big fat multiput into a heavily-loaded
            // server (Perhaps this only happens at the extremes?)
            this.closeException = new CallTimeoutException("Call id=" + c.id +
              ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
          }
          c.setException(this.closeException);
          synchronized (c) {
            c.notifyAll();
          }
          itor.remove();
        } 
	//calls是一个排序map,按照key的字典序,key是id,一个integer,也就是按照id升序,如果最老的请求(id最小)还没超时,则后续请求都没超时,直接break
	else {
          break;
        }
      }
	//修改SO_TIMEOUT
      try {
        if (!calls.isEmpty()) {
          Call firstCall = calls.get(calls.firstKey());
          long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
          if (maxWaitTime < rpcTimeout) {
            rpcTimeout -= maxWaitTime;
          }
        }
        if (!shouldCloseConnection.get()) {
          closeException = null;
          if (socket != null) {
            socket.setSoTimeout((int) rpcTimeout);
          }
        }
      } catch (SocketException e) {
        LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
      }
    }
  }

 Reader/WRITER IO操作

 

 int doIO(ByteBuffer buf, int ops) throws IOException {
    
    /* For now only one thread is allowed. If user want to read or write
     * from multiple threads, multiple streams could be created. In that
     * case multiple threads work as well as underlying channel supports it.
     */
    if (!buf.hasRemaining()) {
      throw new IllegalArgumentException("Buffer has no data left.");
      //or should we just return 0?
    }

    while (buf.hasRemaining()) {
      if (closed) {
        return -1;
      }
	//这里执行reader/writer的操作,基本就是对channel的read/write操作
      try {
        int n = performIO(buf);
	//有数据处理,则返回,有可能只处理了部分数据,上层保证数据是否完整
        if (n != 0) {
          // successful io or an error.
          return n;
        }
      } catch (IOException e) {
        if (!channel.isOpen()) {
          closed = true;
        }
        throw e;
      }

      //now wait for socket to be ready.
	//一个数据都没处理,则继续监听对应READ/WRITE事件
      int count = 0;
      try {
        count = selector.select(channel, ops, timeout);  
      } catch (IOException e) { //unexpected IOException.
        closed = true;
        throw e;
      } 
	//超时了,否则继续执行读写操作,直到buffer被全部处理
      if (count == 0) {
        throw new SocketTimeoutException(timeoutExceptionString(channel,
                                                                timeout, ops));
      }
      // otherwise the socket should be ready for io.
    }
    
    return 0; // does not reach here.
  }

 IO read线程启动之后,业务线程开始写数据

 protected void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }

      // For serializing the data to be written.

      final DataOutputBuffer d = new DataOutputBuffer();
      try {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " sending #" + call.id);
	//占位
        d.writeInt(0xdeadbeef); // placeholder for data length
	//请求id,4个字节
        d.writeInt(call.id);
	//把param序列化
        call.param.write(d);
	//要写的数据
        byte[] data = d.getData();
        int dataLength = d.getLength();
        // fill in the placeholder
	//替换
        Bytes.putInt(data, 0, dataLength - 4);
        //noinspection SynchronizeOnNonFinalField
	//写并flush
        synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
          out.write(data, 0, dataLength);
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        //the buffer is just an in-memory buffer, but it is still polite to
        // close early
        IOUtils.closeStream(d);
      }
    }

 SocketOutputStream写

  public void write(byte[] b, int off, int len) throws IOException {
    ByteBuffer buf = ByteBuffer.wrap(b, off, len);
	//循环写,直到写完或抛异常
    while (buf.hasRemaining()) {
      try {
        if (write(buf) < 0) {
          throw new IOException("The stream is closed");
        }
      } catch (IOException e) {
        /* Unlike read, write can not inform user of partial writes.
         * So will close this if there was a partial write.
         */
        if (buf.capacity() > buf.remaining()) {
          writer.close();
        }
        throw e;
      }
    }
  }

 业务线程发送请求后,就进入等待状态,read线程则等待server端返回的数据,将返回数据反序列化后,放入call对象,并唤醒业务线程继续处理

 

  • [HBase]RPC框架之client实现
            
    
    
        hbaserpcclient 
  • 大小: 231.4 KB
相关标签: hbase rpc client