欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop高可用集群

程序员文章站 2022-05-06 10:33:16
1.简介 若HDFS集群中只配置了一个NameNode,那么当该NameNode所在的节点宕机,则整个HDFS就不能进行文件的上传和下载。 若YARN集群中只配置了一个ResourceManager,那么当该ResourceManager所在的节点宕机,则整个YARN就不能进行任务的计算。 *Had ......

1.简介

 

 

若hdfs集群中只配置了一个namenode,那么当该namenode所在的节点宕机,则整个hdfs就不能进行文件的上传和下载。

若yarn集群中只配置了一个resourcemanager,那么当该resourcemanager所在的节点宕机,则整个yarn就不能进行任务的计算。

*hadoop依赖zookeeper进行各个模块的ha配置,其中状态为active的节点对外提供服务,而状态为standby的节点则只负责数据的同步,在必要时提供快速故障转移。

 

hadoop各个模块剖析:

hadoop集群管理:

 

 

2.hdfs ha集群

 

 

2.1 模型


当有两个namenode时,提供哪个namenode地址给客户端?

 

Hadoop高可用集群

 

1.hadoop提供了nameservice进程,其是namenode的代理,维护namenode列表并存储namenode的状态,客户端直接访问的是nameservice,nameservice会将请求转发给当前状态为active的namenode。

2.当启动hdfs时,datanode将同时向两个namenode进行注册。

 

 

怎样发现namenode无法提供服务以及如何进行namenode间状态的切换?

Hadoop高可用集群

 



1.hadoop提供了failovercontrolleractive和failovercontrollerstandby两个进程用于namenode的生命监控。

2.failovercontrolleractive和failovercontrollerstandby会分别监控对应状态的namenode,若namenode无异常则定期向zookeeper集群发送心跳,若在一定时间内zookeeper集群没收到failovercontrolleractive发送的心跳,则认为此时状态为active的namenode已经无法对外提供服务,因此将状态为standby的namenode切换为active状态。

 

namenode之间的数据如何进行同步和共享?

1.hadoop提供了journalnode用于存放namenode中的编辑日志。

2.当激活的namenode执行任何名称空间上的修改时,它将修改的记录保存到journalnode集群中,备用的namenode能够实时监控journalnode集群中日志的变化,当监控到日志发生改变时会将其同步到本地。

 

*当状态为active的namenode无法对外提供服务时,zookeeper将会自动的将处于standby状态的namenode切换成active。

 

 

2.2 hdfs ha高可用集群搭建

 

1.安装并配置zookeeper集群

 

2.配置hdfs(hdfs-site.xml)

<configuration> 
  <!-- 指定nameservice的名称 -->  
  <property> 
    <name>dfs.nameservices</name>  
    <value>mycluster</value> 
  </property>  
  <!-- 指定nameservice下两个namenode的名称 -->  
  <property> 
    <name>dfs.ha.namenodes.mycluster</name>  
    <value>nn1,nn2</value> 
  </property>  
  <!-- 分别指定namenode的rpc通讯地址 -->  
  <property> 
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>  
    <value>192.168.1.80:8020</value> 
  </property>  
  <property> 
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>  
    <value>192.168.1.81:8020</value> 
  </property>  
  <!-- 分别指定namenode的web监控页面地址 -->  
  <property> 
    <name>dfs.namenode.http-address.mycluster.nn1</name>  
    <value>192.168.1.80:50070</value> 
  </property>  
  <property> 
    <name>dfs.namenode.http-address.mycluster.nn2</name>  
    <value>192.168.1.81:50070</value> 
  </property>  
  <!-- 指定namenode编辑日志存储在journalnode集群中的目录-->  
  <property> 
    <name>dfs.namenode.shared.edits.dir</name>  
    <value>qjournal://192.168.1.80:8485;192.168.1.81:8485;192.168.1.82:8485/mycluster</value> 
  </property>
  <!-- 指定journalnode集群存放日志的目录-->  
  <property> 
    <name>dfs.journalnode.edits.dir</name>  
    <value>/usr/hadoop/hadoop-2.9.0/journalnode</value> 
  </property>  
  <!-- 配置namenode失败自动切换的方式-->  
  <property> 
    <name>dfs.client.failover.proxy.provider.mycluster</name>  
    <value>org.apache.hadoop.hdfs.server.namenode.ha.configuredfailoverproxyprovider</value> 
  </property>  
  <!-- 配置隔离机制-->  
  <property> 
    <name>dfs.ha.fencing.methods</name>  
    <value>sshfence</value> 
  </property>  
  <!-- 由于使用ssh,那么需要指定密钥的位置-->  
  <property> 
    <name>dfs.ha.fencing.ssh.private-key-files</name>  
    <value>/root/.ssh/id_rsa</value> 
  </property>  
  <!-- 开启失败故障自动转移-->  
  <property> 
    <name>dfs.ha.automatic-failover.enabled</name>  
    <value>true</value> 
  </property>  
  <!-- 配置zookeeper地址-->  
  <property> 
    <name>ha.zookeeper.quorum</name>  
    <value>192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181</value> 
  </property>  
  <!-- 文件在hdfs中的备份数(小于等于namenode) -->  
  <property> 
    <name>dfs.replication</name>  
    <value>3</value> 
  </property>  
  <!-- 关闭hdfs的访问权限 -->  
  <property> 
    <name>dfs.permissions.enabled</name>  
    <value>false</value> 
  </property>  
  <!-- 指定一个配置文件,使namenode过滤配置文件中指定的host -->  
  <property> 
    <name>dfs.hosts.exclude</name>  
    <value>/usr/hadoop/hadoop-2.9.0/etc/hadoop/hdfs.exclude</value> 
  </property> 
</configuration>

 

*指定namenode的rpc通讯地址是为了接收failovercontrolleractive和failovercontrollerstandby以及datanode发送的心跳。

 

3.配置hadoop公共属性(core-site.xml)

<configuration> 
  <!-- hadoop工作目录,用于存放hadoop运行时namenode、datanode产生的数据 -->  
  <property> 
    <name>hadoop.tmp.dir</name>  
    <value>/usr/hadoop/hadoop-2.9.0/data</value> 
  </property>  
  <!-- 默认namenode,使用nameservice的名称 -->  
  <property> 
    <name>fs.defaultfs</name>  
    <value>hdfs://mycluster</value> 
  </property>  
  <!-- 开启hadoop的回收站机制,当删除hdfs中的文件时,文件将会被移动到回收站(/usr/<username>/.trash),在指定的时间过后再对其进行删除,此机制可以防止文件被误删除 -->  
  <property> 
    <name>fs.trash.interval</name>  
    <!-- 单位是分钟 -->  
    <value>1440</value> 
  </property> 
</configuration>

 

*在hdfs ha集群中,standby的namenode会对namespace进行checkpoint操作,因此就不需要在ha集群中运行secondarynamenode、checkpintnode、backupnode。

 

4.启动hdfs ha高可用集群

1.分别启动journalnode

Hadoop高可用集群

Hadoop高可用集群

Hadoop高可用集群

 

2.格式化第一个namenode并启动

Hadoop高可用集群

Hadoop高可用集群

 

3.第二个namenode同步第一个namenode的信息

Hadoop高可用集群

 

4.启动第二个namenode

Hadoop高可用集群

 

5.启动zookeeper集群

Hadoop高可用集群

Hadoop高可用集群

Hadoop高可用集群

 

6.格式化zookeeper

Hadoop高可用集群

*当格式化zk后,zk中将会多了hadoop-ha节点。

 

7.重启hdfs集群

Hadoop高可用集群

Hadoop高可用集群

 

当hdfs ha集群启动完毕后,可以分别访问namenode管理页面查看当前namenode的状态,、。

 

Hadoop高可用集群

 Hadoop高可用集群

*可以查看到主机名为hadoop1的namnode其状态为standby,而主机名为hadoop2的namenode其状态为active。

 

8.模拟namenode宕机,手动杀死进程。

Hadoop高可用集群

 

此时访问namenode管理页面,可见主机名为hadoop1的namenode其状态从原本的standby切换成active。

Hadoop高可用集群

 

 

2.3 java操作hdfs ha集群

 

*由于在hdfs ha集群中存在两个namenode,且服务端暴露的是nameservice,因此在通过java连接hdfs ha集群时需要使用configuration实例进行相关的配置。

 

/**
 * @auther: zhuanghaotang
 * @date: 2018/11/6 11:49
 * @description:
 */
public class hdfsutils {

    /**
     * hdfs namennode url
     */
    private static final string namenode_url = "hdfs://mycluster:8020";

    /**
     * 配置项
     */
    private static configuration conf = null;

    static {
        conf = new configuration();
        //指定默认连接的namenode,使用nameservice的名称
        conf.set("fs.defaultfs", "hdfs://mycluster");
        //指定nameservice的名称
        conf.set("dfs.nameservices", "mycluster");
        //指定nameservice下的namenode列表
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
        //分别指定namenode的rpc通讯地址
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "hadoop1:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "hadoop2:8020");
        //配置namenode失败自动切换的方式
        conf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.configuredfailoverproxyprovider");
    }

    /**
     * 创建目录
     */
    public static void mkdir(string dir) throws exception {
        if (stringutils.isblank(dir)) {
            throw new exception("parameter is null");
        }
        dir = namenode_url + dir;
        filesystem fs = filesystem.get(uri.create(namenode_url), conf);
        if (!fs.exists(new path(dir))) {
            fs.mkdirs(new path(dir));
        }
        fs.close();
    }

    /**
     * 删除目录或文件
     */
    public static void delete(string dir) throws exception {
        if (stringutils.isblank(dir)) {
            throw new exception("parameter is null");
        }
        dir = namenode_url + dir;
        filesystem fs = filesystem.get(uri.create(namenode_url), conf);
        fs.delete(new path(dir), true);
        fs.close();
    }


    /**
     * 遍历指定路径下的目录和文件
     */
    public static list<string> listall(string dir) throws exception {
        list<string> names = new arraylist<>();
        if (stringutils.isblank(dir)) {
            throw new exception("parameter is null");
        }
        dir = namenode_url + dir;
        filesystem fs = filesystem.get(uri.create(dir), conf);
        filestatus[] files = fs.liststatus(new path(dir));
        for (int i = 0, len = files.length; i < len; i++) {
            if (files[i].isfile()) { //文件
                names.add(files[i].getpath().tostring());
            } else if (files[i].isdirectory()) { //目录
                names.add(files[i].getpath().tostring());
            } else if (files[i].issymlink()) { //软或硬链接
                names.add(files[i].getpath().tostring());
            }
        }
        fs.close();
        return names;
    }


    /**
     * 上传当前服务器的文件到hdfs中
     */
    public static void uploadlocalfiletohdfs(string localfile, string hdfsfile) throws exception {
        if (stringutils.isblank(localfile) || stringutils.isblank(hdfsfile)) {
            throw new exception("parameter is null");
        }
        hdfsfile = namenode_url + hdfsfile;
        filesystem fs = filesystem.get(uri.create(namenode_url), conf);
        path src = new path(localfile);
        path dst = new path(hdfsfile);
        fs.copyfromlocalfile(src, dst);
        fs.close();
    }


    /**
     * 通过流上传文件
     */
    public static void uploadfile(string hdfspath, inputstream inputstream) throws exception {
        if (stringutils.isblank(hdfspath)) {
            throw new exception("parameter is null");
        }
        hdfspath = namenode_url + hdfspath;
        filesystem fs = filesystem.get(uri.create(namenode_url), conf);
        fsdataoutputstream os = fs.create(new path(hdfspath));
        bufferedinputstream bufferedinputstream = new bufferedinputstream(inputstream);
        byte[] data = new byte[1024];
        while (bufferedinputstream.read(data) != -1) {
            os.write(data);
        }
        os.close();
        fs.close();
    }


    /**
     * 从hdfs中下载文件
     */
    public static byte[] readfile(string hdfsfile) throws exception {
        if (stringutils.isblank(hdfsfile)) {
            throw new exception("parameter is null");
        }
        hdfsfile = namenode_url + hdfsfile;
        filesystem fs = filesystem.get(uri.create(namenode_url), conf);
        path path = new path(hdfsfile);
        if (fs.exists(path)) {
            fsdatainputstream is = fs.open(path);
            filestatus stat = fs.getfilestatus(path);
            byte[] data = new byte[(int) stat.getlen()];
            is.readfully(0, data);
            is.close();
            fs.close();
            return data;
        } else {
            throw new exception("file not found in hdfs");
        }
    }


}

 

 

 

3.yarn ha集群

 

 

3.1 模型

 

 

Hadoop高可用集群

 

 

*启动两个resourcemanager后分别向zookeeper注册,通过zookeeper管理他们的状态,一旦状态为active的resourcemanager无法正常提供服务,zookeeper将会立即将状态为standby的resourcemanager切换为active。

 

 

3.2 yarn ha高可用集群搭建 

 

1.配置yarn(yarn-site.xml)

<configuration> 
  <!-- 配置reduce取数据的方式是shuffle(随机) -->  
  <property> 
    <name>yarn.nodemanager.aux-services</name>  
    <value>mapreduce_shuffle</value> 
  </property>  
  <!-- 开启日志 -->  
  <property> 
    <name>yarn.log-aggregation-enable</name>  
    <value>true</value> 
  </property>  
  <!-- 设置日志的删除时间 -1:禁用,单位为秒 -->  
  <property> 
    <name>yarn.log-aggregation。retain-seconds</name>  
    <value>864000</value> 
  </property>  
  <!-- 设置yarn的内存大小,单位是mb -->  
  <property> 
    <name>yarn.nodemanager.resource.memory-mb</name>  
    <value>8192</value> 
  </property>  
  <!-- 设置yarn的cpu核数 -->  
  <property> 
    <name>yarn.nodemanager.resource.cpu-vcores</name>  
    <value>8</value> 
  </property>
  <!-- yarn ha配置 -->  
  <!-- 开启yarn ha -->  
  <property> 
    <name>yarn.resourcemanager.ha.enabled</name>  
    <value>true</value> 
  </property>  
  <!-- 指定yarn ha的名称 -->  
  <property> 
    <name>yarn.resourcemanager.cluster-id</name>  
    <value>cluster1</value> 
  </property>  
  <!-- 分别指定两个resourcemanager的名称 -->  
  <property> 
    <name>yarn.resourcemanager.ha.rm-ids</name>  
    <value>rm1,rm2</value> 
  </property>  
  <!-- 分别指定两个resourcemanager的地址 -->  
  <property> 
    <name>yarn.resourcemanager.hostname.rm1</name>  
    <value>192.168.1.80</value> 
  </property>  
  <property> 
    <name>yarn.resourcemanager.hostname.rm2</name>  
    <value>192.168.1.81</value> 
  </property>  
  <!-- 分别指定两个resourcemanager的web访问地址 -->  
  <property> 
    <name>yarn.resourcemanager.webapp.address.rm1</name>  
    <value>192.168.1.80:8088</value> 
  </property>  
  <property> 
    <name>yarn.resourcemanager.webapp.address.rm2</name>  
    <value>192.168.1.81:8088</value> 
  </property>  
  <!-- 配置使用的zookeeper集群 -->  
  <property> 
    <name>yarn.resourcemanager.zk-address</name>  
    <value>192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181</value> 
  </property>  
  <!-- resourcemanager restart配置 -->  
  <!-- 启用resourcemanager的restart功能,当resourcemanager重启时将会保存运行时信息到指定的位置,重启成功后再进行读取 -->  
  <property> 
    <name>yarn.resourcemanager.recovery.enabled</name>  
    <value>true</value> 
  </property>  
  <!-- resourcemanager restart使用的存储方式(实现类) -->  
  <property> 
    <name>yarn.resourcemanager.store.class</name>  
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.zkrmstatestore</value> 
  </property>  
  <!-- resourcemanager重启时数据保存在zookeeper中的目录 -->  
  <property> 
    <name>yarn.resourcemanager.zk-state-store.parent-path</name>  
    <value>/rmstore</value> 
  </property>  
  <!-- nodemanager restart配置 -->  
  <!-- 启用nodemanager的restart功能,当nodemanager重启时将会保存运行时信息到指定的位置,重启成功后再进行读取 -->  
  <property> 
    <name>yarn.nodemanager.recovery.enabled</name>  
    <value>true</value> 
  </property>  
  <!-- nodemanager重启时数据保存在本地的目录 -->  
  <property> 
    <name>yarn.nodemanager.recovery.dir</name>  
    <value>/usr/hadoop/hadoop-2.9.0/data/rsnodemanager</value> 
  </property>  
  <!-- 配置nodemanager的rpc通讯端口 -->  
  <property> 
    <name>yarn.nodemanager.address</name>  
    <value>0.0.0.0:45454</value> 
  </property> 
</configuration>

 

resourcemanager restart使用的存储方式(实现类)

1.resourcemanager运行时的数据保存在zk中:org.apache.hadoop.yarn.server.resourcemanager.recovery.zkrmstatestore

2.resourcemanager运行时的数据保存在hdfs中:org.apache.hadoop.yarn.server.resourcemanager.recovery.filesystemrmstatestore

3.resourcemanager运行时的数据保存在本地:org.apache.hadoop.yarn.server.resourcemanager.recovery.leveldbrmstatestore

*使用不同的存储方式将需要额外的配置项,可参考官网,http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/resourcemanagerrestart.html

 

 

2.启动yarn ha高可用集群

 

1.在resourcemanager所在节点中启动yarn集群

Hadoop高可用集群

 

2.手动启动另一个resourcemanager

 Hadoop高可用集群

 

*当启动yarn ha集群后,可以分别访问resourcemanager管理页面,、。

访问状态为standby的resourcemanager时,会将请求重定向到状态为active的resourcemanager的管理页面。

 

3.模拟resourcemanager宕机,手动杀死进程

Hadoop高可用集群

 

*zookeeper在一定时间内无法接收到状态为active的resourcemanager发送的心跳时,将会立即将状态为standby的resourcemanager切换为active。