欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java结合HADOOP集群文件上传下载

程序员文章站 2024-03-02 13:05:22
对hdfs上的文件进行上传和下载是对集群的基本操作,在《hadoop权威指南》一书中,对文件的上传和下载都有代码的实例,但是对如何配置hadoop客户端却是没有讲得很清楚,...

对hdfs上的文件进行上传和下载是对集群的基本操作,在《hadoop权威指南》一书中,对文件的上传和下载都有代码的实例,但是对如何配置hadoop客户端却是没有讲得很清楚,经过长时间的搜索和调试,总结了一下,如何配置使用集群的方法,以及自己测试可用的对集群上的文件进行操作的程序。首先,需要配置对应的环境变量:

复制代码 代码如下:

hadoop_home="/home/work/tools/java/hadoop-client/hadoop"
for f in $hadoop_home/hadoop-*.jar; do
        hadoop_classpath=${hadoop_classpath}:$f
done
for f in $hadoop_home/lib/*.jar; do
        hadoop_classpath=${hadoop_classpath}:$f
done
hadoopvfs_home="/home/work/tools/java/hadoop-client/hadoop-vfs"
for f in $hadoopvfs_home/lib/*.jar; do
        hadoop_classpath=${hadoop_classpath}:$f
done
export ld_library_path=$ld_library_path:/home/work/tools/java/hadoop-client/hadoop/lib/native/linux-amd64-64/

其中ld_library_path是在调用时需要用到的库的路径,hadoop_classpath则是我们hadoop客户端里各种jar包
有一点需要注意的是最好不要使用hadoop_home这个变量,这个是一个系统使用的环境变量,最好不要和它冲突
编译类的方法:

复制代码 代码如下:

javac -classpath $classpath:$hadoop_classpath hdfsutil.java

运行的方法:

复制代码 代码如下:

java -classpath $classpath:$hadoop_classpath hdfsutil

但是在实际的使用过程中,会报no permission之类的错误,或者你能保证代码没有问题的情况下,在运行的时候也会报一些奇奇怪怪的错误
那么问题来了,这是什么鬼?
答案:这是因为没有配置对应集群的配置文件
因为在《hadoop权威指南》一书中,弱化了配置的东西,所以在具体使用集群的时候就会出现问题,如何解决呢,这样子:

复制代码 代码如下:

this.conf = new configuration(false);
conf.addresource("./hadoop-site.xml");
conf.addresource("./hadoop-default.xml");
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.distributedfilesystem.class.getname());conf.set("fs.file.impl", org.apache.hadoop.fs.localfilesystem.class.getname());

为什么会这样,书上只是很简单的:

this.conf = new configuration();
那是因为默认你的集群在本地,所以不需要做配置,但是在实际使用的过程中,各个集群的配置是不同的,所以我们要引入集群的配置
这是非常重要的一点,因为实际使用的过程中我们都是使用的hadoop的客户端,而且是已经搭好环境的集群,所以我们需要做好本地的配置
hadoop-site.xml和hadoop-default.xml这两个文件在所使用的客户端的conf目录下,在addresource的时候指定好目录就行了

将以上所提到的配置,全部配完之后,这个程序才能真正运行起来,所以配置是非常重要的一环。

以下是对应的工具的代码,有兴趣的看一下吧,使用的是文件流的方式来搞的,这样子也可以打通ftp和hdfs之间文件的互传:

import java.io.bufferedinputstream;
import java.io.fileinputstream;
import java.io.filenotfoundexception;
import java.io.fileoutputstream;
import java.io.ioexception;
import java.io.inputstream;
import java.io.outputstream;
import java.net.uri;
import java.net.url;
import java.io.*;

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.fsdatainputstream;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.ioutils;
import org.apache.hadoop.util.progressable;

public class hdfsutil {
  private string hdfs_node = "";
  private string hdfs_path = "";
  private string file_path = "";
  private string hadoop_site = "";
  private string hadoop_default = "";
  private configuration conf = null;

  public hdfsutil(string hdfs_node) {
    this.hdfs_node = hdfs_node;
  }

  public string gethdfsnode() {
    return this.hdfs_node;
  }

  public void sethdfspath(string hdfs_path){
    this.hdfs_path = hdfs_path;
  }

  public string gethdfspath(){
    return this.hdfs_path;
  }

  public void setfilepath(string file_path){
    this.file_path = file_path;
  }

  public string getfilepath(){
    return this.file_path;
  }

  public void sethadoopsite(string hadoop_site){
    this.hadoop_site = hadoop_site;
  }

  public string gethadoopsite(){
    return this.hadoop_site;
  }

  public void sethadoopdefault(string hadoop_default){
    this.hadoop_default = hadoop_default;
  }

  public string gethadoopdefault(){
    return this.hadoop_default;
  }

  public int setconfigure(boolean flag) {
    if (flag == false){
      if (this.gethadoopsite() == "" || this.gethadoopdefault() == ""){
        return -1;
      }
      else {
        this.conf = new configuration(false);
        conf.addresource(this.gethadoopdefault());
        conf.addresource(this.gethadoopsite());
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.distributedfilesystem.class.getname());
        conf.set("fs.file.impl", org.apache.hadoop.fs.localfilesystem.class.getname());
        return 0;
      }
    }
    this.conf = new configuration();
    return 0;
  }

  public configuration getconfigure() {
    return this.conf;
  }

  public int upload(string localname, string remotename) throws filenotfoundexception, ioexception {
    inputstream instream = null;
    filesystem fs = null;
    try{
      instream = new bufferedinputstream(new fileinputstream(localname));
      fs = filesystem.get(uri.create(this.hdfs_node), this.conf);
      outputstream outstream = fs.create(new path(remotename) ,new progressable() {
        public void progress(){
          system.out.print('.');
        }
      });

      ioutils.copybytes(instream, outstream, 4096, true);
      instream.close();
      return 0;
    } catch (ioexception e){
      instream.close();
      e.printstacktrace();
      return -1;
    }
  }

  public int upload(inputstream instream, string remotename) throws filenotfoundexception, ioexception {
    filesystem fs = null;
    try{
      fs = filesystem.get(uri.create(this.hdfs_node), this.conf);
      outputstream outstream = fs.create(new path(remotename) ,new progressable() {
        public void progress(){
          system.out.print('.');
        }
      });

      ioutils.copybytes(instream, outstream, 4096, true);
      instream.close();
      return 0;
    } catch (ioexception e){
      instream.close();
      e.printstacktrace();
      return -1;
    }
  }

  public int donwload(string remotename, string localname, int lines) throws filenotfoundexception, ioexception {
    fileoutputstream fos = null;
    inputstreamreader isr = null;
    bufferedreader br = null;
    string str = null;
    outputstreamwriter osw = null;
    bufferedwriter buffw = null;
    printwriter pw = null;
    filesystem fs = null;
    inputstream instream = null;
    try {
      fs = filesystem.get(uri.create(this.hdfs_node + remotename), this.conf);
      instream = fs.open(new path(this.hdfs_node + remotename));
      fos = new fileoutputstream(localname);
      osw = new outputstreamwriter(fos, "utf-8");
      buffw = new bufferedwriter(osw);
      pw = new printwriter(buffw);
      isr = new inputstreamreader(instream, "utf-8");
      br = new bufferedreader(isr);
      while((str = br.readline()) != null && lines > 0){
        lines--;
        pw.println(str);
      }
    } catch (ioexception e){
      throw new ioexception("couldn't write.", e);
    } finally {
      pw.close();
      buffw.close();
      osw.close();
      fos.close();
      instream.close()
    }
    return 0;
  }

  //main to test
  public static void main(string[] args){
    string hdfspath = null;
    string localname = null;
    string hdfsnode = null;
    int lines = 0;

    if (args.length == 4){
      hdfsnode = args[0];
      hdfspath = args[1];
      localname = args[2];
      lines = integer.parseint(args[3]);
    }
    else{
      hdfsnode = "hdfs://nj01-nanling-hdfs.dmop.baidu.com:54310";
      hdfspath = "/app/ps/spider/wdmqa/wangweilong/test/hdfsutil.java";
      localname = "/home/work/workspace/project/dhc2-0/dhc/base/ftp/papapa";
      lines = 5;
    }
    hdfsutil hdfsutil = new hdfsutil(hdfsnode);
    hdfsutil.setfilepath(hdfsutil.gethdfsnode()+hdfspath);
    hdfsutil.sethadoopsite("./hadoop-site.xml");
    hdfsutil.sethadoopdefault("./hadoop-default.xml");
    hdfsutil.setconfigure(false);
    try {
      hdfsutil.donwload(hdfspath, localname, lines);
    } catch (ioexception e){
      e.printstacktrace();
    }
  }

如果想要了解ftp上文件的下载,请参考这篇文章:

如果想要打通ftp和hdfs文件互传,只要创建一个类,调用这两篇文章中的工具的接口就可以搞定,自己写的代码,实测有效。

以上就是本文的全部内容了,希望能够对大家熟练掌握java有所帮助。

请您花一点时间将文章分享给您的朋友或者留下评论。我们将会由衷感谢您的支持!