
Introduction to Hadoop RPC


 

1 RPC overview

 1.1 RPC (remote procedure call): one Java process invokes a method on an object that lives in another Java process.

 1.2 An RPC exchange involves at least two parties: the caller (client) and the callee (server).
 1.3 The client initiates the request, invokes a method on the server at a given IP and port, and the result of the call is returned to the client.
 1.4 RPC is the foundation on which Hadoop is built.

 1.5 RPC is ultimately just socket communication: knowing the remote address and port is enough to talk to the other side. RPC therefore supports communication between separate processes, which is exactly what a distributed Hadoop cluster, whose daemons run at different addresses, needs.
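Before turning to Hadoop's RPC classes, here is a minimal sketch of point 1.5 (toy code, not Hadoop's; all names are made up): a "remote call" is nothing more than a request/response exchange over a socket.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class ToySocketRpc {

    public static void main(String[] args) throws Exception {
        try (ServerSocket ss = new ServerSocket(0)) { // bind any free port
            int port = ss.getLocalPort();

            // "Server side": accept one connection, read "sayHello <name>",
            // dispatch to local logic, and write the result back.
            new Thread(() -> {
                try (Socket s = ss.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(s.getInputStream()));
                     PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                    String[] request = in.readLine().split(" ", 2); // method + argument
                    if ("sayHello".equals(request[0])) {
                        out.println("hello " + request[1]); // the "remote" result
                    }
                } catch (IOException ignored) {
                }
            }).start();

            // "Client side": knowing only host + port, invoke the remote method.
            try (Socket s = new Socket("localhost", port);
                 PrintWriter out = new PrintWriter(s.getOutputStream(), true);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(s.getInputStream()))) {
                out.println("sayHello zm");
                System.out.println(in.readLine()); // prints "hello zm"
            }
        }
    }
}

Hadoop's RPC framework wraps exactly this kind of exchange, adding dynamic proxies, serialization, and connection management on top.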


2 Simulating the RPC mechanism with our own code

  2.1 We need an interface plus a concrete business class; the interface declares the business methods that may be invoked remotely
(it acts as a contract: it fixes the shared rules, i.e. the callable methods, so that when the client invokes a method declared in the contract, the call lands on the server's implementation of that method).

  2.2 We need an RPC client class and an RPC server class.

  2.3 The code is as follows:

1 The interface:

import org.apache.hadoop.ipc.VersionedProtocol;

public interface MyBizAble extends VersionedProtocol {

	// Protocol version; client and server must agree on this value.
	public static final long VERSION = 12345L;

	public String sayHello(String name);
}

2 The business implementation class:

import java.io.IOException;

public class MyBiz implements MyBizAble {

	public String sayHello(String name) {
		return "hello " + name;
	}

	@Override
	public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
		// Report the version this server speaks; the client checks it on connect.
		return VERSION;
	}
}


3 The server:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;

public class MyServer {

	public static final String bindAddress = "localhost";
	public static final int port = 1001;

	public static void main(String[] args) throws IOException {

		MyBiz myBiz = new MyBiz();

		/** Construct an RPC server.
	     * @param instance the instance whose methods will be called
	     * @param conf the configuration to use
	     * @param bindAddress the address to bind on to listen for connection
	     * @param port the port to listen for connections on
	     */
		// Hand the myBiz instance, the bind address, and the port over to RPC,
		// which listens for connections. When a client invokes a contract method,
		// RPC dispatches the call to the matching method on this instance and
		// streams the result back to the client over the network.
		Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());
		server.start();
	}
}


4 The client:

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyClient {

	public static void main(String[] args) throws IOException {
		/*
		 * Build a client-side proxy object. The proxy implements the named
		 * protocol and talks to the server at the given address.
		 */
		MyBizAble proxy = (MyBizAble) RPC.waitForProxy(
				MyBizAble.class,
				MyBizAble.VERSION,
				new InetSocketAddress(MyServer.bindAddress, MyServer.port),
				new Configuration());
		String result = proxy.sayHello("zm");
		System.out.println(result);

		// Close the network connection.
		RPC.stopProxy(proxy);
	}
}
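To try it out, start MyServer first and then run MyClient; the client should print hello zm. Note that on Unix-like systems ports below 1024 are privileged, so binding to port 1001 may require elevated permissions; any free port above 1024 works just as well.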

 

 

 

3 Hadoop RPC architecture diagram

[figure: Hadoop RPC architecture]

4 Reading the NameNode source to confirm that it takes part in RPC communication as the RPC server

4.1 Does it run for a long time, continuously waiting for client requests? (That is the classic hallmark of a socket server.)

4.2 Does it contain the telltale server-side call:

 Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());

4.3 Tracing the source, with commentary:

The NameNode RPC code path:

public class NameNode implements ClientProtocol, DatanodeProtocol,
                                 NamenodeProtocol,.... {
 .......
 /** RPC server */
  private Server server;
 ......

 public static void main(String argv[]) throws Exception {
    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null); // step into
      if (namenode != null)
        namenode.join();
    } catch (Throwable e) {
      LOG.error(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }	

 Step into createNameNode:
  public static NameNode createNameNode(String argv[], 
                                 Configuration conf) throws IOException {
	...............
     NameNode namenode = new NameNode(conf); 	 // step into the constructor
	...............							 
	}

The constructor:
  /*  ................
    upgrade and create a snapshot of the current file system state
	  ...............
  */
  public NameNode(Configuration conf) throws IOException {
    try {
      initialize(conf); // step into initialize
    } catch (IOException e) {
      this.stop();
      throw e;
    }
  }


 Which brings us to initialize():
  private void initialize(Configuration conf) throws IOException {
  ...............
  this.server = RPC.getServer(this, socAddr.getHostName(), // 'this' is the NameNode instance itself
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());
  ...............
  }

}	  


With the source in hand, the explanation:

In a typical RPC server, the first argument of the call (the instance whose methods will be called) is the business object:
Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());

In NameNode above, the first argument is this, meaning the business class is NameNode itself; and since RPC.getServer is invoked inside the NameNode class, NameNode is at once the business implementation and the RPC server:
this.server = RPC.getServer(this, socAddr.getHostName(), // 'this' is the NameNode instance
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());


As the business implementation, NameNode must implement the contracts so that clients can reach it. Look at the declaration:
public class NameNode implements ClientProtocol, DatanodeProtocol, NamenodeProtocol, ...
Here ClientProtocol, DatanodeProtocol, and NamenodeProtocol are the contracts:
ClientProtocol: the interface through which user code talks to the NameNode
DatanodeProtocol: the protocol a DFS datanode uses to communicate with the NameNode
NamenodeProtocol: the protocol the secondary NameNode uses to communicate with the NameNode
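To make the contract idea concrete, here is a rough, abridged sketch of the client-facing side of that contract; the create() signature is reconstructed from the namenode.create(...) calls traced in section 5 below, not copied verbatim from the Hadoop source:

import java.io.IOException;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.VersionedProtocol;

// Abridged sketch, not the verbatim Hadoop interface.
public interface ClientProtocol extends VersionedProtocol {

  // Create a new file entry in the HDFS namespace on the NameNode.
  public void create(String src, FsPermission masked, String clientName,
                     boolean overwrite, boolean createParent,
                     short replication, long blockSize) throws IOException;

  // ... plus the rest of the client-facing methods
  // (addBlock, complete, rename, delete, ...).
}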


 

 

5 How a client reaches the NameNode through the RPC mechanism

Take uploading a file from the client to HDFS as the example:
	private static void putData(FileSystem fileSystem) {
		try {
			// prints: org.apache.hadoop.hdfs.DistributedFileSystem
			System.out.println(fileSystem.getClass().getName());

			// FILE is an HDFS path string defined elsewhere in the test class
			FSDataOutputStream out = fileSystem.create(new Path(FILE)); // step into
			FileInputStream in = new FileInputStream("E:/seq100w.txt"); 
			IOUtils.copyBytes(in, out, 1024, true);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	Step into create:
	 public FSDataOutputStream create(Path f) throws IOException {
    return create(f, true); // step into
  }

   Keep stepping in until you reach the abstract declaration:
    public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;

	  
   Now find the concrete create in org.apache.hadoop.hdfs.DistributedFileSystem:
     public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite,
    int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {

    statistics.incrementWriteOps(1);
    return new FSDataOutputStream
       (dfs.create(getPathName(f), permission,  // step into dfs.create
                   overwrite, true, replication, blockSize, progress, bufferSize),
        statistics);
  } // the return value is a stream, and a stream needs a destination
  
  Step into DFSClient.java:

  /**
   * Create a new dfs file with the specified block replication
   * with write-progress reporting and return an output stream for writing
   * into the file.
   */
  // Two things happen here: 1) a dfs file is created on the namenode;
  // 2) an output stream is created to carry the file data being uploaded.
  public OutputStream create(String src, 
                             FsPermission permission,
                             boolean overwrite, 
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize
                             ) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getDefault();
    }
    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
    LOG.debug(src + ": masked=" + masked);
    final DFSOutputStream result = new DFSOutputStream(src, masked,  // step into
        overwrite, createParent, replication, blockSize, progress, buffersize,
        conf.getInt("io.bytes.per.checksum", 512));
    beginFileLease(src, result);
    return result;
  }
   
  
      Inside DFSClient.java there is the inner class DFSOutputStream:
	  DFSClient.java {
	  
	  public final ClientProtocol namenode;
	  
      /**
     * Create a new output stream to the given DataNode.
     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
     */
    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
        boolean createParent, short replication, long blockSize, Progressable progress,
        int buffersize, int bytesPerChecksum) throws IOException {
      this(src, blockSize, progress, bytesPerChecksum, replication);

      computePacketChunkSize(writePacketSize, bytesPerChecksum);

      try {
        // Make sure the regular create() is done through the old create().
        // This is done to ensure that newer clients (post-1.0) can talk to
        // older clusters (pre-1.0). Older clusters lack the new  create()
        // method accepting createParent as one of the arguments.
        if (createParent) {
          namenode.create( // note: this namenode field is of type ClientProtocol;
		                   // this is where DFSClient actually gets in touch with the NameNode:
						   // the RPC client invokes the create() method of the ClientProtocol contract
            src, masked, clientName, overwrite, replication, blockSize);
        } else {
          namenode.create(
            src, masked, clientName, overwrite, false, replication, blockSize);
        }
      } catch(RemoteException re) {
        throw re.unwrapRemoteException(AccessControlException.class,
                                       FileAlreadyExistsException.class,
                                       FileNotFoundException.class,
                                       NSQuotaExceededException.class,
                                       DSQuotaExceededException.class);
      }
      streamer.start();
    }
	
	
	The ClientProtocol field namenode is initialized in the DFSClient constructor:
	DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats) {
	   ......
	   this.namenode = createNamenode(this.rpcNamenode, conf); // line 277: builds the RPC proxy
	  }

	  
	And DFSClient itself is created in DistributedFileSystem:
	 public void initialize(URI uri, Configuration conf) throws IOException {
	 ...
	this.dfs = new DFSClient(namenode, conf, statistics); // line 100
	...
	}
  }

So the chain is: DistributedFileSystem.initialize() builds a DFSClient, whose constructor builds an RPC proxy for ClientProtocol; every later namenode.create(...) call travels through that proxy to the NameNode.
  

 

The flow is shown in the figure below:

[figure: client-to-NameNode RPC call flow]

6 How the DataNode and the NameNode communicate over RPC (with a brief look at the heartbeat mechanism)
DataNode:{

public DatanodeProtocol namenode = null;

The entry point:
public static void main(String args[]) {
    secureMain(args, null);
  }
  
  
  
 Tracing all the way down to:
 
 void startDataNode(Configuration conf, 
                     AbstractList<File> dataDirs, SecureResources resources
                     ) throws IOException {
	...
	 // connect to name node  (line 370): obtain a client-side DatanodeProtocol proxy
    this.namenode = (DatanodeProtocol)   
      RPC.waitForProxy(DatanodeProtocol.class,
                       DatanodeProtocol.versionID,
                       nameNodeAddr, 
                       conf);
	...
	
}	

Search the DataNode source for "namenode." to see which methods of the DatanodeProtocol contract the DataNode, acting as the RPC client, invokes,
e.g. namenode.sendHeartbeat().


The NameNode, by implementing the DatanodeProtocol contract, provides the RPC server side.
That is the whole of the DataNode-NameNode conversation.





The heartbeat mechanism:

To see when sendHeartbeat(...) fires, look at DataNode.offerService():

DataNode:

public void offerService() throws Exception {

    LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + 
       " Initial delay: " + initialBlockReportDelay + "msec");

    //
    // Now loop for a long time....
    //

    while (shouldRun) {
      try {
        long startTime = now();

        //
        // Every so often, send heartbeat or block-report
        // (only once the heartbeat interval, 3 s by default, has elapsed)
        //

        if (startTime - lastHeartbeat > heartBeatInterval) {
          // ... calls namenode.sendHeartbeat(...) and processes the
          // commands the NameNode sends back ...
        }
        ...
      } catch (Exception e) {
        ...
      }
    }
}
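The loop above is the same client-proxy pattern as in section 2. As a minimal sketch of the idea, assuming a hypothetical sendHeartbeat-style contract (illustrative names, not Hadoop's real DatanodeProtocol signatures):

// Minimal sketch of the heartbeat pattern; HeartbeatProtocol and its
// sendHeartbeat() are illustrative stand-ins for DatanodeProtocol.
public class HeartbeatLoop {

  interface HeartbeatProtocol {
    // Returns commands (e.g. replicate/delete blocks) for the reporting node.
    String sendHeartbeat(String nodeId);
  }

  static final long HEARTBEAT_INTERVAL_MS = 3000; // Hadoop's default is 3 s

  static void run(HeartbeatProtocol namenode, String nodeId) throws InterruptedException {
    long lastHeartbeat = 0;
    while (true) { // "now loop for a long time...."
      long startTime = System.currentTimeMillis();
      if (startTime - lastHeartbeat > HEARTBEAT_INTERVAL_MS) {
        // One RPC round trip: report liveness, receive work orders back.
        String commands = namenode.sendHeartbeat(nodeId);
        lastHeartbeat = startTime;
        System.out.println("commands from namenode: " + commands);
      }
      Thread.sleep(100); // avoid busy-waiting between checks
    }
  }
}

The key design point is that the heartbeat doubles as a command channel: the NameNode never calls the DataNode directly, it simply piggybacks its instructions on the heartbeat replies.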

 

 

Below is a rough diagram of RPC in Hadoop MapReduce:

[figure: hadoop-mapreduce RPC overview]

A detailed diagram of the map-reduce interaction:

[figure: map-reduce interaction detail]
