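Introduction to Hadoop RPC
1 Introduction to RPC
1.1 RPC (remote procedure call) here means a call between Java processes: one Java process invokes a method on an object that lives in another Java process.
1.2 An RPC involves at least two parties: the caller (client) and the callee (server).
1.3 The client initiates the request and calls a method on the server at a given IP and port; the server executes the method and returns the result to the client.
1.4 RPC is the foundation on which Hadoop is built.
1.5 Under the hood, RPC is socket communication: knowing the peer's address and port is enough to talk to it. RPC therefore supports communication between separate processes, which suits a Hadoop cluster whose nodes sit at different addresses (a distributed setup).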
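To make point 1.5 concrete, here is a minimal sketch of a single remote call carried over a plain socket, using only the JDK. It is not Hadoop's wire format: the class name SocketRpcSketch, the two-string request layout (method name, then argument) and the ephemeral loopback port are all assumptions made for illustration.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch: a one-method "RPC" carried over a plain socket.
class SocketRpcSketch {

    // Server side: accept one connection, read (method, argument), reply with the result.
    static void serveOnce(ServerSocket serverSocket) throws IOException {
        try (Socket socket = serverSocket.accept();
             DataInputStream in = new DataInputStream(socket.getInputStream());
             DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
            String method = in.readUTF();
            String argument = in.readUTF();
            String result = "sayHello".equals(method)
                    ? "hello " + argument
                    : "unknown method: " + method;
            out.writeUTF(result);
            out.flush();
        }
    }

    // Client side: connect to host:port, send the request, block until the reply arrives.
    static String call(String host, int port, String method, String argument) throws IOException {
        try (Socket socket = new Socket(host, port);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream());
             DataInputStream in = new DataInputStream(socket.getInputStream())) {
            out.writeUTF(method);
            out.writeUTF(argument);
            out.flush();
            return in.readUTF();
        }
    }

    // Starts a throwaway server on an ephemeral loopback port and performs one call against it.
    static String demo(String name) {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> {
                try {
                    serveOnce(serverSocket);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            serverThread.start();
            String result = call(InetAddress.getLoopbackAddress().getHostAddress(),
                    serverSocket.getLocalPort(), "sayHello", name);
            serverThread.join();
            return result;
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("zm"));
    }
}
```

The client needs nothing beyond the server's address and port, which is the property the section relies on. A real RPC layer adds what this sketch leaves out: a general serialization format, dispatch to arbitrary methods, connection reuse and error reporting.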
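2 Simulating the RPC mechanism with custom code
2.1 An interface and a concrete business class are required; the interface declares the business methods that can be called remotely
(it acts as a contract: it fixes the shared rules, i.e. the methods that may be called; the client invokes a method named in the contract, and the call ends up on the server's implementation of that method)
2.2 An RPC client class and an RPC server class are required
2.3 The code is as follows:
1 Interface:
import org.apache.hadoop.ipc.VersionedProtocol;
public interface MyBizAble extends VersionedProtocol{
    public String sayHello(String name);
}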
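2 Business implementation class:
import java.io.IOException;
public class MyBiz implements MyBizAble{
    public String sayHello(String name){
        return "hello " + name;
    }
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        // Must equal the version the client passes to RPC.waitForProxy (1001 in MyClient);
        // otherwise the client-side version check fails.
        return 1001;
    }
}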
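3 Server
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
public class MyServer {
    public static String bindAddress = "localhost";
    public static int port = 1001;
    public static void main(String[] args) throws IOException {
        MyBiz myBiz = new MyBiz();
        /** Construct an RPC server.
         * @param instance the instance whose methods will be called
         * @param conf the configuration to use
         * @param bindAddress the address to bind on to listen for connection
         * @param port the port to listen for connections on
         */
        // Hand the server instance myBiz, the bind address and the port to RPC, which listens on them.
        // When a client calls a contract method, RPC routes the call to the matching method on myBiz;
        // once the server has processed it, the result travels back over the network to the RPC client.
        Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());
        server.start();
    }
}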
4 Client
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class MyClient {
    public static void main(String[] args) throws IOException {
        /**
         * Build a client-side proxy object.
         * The proxy implements the named protocol
         * and talks to the server at the given address.
         */
        // 1001 is the client's protocol version; the server's getProtocolVersion must return the same value.
        MyBizAble proxy = (MyBizAble)RPC.waitForProxy(MyBizAble.class, 1001,
                new InetSocketAddress(MyServer.bindAddress, MyServer.port),
                new Configuration());
        String result = proxy.sayHello("zm");
        System.out.println(result);
        // Close the network connection
        RPC.stopProxy(proxy);
    }
}
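The client above never sees a socket; it only holds an object typed as the contract interface. One way such a proxy can be built is with the JDK's java.lang.reflect.Proxy, sketched below. The names ProxySketch, Greeter and GreeterImpl are hypothetical stand-ins (Greeter plays the role of MyBizAble without the Hadoop dependency), and the handler calls the target in-process where a real RPC client would serialize the call and send it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class ProxySketch {

    // Hypothetical contract, standing in for MyBizAble without the Hadoop dependency.
    interface Greeter {
        String sayHello(String name);
    }

    // Server-side implementation of the contract.
    static class GreeterImpl implements Greeter {
        @Override
        public String sayHello(String name) {
            return "hello " + name;
        }
    }

    // Builds a client-side proxy whose every call is routed through one handler.
    // A real RPC client would serialize the method name and arguments and send them
    // over the network; this sketch invokes the target in-process instead.
    static Greeter newProxy(final Greeter target) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                System.out.println("forwarding call: " + method.getName());
                return method.invoke(target, args);
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                handler);
    }

    public static void main(String[] args) {
        Greeter proxy = newProxy(new GreeterImpl());
        System.out.println(proxy.sayHello("zm"));
    }
}
```

Because every call funnels through a single invoke method, the contract interface is the only thing client and server need to share, which is why the interface is described as a contract in 2.1.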
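3 RPC structure diagram in Hadoop
4 Reading the NameNode source to confirm that it is part of RPC communication, namely an RPC server
4.1 Does it run for a long time, constantly waiting for client requests? (the typical sign of a socket server)
4.2 Does it contain the typical server-side marker:
Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());
4.3 The source trace and explanation follow:
NameNode RPC code trace: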
public class NameNode implements ClientProtocol, DatanodeProtocol,
NamenodeProtocol,.... {
.......
/** RPC server */
private Server server;
......
public static void main(String argv[]) throws Exception {
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null); // step into
if (namenode != null)
namenode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
Which leads to:
public static NameNode createNameNode(String argv[],
Configuration conf) throws IOException {
...............
NameNode namenode = new NameNode(conf); // step into
...............
}
Which leads to:
/* ................
upgrade and create a snapshot of the current file system state
...............
*/
public NameNode(Configuration conf) throws IOException {
try {
initialize(conf);// step into
} catch (IOException e) {
this.stop();
throw e;
}
}
Which leads to:
private void initialize(Configuration conf) throws IOException {
...............
this.server = RPC.getServer(this, socAddr.getHostName(), // this is the NameNode instance
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
...............
}
}
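That concludes the source trace. The explanation:
A typical RPC server calls RPC.getServer like this, where
the first parameter, "the instance whose methods will be called", is the business object whose methods are invoked:
Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());
In NameNode above, the first argument is this, so the business class is NameNode itself; and since the RPC.getServer call also appears inside the NameNode class, NameNode
is both the business implementation class and the RPC server.
this.server = RPC.getServer(this, socAddr.getHostName(), // this is the NameNode instance
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
As the business implementation class, NameNode must implement the contracts before clients can call it.
Look at the code:
public class NameNode implements ClientProtocol, DatanodeProtocol,NamenodeProtocol,
where
ClientProtocol, DatanodeProtocol and NamenodeProtocol are the contracts:
ClientProtocol: the interface contract defining the methods through which user code talks to the NameNode
DatanodeProtocol: Protocol that a DFS datanode uses to communicate with the NameNode
NamenodeProtocol: Protocol that a secondary NameNode uses to communicate with the NameNode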
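The idea of one object serving several contracts can be shown without Hadoop. In the sketch below, ClientContract and WorkerContract are hypothetical interfaces that only mimic the roles of ClientProtocol and DatanodeProtocol, and Master plays the part of NameNode; none of these names or method bodies come from the Hadoop source.

```java
class MultiContractSketch {

    // Hypothetical contract for end users (plays the role ClientProtocol plays for NameNode).
    interface ClientContract {
        String create(String path);
    }

    // Hypothetical contract for worker nodes (plays the role of DatanodeProtocol).
    interface WorkerContract {
        String sendHeartbeat(String workerId);
    }

    // One business object implements both contracts.
    static class Master implements ClientContract, WorkerContract {
        @Override
        public String create(String path) {
            return "created " + path;
        }

        @Override
        public String sendHeartbeat(String workerId) {
            return "heartbeat from " + workerId;
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        // Each kind of caller holds the same object, but typed as its own contract,
        // so it can only reach the methods that contract declares.
        ClientContract forClients = master;
        WorkerContract forWorkers = master;
        System.out.println(forClients.create("/tmp/a.txt"));
        System.out.println(forWorkers.sendHeartbeat("dn-1"));
    }
}
```

Splitting the contracts this way keeps each caller's view narrow: a user-facing client has no way to call the worker-only methods, even though one server object answers both.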
5 Tracing through the code how a client reaches the NameNode via RPC
0 Example: a client uploading a file to HDFS:
private static void putData(FileSystem fileSystem) {
try {
System.out.println(fileSystem.getClass().getName());// prints org.apache.hadoop.hdfs.DistributedFileSystem when run
FSDataOutputStream out = fileSystem.create(new Path(FILE)); // step into
FileInputStream in = new FileInputStream("E:/seq100w.txt");
IOUtils.copyBytes(in, out, 1024, true);
} catch (Exception e) {
e.printStackTrace();
}
}
Which leads to:
public FSDataOutputStream create(Path f) throws IOException {
return create(f, true); // step into
}
Keep stepping in until you reach:
public abstract FSDataOutputStream create(Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException;
Now look up the create method in org.apache.hadoop.hdfs.DistributedFileSystem:
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream
(dfs.create(getPathName(f), permission, // step into dfs.create
overwrite, true, replication, blockSize, progress, bufferSize),
statistics);
} // the return value is a stream, and a stream needs a destination
Which leads into class DFSClient.java:
/**
* Create a new dfs file with the specified block replication
* with write-progress reporting and return an output stream for writing
* into the file.
*/
// Two things matter here: 1. a dfs file is created on the NameNode 2. a stream is created to hold the data of the file the user uploads
public OutputStream create(String src,
FsPermission permission,
boolean overwrite,
boolean createParent,
short replication,
long blockSize,
Progressable progress,
int buffersize
) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src + ": masked=" + masked);
final DFSOutputStream result = new DFSOutputStream(src, masked, // step into
overwrite, createParent, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
beginFileLease(src, result);
return result;
}
Which leads to the inner class DFSOutputStream inside DFSClient.java:
DFSClient.java {
public final ClientProtocol namenode;
/**
* Create a new output stream to the given DataNode.
* @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
*/
DFSOutputStream(String src, FsPermission masked, boolean overwrite,
boolean createParent, short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException {
this(src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try {
// Make sure the regular create() is done through the old create().
// This is done to ensure that newer clients (post-1.0) can talk to
// older clusters (pre-1.0). Older clusters lack the new create()
// method accepting createParent as one of the arguments.
if (createParent) {
namenode.create( // look here: namenode is of type ClientProtocol;
// this is the point where DFSClient.java actually connects to the NameNode through ClientProtocol.
// Here the RPC client invokes the create method of the ClientProtocol contract.
src, masked, clientName, overwrite, replication, blockSize);
} else {
namenode.create(
src, masked, clientName, overwrite, false, replication, blockSize);
}
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
streamer.start();
}
The ClientProtocol namenode field is initialized in the DFSClient constructor:
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats) {
......
this.namenode = createNamenode(this.rpcNamenode, conf); // line 277: the initialization happens here
}
DFSClient itself is created in class DistributedFileSystem:
public void initialize(URI uri, Configuration conf) throws IOException {
...
this.dfs = new DFSClient(namenode, conf, statistics); // line 100
...
}
}
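The trace above boils down to a chain of delegation that ends in a single call on a contract-typed field. The sketch below reproduces only that shape with plain Java. NamenodeContract, RecordingNamenode, Client and FileSystemFacade are hypothetical stand-ins for ClientProtocol, the remote NameNode, DFSClient and DistributedFileSystem; the bodies are illustrative and not taken from Hadoop.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

class CallChainSketch {

    // Hypothetical stand-in for the ClientProtocol contract.
    interface NamenodeContract {
        void create(String src);
    }

    // Stand-in for the remote side: records what was asked of it.
    static class RecordingNamenode implements NamenodeContract {
        final List<String> created = new ArrayList<>();

        @Override
        public void create(String src) {
            created.add(src);
        }
    }

    // Stand-in for DFSClient: holds the contract reference and calls it.
    static class Client {
        private final NamenodeContract namenode;

        Client(NamenodeContract namenode) {
            this.namenode = namenode;
        }

        OutputStream create(String src) {
            namenode.create(src);               // the only "remote" call in the chain
            return new ByteArrayOutputStream(); // placeholder for the data stream
        }
    }

    // Stand-in for DistributedFileSystem: a facade that delegates to the client.
    static class FileSystemFacade {
        private final Client dfs;

        FileSystemFacade(Client dfs) {
            this.dfs = dfs;
        }

        OutputStream create(String path) {
            return dfs.create(path);
        }
    }

    // Runs one create() through the whole chain and reports what reached the namenode.
    static List<String> demo(String path) {
        RecordingNamenode namenode = new RecordingNamenode();
        FileSystemFacade fs = new FileSystemFacade(new Client(namenode));
        fs.create(path);
        return namenode.created;
    }

    public static void main(String[] args) {
        System.out.println(demo("/user/zm/seq100w.txt"));
    }
}
```

In the sketch the contract reference points at a local object; in the traced code the same position is held by an RPC proxy, so the call on the namenode field is where execution leaves the client process.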
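The flow diagram is as follows:
6 Tracing through the code how DataNode and NameNode communicate over RPC (with a brief look at the heartbeat mechanism)
DataNode:{
public DatanodeProtocol namenode = null;
Entry point: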
public static void main(String args[]) {
secureMain(args, null);
}
Tracing all the way down to:
void startDataNode(Configuration conf,
AbstractList<File> dataDirs, SecureResources resources
) throws IOException {
...
// connect to name node (line 370): obtain the client-side instance of DatanodeProtocol
this.namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);
...
}
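Searching for "namenode." shows which methods of the DatanodeProtocol contract the DataNode, acting as RPC client, calls,
e.g. namenode.sendHeartbeat().
NameNode, by implementing the methods of the DatanodeProtocol contract, acts as the RPC server.
That is how DataNode and NameNode communicate.
Heartbeat mechanism:
Look at where DataNode calls sendHeartbeat(...):
DataNode: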
public void offerService() throws Exception {
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +
" Initial delay: " + initialBlockReportDelay + "msec");
//
// Now loop for a long time....
//
while (shouldRun) {
try {
long startTime = now();
//
// Every so often, send heartbeat or block-report: act only once more than the heartbeat interval (3 s) has elapsed
//
if (startTime - lastHeartbeat > heartBeatInterval) {
....
}
}
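The interval check in that loop can be exercised in isolation. The sketch below replaces the wall clock with simulated time so the behaviour is repeatable; HeartbeatSketch, HeartbeatTarget and simulate are hypothetical names, and the 1-second loop step is an assumption chosen for illustration, not a value from the DataNode source.

```java
import java.util.ArrayList;
import java.util.List;

class HeartbeatSketch {

    // Hypothetical stand-in for the heartbeat part of the DatanodeProtocol contract.
    interface HeartbeatTarget {
        void sendHeartbeat(long timestampMillis);
    }

    // Walks simulated time from 0 to endMillis in stepMillis increments and sends a
    // heartbeat whenever more than intervalMillis has passed since the previous one.
    // Returns the timestamps at which heartbeats were sent.
    static List<Long> simulate(long endMillis, long stepMillis, long intervalMillis) {
        final List<Long> sent = new ArrayList<>();
        HeartbeatTarget namenode = new HeartbeatTarget() {
            @Override
            public void sendHeartbeat(long timestampMillis) {
                sent.add(timestampMillis);
            }
        };
        long lastHeartbeat = 0;
        for (long now = 0; now <= endMillis; now += stepMillis) {
            // Same shape as: if (startTime - lastHeartbeat > heartBeatInterval)
            if (now - lastHeartbeat > intervalMillis) {
                namenode.sendHeartbeat(now);
                lastHeartbeat = now;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        // 10 simulated seconds, checked once per second, with a 3-second interval.
        System.out.println(simulate(10_000, 1_000, 3_000)); // prints [4000, 8000]
    }
}
```

Because the comparison is strictly greater-than, a loop that wakes once per second sends at 4 s and 8 s rather than at 3 s and 6 s; the effective period is the interval rounded up to the next loop tick.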
Below is a rough diagram of hadoop-mapreduce-rpc:
Detailed diagram of the map-reduce interaction: