大数据框架hadoop的IPC机制实例
如下是一个使用Hadoop IPC实现客户端调用服务器端方法的示例,功能是返回服务器端的一个文件信息。
1 文件信息类IPCFileStatus
代码如下所示:
package org.seandeng.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class IPCFileStatus implements Writable {
private String filename;
private long time;
public IPCFileStatus() {
}
public IPCFileStatus(String filename) {
this.filename=filename;
this.time=(new Date()).getTime();
}
public String getFilename() {
return filename;
}
public void setFilename(String filename) {
this.filename = filename;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public String toString() {
return "File: "+filename+" Create at "+(new Date(time));
}
public void readFields(DataInput in) throws IOException {
this.filename = Text.readString(in);
this.time = in.readLong();
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, filename);
out.writeLong(time);
}
}
由于IPCFileStatus类的对象需要从服务器端传到客户端,所以就需要进行序列化,Writable接口就是Hadoop定义的一个序列化接口。
由于客户端要调用服务器的方法,所以客户端需要知道服务器有哪些方法可以调用,在IPC中使用的是定义接口的方法,如定义一个IPC接口,客户端和服务器端都知道这个接口,客户端通过IPC获取到一个服务器端这个实现了接口的引用,待要调用服务器的方法时,直接使用这个引用来调用方法,这样就可以调用服务器的方法了。
2 接口IPCQueryStatus
定义一个服务器端和客户端接口IPCQueryStatus如下所示:
package org.seandeng.hadoop.ipc;
import org.apache.hadoop.ipc.VersionedProtocol;
public interface IPCQueryStatus extends VersionedProtocol {
IPCFileStatus getFileStatus(String filename);
}
在接口IPCQueryStatus中,定义了一个getFileStatus(String filename)方法,根据文件名得到一个IPCFileStatus对象,注意到IPCQueryStatus接口继承自接口 org.apache.hadoop.ipc.VersionedProtocol接口,VersionedProtocol接口是Hadoop IPC接口必须继承的一个接口,它定义了一个方法getProtocolVersion(),用于返回服务器端的接口实现的版本号,有两个参数,分别是协议接口对应的接口名称protocol和客户端期望服务器的版本号clientVersion,主要作用是检查通信双方的接口是否一致,VersionedProtocol的代码如下:
package org.apache.hadoop.ipc;
import java.io.IOException;
/**
* Superclass of all protocols that use Hadoop RPC.
* Subclasses of this interface are also supposed to have
* a static final long versionID field.
*/
public interface VersionedProtocol {
/**
* Return protocol version corresponding to protocol interface.
* @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks
* @return the version that the server will speak
*/
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException;
}
3 实现类IPCQueryStatusImpl
定义好了接口,那么在服务器端就需要有一个接口的实现类,用于实现具体的业务逻辑,下面的IPCQueryStatusImpl类实现了IPCQueryStatus接口,仅仅简单实现了IPCQueryStatus规定两个方法。
package org.seandeng.hadoop.ipc;
import java.io.IOException;
public class IPCQueryStatusImpl implements IPCQueryStatus {
public IPCQueryStatusImpl() {}
public IPCFileStatus getFileStatus(String filename) {
IPCFileStatus status=new IPCFileStatus(filename);
System.out.println("Method getFileStatus Called, return: "+status);
return status;
}
/**
* 用于服务器与客户端,进行IPC接口版本检查,再服务器返回给客户端时调用,如果服务器端的IPC版本与客户端不一致
* 那么就会抛出版本不一致的异常
*/
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
System.out.println("protocol: "+protocol);
System.out.println("clientVersion: "+clientVersion);
return IPCQueryServer.IPC_VER;
}
}
getFileStatus()方法根据参数filename创建了一个IPCFileStatus对象,getProtocolVersion()方法返回服务器端使用的接口版本。接口和实现类都完成之后就可以用客户端和服务器进行通信了。
4 IPCQueryServer类
服务器端进行一些成员变量的初始化,然后使用Socket绑定IP,然后在某个端口上监听客户端的请求。IPCQueryServer类相关代码如下所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
public class IPCQueryServer {
public static final int IPC_PORT = 32121;
public static final long IPC_VER = 5473L;
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
IPCQueryStatusImpl queryService=new IPCQueryStatusImpl();
System.out.println(conf);
Server server = RPC.getServer(queryService, "127.0.0.1", IPC_PORT, 1, false, conf);
server.start();
System.out.println("Server ready, press any key to stop");
System.in.read();
server.stop();
System.out.println("Server stopped");
} catch (Exception e) {
e.printStackTrace();
}
}
}
在服务器端先创建一个IPCQueryStatusImpl的对象,传递到RPC.getServer()方法中。服务器端使用RPC.getServer()方法穿给创建服务器端对象server,代码中RPC.getServer()方法的几个参数说明如下:
· 第一个参数queryService标识该服务器对象对外提供的服务对象实例,即客户端所要调用的具体对象,下面客户端的代码调用的接口如此对应;
· 第二个参数"127.0.0.1"表示监绑定所有的IP地址;
· 第三个参数IPC_PORT表示监听的端口;
· 第四个参数1表示Server端的Handler实例(线程)的个数为1
· 第五个参数false表示不打开调用方法日志;
· 第六个参数是Configuration对象,用于定制Server端的配置。
创建Server对象之后,调用Server.start()方法开始监听客户端的请求,并根据客户端的请求提供服务。
5 请求类IPCQueryClient
客户端需要先获取到一个代理对象,然后才能进行方法调用,在IPC中,使用RPC.getProxy()方法获取代理对象。客户端的代码如下:
package org.seandeng.hadoop.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class IPCQueryClient {
public static void main(String[] args) {
try {
System.out.println("Interface name: "+IPCQueryStatus.class.getName());
System.out.println("Interface name: "+IPCQueryStatus.class.getMethod("getFileStatus", String.class).getName());
InetSocketAddress addr=new InetSocketAddress("localhost", IPCQueryServer.IPC_PORT);
IPCQueryStatus query=(IPCQueryStatus) RPC.getProxy(IPCQueryStatus.class, IPCQueryServer.IPC_VER, addr,new Configuration());
IPCFileStatus status=query.getFileStatus("Z:\\temp\\7c64984cf5c3410fbe28037865d010a3.pdf");
System.out.println(status);
RPC.stopProxy(query);
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端的代码很简单,首先构造一个要请求服务器的网络地址(IP和端口),然后通过RPC.getProxy()方法获取到一个IPCQueryStatus对象,然后进行相应的方法调用。其中客户端代码中RPC.getProxy()方法的参数说明如下:
· 第一个参数是IPC接口对象,可以通过IPC接口的静态成员class直接获得。接口的静态成员class保存了该接口的java.lang.Class实例,它表示正在运行的Java应用程序中的类和接口,提供一系列与Java反射相关的重要功能;
· 第二个参数是接口版本,由于接口会根据需求不断地进行升级,形成多个版本的IPC接口,如果客户端和服务器端使用的IPC接口版本不一致,结果将是灾难性的,所以在建立IPC时,需要对IPC的双方进行版本检查;
· 第三个参数是服务器的Socket地址,用于建立IPC的底层TCP连接;
· 第四个参数是Configuration对象,用于定制IPC客户端参数。
6 执行结果
客户端的代码编写完成之后就可以运行程序了,先启动服务器端,再运行一个客户端,就完成了一次客户端调用服务器的过程,客户端调用了服务器端 IPCQueryStatusImpl对象的getFileStatus()方法,服务器端返回了方法调用结果即IPCFileStatus对象。服务器端和客户端执行日志如下所示:
服务器端:
2014-11-26 13:00:49,147 WARN conf.Configuration (Configuration.java:<clinit>(191)) - DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively Configuration: core-default.xml, core-site.xml 2014-11-26 13:00:50,124 INFO ipc.Server (Server.java:run(328)) - Starting SocketReader 2014-11-26 13:00:50,222 INFO ipc.Server (Server.java:run(598)) - IPC Server Responder: starting 2014-11-26 13:00:50,223 INFO ipc.Server (Server.java:run(434)) - IPC Server listener on 32121: starting Server ready, press any key to stop 2014-11-26 13:00:50,224 INFO ipc.Server (Server.java:run(1358)) - IPC Server handler 0 on 32121: starting protocol: org.seandeng.hadoop.ipc.IPCQueryStatus clientVersion: 5473 Method getFileStatus Called, return: File: Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf Create at Wed Nov 26 13:01:02 CST 2014 |
客户端:
Interface name: org.seandeng.hadoop.ipc.IPCQueryStatus Interface name: getFileStatus 2014-11-26 13:00:59,790 WARN conf.Configuration (Configuration.java:<clinit>(191)) - DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively File: Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf Create at Wed Nov 26 13:01:02 CST 2014 |
推荐阅读