
Operating HDFS via the Java API (with Kerberos authentication)


The reference code is as follows:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.LineReader;

/**
 * Demo: operating on HDFS through Kerberos authentication.
 */
public class HdfsDemo2 {

    public static void main(String[] args) throws Exception {

        String krb5File = "src/main/resources/krb5.conf";
//        String krb5File = "krb5.conf";
        String user = "[email protected]";                       // Kerberos principal (user@REALM)
        String keyPath = "src/main/resources/asmp.keytab"; // keytab for that principal
//        String keyPath = "asmp.keytab";

        // Point the JVM at the Kerberos config before any Hadoop security calls
        System.setProperty("java.security.krb5.conf", krb5File);
        Configuration conf = new Configuration();
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        // Needed for the keytab login to take effect, unless core-site.xml
        // on the classpath already enables Kerberos
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Log in from the keytab; the FileSystem calls below run as this user
        UserGroupInformation.loginUserFromKeytab(user, keyPath);

        String uri = "hdfs://nameservice1:8020";
        String remotePath = "/user/asmp/conf";
        System.out.println("test..");
        getHdfsFileList(conf, uri, remotePath);
        readFile(conf, uri, "/user/asmp/word.csv");
        downloadFile(conf, uri, "/user/asmp/word.csv", "D:/output/20180426.csv");
    }

    /**
     * List all files under the given HDFS directory (recursively).
     *
     * @param conf       Hadoop configuration
     * @param uri        HDFS connection URI
     * @param remotePath remote directory path on HDFS
     * @throws Exception on connection or I/O failure
     */
    public static void getHdfsFileList(Configuration conf, String uri, String remotePath) throws Exception {

        FileSystem fs = FileSystem.get(new URI(uri), conf);
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path(remotePath), true); // true = recurse into subdirectories
        while (iter.hasNext()) {
            LocatedFileStatus status = iter.next();
            System.out.println(status.getPath().toUri().getPath());
        }
        fs.close();
    }

    /**
     * Print the contents of a file line by line.
     *
     * @param conf     Hadoop configuration
     * @param uri      HDFS connection URI
     * @param filePath remote file path on HDFS
     * @throws Exception on connection or I/O failure
     */
    public static void readFile(Configuration conf, String uri, String filePath) throws Exception {

        Path path = new Path(filePath);
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        FSDataInputStream fdis = fs.open(path);
        Text line = new Text();
        LineReader reader = new LineReader(fdis); // read one line at a time
        while (reader.readLine(line) > 0) {
            System.out.println(line); // print each line
        }
        reader.close(); // also closes the underlying input stream
        fs.close();
    }
    /**
     * Download a file from HDFS to the local file system.
     *
     * @param conf   Hadoop configuration
     * @param uri    HDFS connection URI
     * @param remote path of the file on HDFS
     * @param local  local destination path
     * @throws Exception on connection or I/O failure
     */
    public static void downloadFile(Configuration conf, String uri, String remote, String local) throws Exception {
        System.out.println("===downloadFile===");
        FileSystem fs = FileSystem.get(new URI(uri), conf);
//        fs.copyToLocalFile(new Path(remote), new Path(local));
        // useRawLocalFileSystem=true skips writing a local .crc checksum file
        fs.copyToLocalFile(false, new Path(remote), new Path(local), true);
        System.out.println("download: from " + remote + " to " + local);
        fs.close();
    }
}
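
The demo above only lists, reads, and downloads. As a sketch of the write path (not part of the original post; it uses the standard FileSystem.copyFromLocalFile API, and the method name uploadFile is my own), an upload method could look like this:

    /**
     * Upload a local file to HDFS (illustrative sketch, not in the original demo).
     */
    public static void uploadFile(Configuration conf, String uri, String local, String remote) throws Exception {
        FileSystem fs = FileSystem.get(new URI(uri), conf);
        // delSrc=false keeps the local file; overwrite=true replaces an existing remote file
        fs.copyFromLocalFile(false, true, new Path(local), new Path(remote));
        fs.close();
    }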

Error 1: listing the directory works, but reading and writing files fails

WARN BlockReaderFactory: I/O error constructing remote block reader.
java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
	at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:2884)
	at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:747)
	at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:662)
	at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:326)
	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:570)
	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:793)
	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
	at java.io.DataInputStream.read(DataInputStream.java:100)
	at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:370)

Cause: the cluster hosts have two network interfaces, and the NameNode hands the client DataNode addresses on the internal network; a client outside that network cannot reach them, hence the Connection timed out when reading blocks.

Fix: in Cloudera Manager, enable the following two HDFS settings, so that clients address DataNodes by hostname instead of by internal IP:

[Screenshot: the two HDFS settings in Cloudera Manager]
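
The screenshot is not legible here; in CDH the usual dual-NIC fix is the pair dfs.datanode.use.datanode.hostname (server side) and dfs.client.use.datanode.hostname (client side), so treat those names as an assumption about what the two settings were. If you cannot change the cluster configuration, the client-side half can also be set directly in code:

        // Assumed client-side workaround (property names not confirmed by the
        // original screenshot): ask for DataNode hostnames instead of internal
        // IPs, then map those hostnames to reachable addresses, e.g. in /etc/hosts.
        conf.set("dfs.client.use.datanode.hostname", "true");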