欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop学习(2)-java客户端操作hdfs及secondarynode作用

程序员文章站 2022-04-06 12:58:21
首先要在windows下解压一个windows版本的hadoop 然后在配置他的环境变量,同时要把hadoop的share目录下的hadoop下的相关jar包拷贝到esclipe 然后Build Path 下面上代码 练习:从一个文件里面不断地采集日志上传到hdfs里面 1.流程介绍 启动一个定时任 ......

首先要在windows下解压一个windows版本的hadoop

然后在配置他的环境变量,同时要把hadoop的share目录下的hadoop下的相关jar包拷贝到esclipe

然后build path

下面上代码

import java.io.bufferedreader;
import java.io.fileinputstream;
import java.io.ioexception;
import java.io.inputstreamreader;
import java.net.uri;
import java.net.urisyntaxexception;
import java.util.arrays;

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.fsdatainputstream;
import org.apache.hadoop.fs.fsdataoutputstream;
import org.apache.hadoop.fs.filestatus;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.locatedfilestatus;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.fs.remoteiterator;
import org.junit.before;
import org.junit.test;

public class hdfsclientdemo {
    
    
    public static void main(string[] args) throws exception {
        /**
         * configuration参数对象的机制:
         *    构造时,会加载jar包中的默认配置 xx-default.xml
         *    再加载 用户配置xx-site.xml  ,覆盖掉默认参数
         *    构造完成之后,还可以conf.set("p","v"),会再次覆盖用户配置文件中的参数值
         */
        // new configuration()会从项目的classpath中加载core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件
        configuration conf = new configuration();
        
        // 指定本客户端上传文件到hdfs时需要保存的副本数为:2
        conf.set("dfs.replication", "2");
        // 指定本客户端上传文件到hdfs时切块的规格大小:64m
        conf.set("dfs.blocksize", "64m");
        
        // 构造一个访问指定hdfs系统的客户端对象: 参数1:——hdfs系统的uri,参数2:——客户端要特别指定的参数,参数3:客户端的身份(用户名)
        filesystem fs = filesystem.get(new uri("hdfs://172.31.2.38:9000/"), conf, "root");
        
        // 上传一个文件到hdfs中
        fs.copyfromlocalfile(new path("d:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new path("/aaa/"));
        
        fs.close();
    }
    
    filesystem fs = null;
    
    @before
    public void init() throws exception{
        configuration conf = new configuration();
        conf.set("dfs.replication", "2");
        conf.set("dfs.blocksize", "64m");
        
        fs = filesystem.get(new uri("hdfs://172.31.2.38:9000/"), conf, "root");
        
    }
    
    
    /**
     * 从hdfs中下载文件到客户端本地磁盘
     * @throws ioexception 
     * @throws illegalargumentexception 
     */
    @test
    public void testget() throws illegalargumentexception, ioexception{
        
        fs.copytolocalfile(new path("/test"), new path("d:/"));
        fs.close();
        
    }
    
    
    /**
     * 在hdfs内部移动文件\修改名称
     */
    @test
    public void testrename() throws exception{
        
        fs.rename(new path("/install.log"), new path("/aaa/in.log"));
        
        fs.close();
        
    }
    
    /**
     * 在hdfs中创建文件夹
     */
    @test
    public void testmkdir() throws exception{
        
        fs.mkdirs(new path("/xx/yy/zz"));
        
        fs.close();
    }
    
    
    /**
     * 在hdfs中删除文件或文件夹
     */
    @test
    public void testrm() throws exception{
        
        fs.delete(new path("/aaa"), true);
        
        fs.close();
    }
    
    
    
    /**
     * 查询hdfs指定目录下的文件信息
     */
    @test
    public void testls() throws exception{
        // 只查询文件的信息,不返回文件夹的信息
        remoteiterator<locatedfilestatus> iter = fs.listfiles(new path("/"), true);
        
        while(iter.hasnext()){
            locatedfilestatus status = iter.next();
            system.out.println("文件全路径:"+status.getpath());
            system.out.println("块大小:"+status.getblocksize());
            system.out.println("文件长度:"+status.getlen());
            system.out.println("副本数量:"+status.getreplication());
            system.out.println("块信息:"+arrays.tostring(status.getblocklocations()));
            
            system.out.println("--------------------------------");
        }
        fs.close();
    }
   

    /**
     * 读取hdfs中的文件的内容
     * 
     * @throws ioexception
     * @throws illegalargumentexception
     */
    @test
    public void testreaddata() throws illegalargumentexception, ioexception {

        fsdatainputstream in = fs.open(new path("/test.txt"));

        bufferedreader br = new bufferedreader(new inputstreamreader(in, "utf-8"));

        string line = null;
        while ((line = br.readline()) != null) {
            system.out.println(line);
        }

        br.close();
        in.close();
        fs.close();

    }

    /**
     * 读取hdfs中文件的指定偏移量范围的内容
     * 
     * 
     * 
     * @throws ioexception
     * @throws illegalargumentexception
     */
    @test
    public void testrandomreaddata() throws illegalargumentexception, ioexception {

        fsdatainputstream in = fs.open(new path("/xx.dat"));

        // 将读取的起始位置进行指定
        in.seek(12);

        // 读16个字节
        byte[] buf = new byte[16];
        in.read(buf);

        system.out.println(new string(buf));

        in.close();
        fs.close();

    }

    /**
     * 往hdfs中的文件写内容
     * 
     * @throws ioexception
     * @throws illegalargumentexception
     */

    @test
    public void testwritedata() throws illegalargumentexception, ioexception {

        fsdataoutputstream out = fs.create(new path("/zz.jpg"), false);

        // d:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

        fileinputstream in = new fileinputstream("d:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");

        byte[] buf = new byte[1024];
        int read = 0;
        while ((read = in.read(buf)) != -1) {
            out.write(buf,0,read);
        }
        
        in.close();
        out.close();
        fs.close();

    }

}

 

练习:从一个文件里面不断地采集日志上传到hdfs里面

1.流程介绍

---启动一个定时任务

    --定时探测日志原目录

    --获取文件上传到一个待上传的临时目录

    --逐一上传到hdfs目标路径,同时移动到备份目录里

--启动一个定时任务:

    --探测备份目录中的备份数据是否已经超出,如果超出就删除

 

 主类为:

import java.util.timer;

public class datacollectmain {
    
    public static void main(string[] args) {
        
        timer timer = new timer();
        //第一个为task类,第二个开始时间 第三个没隔多久执行一次
        timer.schedule(new collecttask(), 0, 60*60*1000l);
        
        timer.schedule(new backupcleantask(), 0, 60*60*1000l);
        
    }
    

}

collecttask类:

这个类要继承timertask,重写run方法,主要内容就是不断收集日志文件

package cn.edu360.hdfs.datacollect;

import java.io.file;
import java.io.filenamefilter;
import java.net.uri;
import java.text.simpledateformat;
import java.util.arrays;
import java.util.date;
import java.util.properties;
import java.util.timertask;
import java.util.uuid;

import org.apache.commons.io.fileutils;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.log4j.logger;

public class collecttask extends timertask {

    @override
    public void run() {
        try {
            // 获取配置参数
            properties props = propertyholderlazy.getprops();

            // 获取本次采集时的日期
            simpledateformat sdf = new simpledateformat("yyyy-mm-dd-hh");
            string day = sdf.format(new date());
            
            file srcdir = new file("d:/logs/accesslog");
            // 列出日志源目录中需要采集的文件
            //里面传了一个文件过滤器,重写accept方法,return true就要
            file[] listfiles = srcdir.listfiles(new filenamefilter() {
                @override
                public boolean accept(file dir, string name) {
                    if (name.startswith("access.log")) {
                        return true;
                    }
                    return false;
                }
            });
            // 将要采集的文件移动到待上传临时目录
            file touploaddir = new file("d:/logs/toupload");
            for (file file : listfiles) {
                
                //这里如果是 file.renameto(touploaddir)是不对的,因为会生成一个toupload的文件而不是文件夹
                //要用renameto的话你要自己加上文件的新名字比较麻烦
                //用fileutiles是对file操作的一些工具类
                //第一个目标文件,第二个路径,第三个是否存在覆盖
                fileutils.movefiletodirectory(file, touploaddir, true);
            }

            // 构造一个hdfs的客户端对象
            filesystem fs = filesystem.get(new uri("hdfs://hdp-01:9000"), new configuration(), "root");
            
            file[] touploadfiles = touploaddir.listfiles();

            // 检查hdfs中的日期目录是否存在,如果不存在,则创建
            path hdfsdestpath = new path("/logs" + day);
            if (!fs.exists(hdfsdestpath)) {
                fs.mkdirs(hdfsdestpath);
            }

            // 检查本地的备份目录是否存在,如果不存在,则创建
            file backupdir = new file("d:/logs/backup" + day + "/");
            if (!backupdir.exists()) {
                backupdir.mkdirs();
            }

            for (file file : touploadfiles) {
                // 传输文件到hdfs并改名access_log_
                fs.copyfromlocalfile(new path(file.getabsolutepath()), new path("/logs"+day+"/access_log_"+uuid.randomuuid()+".log"));

                // 将传输完成的文件移动到备份目录
                //注意这里依然不要用renameto
                fileutils.movefiletodirectory(file, backupdir, true);
            }

        } catch (exception e) {
            e.printstacktrace();
        }

    }
/**
     * 读取hdfs中的文件的内容
     * 
     * @throws ioexception
     * @throws illegalargumentexception
     */
    @test
    public void testreaddata() throws illegalargumentexception, ioexception {

        fsdatainputstream in = fs.open(new path("/test.txt"));

        bufferedreader br = new bufferedreader(new inputstreamreader(in, "utf-8"));

        string line = null;
        while ((line = br.readline()) != null) {
            system.out.println(line);
        }

        br.close();
        in.close();
        fs.close();

    }

    /**
     * 读取hdfs中文件的指定偏移量范围的内容
     * 
     * 
     * 作业题:用本例中的知识,实现读取一个文本文件中的指定block块中的所有数据
     * 
     * @throws ioexception
     * @throws illegalargumentexception
     */
    @test
    public void testrandomreaddata() throws illegalargumentexception, ioexception {

        fsdatainputstream in = fs.open(new path("/xx.dat"));

        // 将读取的起始位置进行指定
        in.seek(12);

        // 读16个字节
        byte[] buf = new byte[16];
        in.read(buf);

        system.out.println(new string(buf));

        in.close();
        fs.close();

    }

    /**
     * 往hdfs中的文件写内容
     * 
     * @throws ioexception
     * @throws illegalargumentexception
     */

    @test
    public void testwritedata() throws illegalargumentexception, ioexception {

        fsdataoutputstream out = fs.create(new path("/zz.jpg"), false);

        // d:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

        fileinputstream in = new fileinputstream("d:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");

        byte[] buf = new byte[1024];
        int read = 0;
        while ((read = in.read(buf)) != -1) {
            out.write(buf,0,read);
        }
        
        in.close();
        out.close();
        fs.close();

    }

 

}

backupcleantask类

package cn.edu360.hdfs.datacollect;

import java.io.file;
import java.text.simpledateformat;
import java.util.date;
import java.util.timertask;

import org.apache.commons.io.fileutils;

public class backupcleantask extends timertask {

    @override
    public void run() {

        simpledateformat sdf = new simpledateformat("yyyy-mm-dd-hh");
        long now = new date().gettime();
        try {
            // 探测本地备份目录
            file backupbasedir = new file("d:/logs/backup/");
            file[] daybackdir = backupbasedir.listfiles();

            // 判断备份日期子目录是否已超24小时
            for (file dir : daybackdir) {
                long time = sdf.parse(dir.getname()).gettime();
                if(now-time>24*60*60*1000l){
                    fileutils.deletedirectory(dir);
                }
            }
        } catch (exception e) {
            e.printstacktrace();
        }

    }

}

 

hdfs中namenode中储存元数据(对数据的描述信息)是在内存中以树的形式储存的,并且每隔一段时间都会把这些元数据序列化到磁盘中。序列化的东西在磁盘中叫 fsimage文件。

元数据可能会很大很大,所以只能是定期的序列化

问题1:序列化的时候,发生了元数据的修改怎么办

答:namenode会把每次用户的操作都记录下来,记录成日志文件,存在edits日志文件中

其中edits日志文件也会像log4j滚动日志文件一样,当文件太大的时候会另起一个文件并改名字

问题2:当edits文件太多的时候,一次宕机也会花大量的时间从edits里恢复,怎么办

答:会定期吧edits文件重放fsimage文件,并记录edits的编号,把那些重放过的日志文件给删除。这样也相当于重新序列化了,

所以namenode并不会做这样的事情,是由secondary node做的,他会定期吧namenode的fsimage文件和edits文件下载下来

并把fsimage文件反序列化,并且读日志文件更新元数据,然后序列化到磁盘,然后把他上传给namenode。

这个机制叫做checkpoint机制

这里secondarynode 相当一一个小秘书

 

 

 

 

 

 额外知识点

注意,在windows里面不要写有些路径不要写绝对路径,因为程序放到linux下面可能会找不到,因此报错

 一般使用class加载器,这样当这个class加载的时候就会知道这个class在哪

类加载器的一些使用例子

比如我加载一个配置文件,为了避免出现绝对路径,我们可以是用类加载器

     properties props = new properties();
        //加载配置文件,这样写的目的是为了避免在windows里出现绝对路径,用类加载器,再把文件传化成流
        props.load(hdfswordcount.class.getclassloader().getresourceasstream("job.properties"));

 

而对于一些功能性的类,我们最好在写逻辑的时候也不要直接去导入这个包,而是使用class.forname

//这样不直接导入这个包,直接用类加载器,是面向接口编程的一种思想。这里我并不是在开始import xxxx.mapper,这里mapper是一个接口,这里我用了多态
        class<?> mapper_class = class.forname(props.getproperty("mapper_class"));
        mapper mapper = (mapper) mapper_class.newinstance();

 

单例模式

https://www.cnblogs.com/crazy-wang-android/p/9054771.html

只有个一实例,必须自己创建自己这个实例,必须为别人提供这个实例

 

 

 饿汉式单例:就算没有人调用这个class,他也会加载进去;

如对于一个配置文件的加载

import java.util.properties;

/**
 * 单例设计模式,方式一: 饿汉式单例
 *
 */
public class propertyholderhungery {

    private static properties prop = new properties();

    static {
        try {
            //将一个文件prop.load(stram)  
            //这里面如果传一个io流不好,因为要用到绝对路径,使用了类加载器  这种不管有没有使用这个类都会加载
            prop.load(propertyholderhungery.class.getclassloader().getresourceasstream("collect.properties"));
        } catch (exception e) {

        }
    }
    public static properties getprops() throws exception {
        return prop;
    }

}

 懒汉式:只有调用的时候才会有,但会有线程安全问题

/**
 * 单例模式:懒汉式——考虑了线程安全
 * */

public class propertyholderlazy {

    private static properties prop = null;

    public static properties getprops() throws exception {
        if (prop == null) {
            synchronized (propertyholderlazy.class) {
                if (prop == null) {
                    prop = new properties();
                    prop.load(propertyholderlazy.class.getclassloader().getresourceasstream("collect.properties"));
                }
            }
        }
        return prop;
    }

}