欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据框架hadoop的IPC机制实例

程序员文章站 2022-05-22 17:07:08
...
    Hadoop IPC(Inter-Process Communication,进程间通信)这是一种简洁,低消耗的通信机制,可以精确控制进程间通信中如连接、超时、缓存等细节。Hadoop IPC机制的实现使用了Java动态代理,Java NIO等技术。

如下是一个使用Hadoop IPC实现客户端调用服务器端方法的示例功能是返回服务器端的一个文件信息

1 文件信息类IPCFileStatus

    代码如下所示:

package org.seandeng.hadoop.ipc;

 

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.util.Date;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

 

public class IPCFileStatus implements Writable {

    private String filename;

    private long time;

    public IPCFileStatus() {

    }

    public IPCFileStatus(String filename) {

        this.filename=filename;

        this.time=(new Date()).getTime();

    }

    public String getFilename() {

        return filename;

    }

    public void setFilename(String filename) {

        this.filename = filename;

    }

    public long getTime() {

        return time;

    }

    public void setTime(long time) {

        this.time = time;

    }

    public String toString() {

        return "File: "+filename+" Create at "+(new Date(time));

    }

    public void readFields(DataInput inthrows IOException {

        this.filename = Text.readString(in);

        this.time = in.readLong();

    }

    public void write(DataOutput outthrows IOException {

        Text.writeString(outfilename);

        out.writeLong(time);

    }

}

由于IPCFileStatus类的对象需要从服务器端传到客户端,所以就需要进行序列化,Writable接口就是Hadoop定义的一个序列化接口。 
    由于客户端要调用服务器的方法,所以客户端需要知道服务器有哪些方法可以调用,在IPC中使用的是定义接口的方法,如定义一个IPC接口,客户端和服务器端都知道这个接口,客户端通过IPC获取到一个服务器端这个实现了接口的引用,待要调用服务器的方法时,直接使用这个引用来调用方法,这样就可以调用服务器的方法了。

2 接口IPCQueryStatus

    定义一个服务器端和客户端接口IPCQueryStatus如下所示:

package org.seandeng.hadoop.ipc;

 

import org.apache.hadoop.ipc.VersionedProtocol;

 

public interface IPCQueryStatus extends VersionedProtocol {

    IPCFileStatus getFileStatus(String filename);

}

    在接口IPCQueryStatus中,定义了一个getFileStatus(String filename)方法根据文件名得到一个IPCFileStatus对象,注意到IPCQueryStatus接口继承自接口 org.apache.hadoop.ipc.VersionedProtocol接口,VersionedProtocol接口是Hadoop IPC接口必须继承的一个接口,它定义了一个方法getProtocolVersion(),用于返回服务器端的接口实现的版本号,有两个参数,分别是协议接口对应的接口名称protocol和客户端期望服务器的版本号clientVersion,主要作用是检查通信双方的接口是否一致,VersionedProtocol的代码如下:

package org.apache.hadoop.ipc;

 

import java.io.IOException;

/**

 * Superclass of all protocols that use Hadoop RPC.

 * Subclasses of this interface are also supposed to have

 * a static final long versionID field.

 */

public interface VersionedProtocol {

  /**

   * Return protocol version corresponding to protocol interface.

   * @param protocol The classname of the protocol interface

   * @param clientVersion The version of the protocol that the client speaks

   * @return the version that the server will speak

   */

  public long getProtocolVersion(String protocol

                                 long clientVersionthrows IOException;

}

3 实现类IPCQueryStatusImpl

    定义好了接口,那么在服务器端就需要有一个接口的实现类,用于实现具体的业务逻辑,下面的IPCQueryStatusImpl类实现了IPCQueryStatus接口,仅仅简单实现了IPCQueryStatus规定两个方法

package org.seandeng.hadoop.ipc;

 

import java.io.IOException;

 

public class IPCQueryStatusImpl implements IPCQueryStatus {

    public IPCQueryStatusImpl() {}

 

    public IPCFileStatus getFileStatus(String filename) {

        IPCFileStatus status=new IPCFileStatus(filename);

        System.out.println("Method getFileStatus Called, return: "+status);

        return status;

    }

    /**

     * 用于服务器与客户端,进行IPC接口版本检查,再服务器返回给客户端时调用,如果服务器端的IPC版本与客户端不一致

     * 那么就会抛出版本不一致的异常

     */

    public long getProtocolVersion(String protocollong clientVersionthrows IOException {

        System.out.println("protocol: "+protocol);

        System.out.println("clientVersion: "+clientVersion);

        return IPCQueryServer.IPC_VER;

    }

}

getFileStatus()方法根据参数filename创建了一个IPCFileStatus对象,getProtocolVersion()方法返回服务器端使用的接口版本。接口和实现类都完成之后就可以用客户端和服务器进行通信了。

4 IPCQueryServer

    服务器端进行一些成员变量的初始化,然后使用Socket绑定IP,然后在某个端口上监听客户端的请求IPCQueryServer类相关代码如下所示:

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

import org.apache.hadoop.ipc.Server;

 

public class IPCQueryServer {

    public static final int IPC_PORT = 32121;

    public static final long IPC_VER = 5473L;

 

    public static void main(String[] args) {

        try {

            Configuration conf = new Configuration();

            IPCQueryStatusImpl queryService=new IPCQueryStatusImpl();

            System.out.println(conf);

            Server server = RPC.getServer(queryService, "127.0.0.1", IPC_PORT, 1, false, conf);

            server.start();

 

            System.out.println("Server ready, press any key to stop");

            System.in.read();

 

            server.stop();

            System.out.println("Server stopped");

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

    在服务器端先创建一个IPCQueryStatusImpl的对象,传递到RPC.getServer()方法中。服务器端使用RPC.getServer()方法穿给创建服务器端对象server,代码中RPC.getServer()方法的几个参数说明如下:

· 第一个参数queryService标识该服务器对象对外提供的服务对象实例,即客户端所要调用的具体对象,下面客户端的代码调用的接口如此对应;

· 第二个参数"127.0.0.1"表示监绑定所有的IP地址;

· 第三个参数IPC_PORT表示监听的端口;

· 第四个参数1表示Server端的Handler实例(线程)的个数为1

· 第五个参数false表示打开调用方法日志;

· 第六个参数是Configuration对象,用于定制Server端的配置

创建Server对象之后,调用Server.start()方法开始监听客户端的请求,并根据客户端的请求提供服务。

5 请求类IPCQueryClient

客户端需要先获取到一个代理对象,然后才能进行方法调用,在IPC中,使用RPC.getProxy()方法获取代理对象。客户端的代码如下: 

package org.seandeng.hadoop.ipc;

 

import java.net.InetSocketAddress;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

 

public class IPCQueryClient {

    public static void main(String[] args) {

        try {

            System.out.println("Interface name: "+IPCQueryStatus.class.getName());

            System.out.println("Interface name: "+IPCQueryStatus.class.getMethod("getFileStatus", String.class).getName());

            InetSocketAddress addr=new InetSocketAddress("localhost", IPCQueryServer.IPC_PORT);

            IPCQueryStatus query=(IPCQueryStatus) RPC.getProxy(IPCQueryStatus.class, IPCQueryServer.IPC_VER, addr,new Configuration());

            IPCFileStatus status=query.getFileStatus("Z:\\temp\\7c64984cf5c3410fbe28037865d010a3.pdf");

            System.out.println(status);

            RPC.stopProxy(query);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

    客户端的代码很简单,首先构造一个要请求服务器的网络地址(IP和端口),然后通过RPC.getProxy()方法获取到一个IPCQueryStatus对象,然后进行相应的方法调用。其中客户端代码中RPC.getProxy()方法的参数说明如下:

· 第一个参数是IPC接口对象,可以通过IPC接口的静态成员class直接获得。接口的静态成员class保存了该接口的java.lang.Class实例,它表示正在运行的Java应用程序中的类和接口,提供一系列与Java反射相关的重要功能;

· 第二个参数是接口版本,由于接口会根据需求不断地进行升级,形成多个版本的IPC接口,如果客户端和服务器端使用的IPC接口版本不一致,结果将是灾难性的,所以在建立IPC时,需要对IPC的双方进行版本检查;

· 第三个参数是服务器的Socket地址,用于建立IPC的底层TCP连接;

· 第四个参数是Configuration对象,用于定制IPC客户端参数

6 执行结果

客户端的代码编写完成之后就可以运行程序了,先启动服务器端,再运行一个客户端,就完成了一次客户端调用服务器的过程,客户端调用了服务器端 IPCQueryStatusImpl对象的getFileStatus()方法,服务器端返回了方法调用结果即IPCFileStatus对象。服务器端和客户端执行日志如下所示:

服务器端:

2014-11-26 13:00:49,147 WARN  conf.Configuration (Configuration.java:<clinit>(191)) - DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively

Configuration: core-default.xml, core-site.xml

2014-11-26 13:00:50,124 INFO  ipc.Server (Server.java:run(328)) - Starting SocketReader

2014-11-26 13:00:50,222 INFO  ipc.Server (Server.java:run(598)) - IPC Server Responder: starting

2014-11-26 13:00:50,223 INFO  ipc.Server (Server.java:run(434)) - IPC Server listener on 32121: starting

Server ready, press any key to stop

2014-11-26 13:00:50,224 INFO  ipc.Server (Server.java:run(1358)) - IPC Server handler 0 on 32121: starting

protocol: org.seandeng.hadoop.ipc.IPCQueryStatus

clientVersion: 5473

Method getFileStatus Called, return: File: Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf Create at Wed Nov 26 13:01:02 CST 2014

客户端:

Interface name: org.seandeng.hadoop.ipc.IPCQueryStatus

Interface name: getFileStatus

2014-11-26 13:00:59,790 WARN  conf.Configuration (Configuration.java:<clinit>(191)) - DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively

File: Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf Create at Wed Nov 26 13:01:02 CST 2014