
Solving the Problem of Importing Data into Hive from a Java Program

程序员文章站 2022-04-06 14:31:37

Solutions:

1. Shell out to Linux commands from Java (scp to stage the file, then ssh/chown to fix its ownership), as follows:

// Stage the export file on the Hive gateway host with scp.
// Build the command as an argument array: Runtime.exec() splits a plain
// string on whitespace and does not interpret shell quoting.
String tmpFile = "/tmp" + File.separator + TableUtil.TABLE_PREFIX_MAILING_MEMBER + "@" + mailListId + ".log";
String[] scpCommand = {"scp", fileKey, "x.x.x.x:" + tmpFile};
logger.info("Current TaskId#" + taskId + " : " + String.join(" ", scpCommand));
Process process = Runtime.getRuntime().exec(scpCommand);

// Drain stdout, then stderr; try-with-resources closes each reader, so the
// redundant null checks from the original version are gone.
logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : <OUTPUT>");
try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
    String line;
    while ((line = inputReader.readLine()) != null) {
        logger.info(line);
    }
}
logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : </OUTPUT>");

logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : <ERROR>");
try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
    String line;
    while ((line = errorReader.readLine()) != null) {
        logger.info(line);
    }
}
logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : </ERROR>");
logger.info("Current TaskId#" + taskId + " : scp exited with code " + process.waitFor());

Thread.sleep(5000);

// Grant ownership to the hadoop user on the remote host so Hive can load
// the file. The whole remote command is passed as one ssh argument; no
// quote characters are needed -- exec() would pass them through literally,
// and the remote shell would then fail to find the command.
String[] chownCommand = {"ssh", "x.x.x.x", "chown hadoop:hadoop -R " + tmpFile};
logger.info("Current TaskId#" + taskId + " : " + String.join(" ", chownCommand));
process = Runtime.getRuntime().exec(chownCommand);

logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : <OUTPUT>");
try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
    String line;
    while ((line = inputReader.readLine()) != null) {
        logger.info(line);
    }
}
logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : </OUTPUT>");

logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : <ERROR>");
try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
    String line;
    while ((line = errorReader.readLine()) != null) {
        logger.info(line);
    }
}
logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : </ERROR>");
logger.info("Current TaskId#" + taskId + " : chown exited with code " + process.waitFor());

2. Copy the file into HDFS directly with the Hadoop client API, using a method like the following:

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public static void copyLocalFileHdfs(String localPath, String hdfsPath) throws IOException {
    // HA client configuration for the logical name service "Ucluster" with
    // two NameNodes ("xx" and "yy"); fill in the real RPC addresses.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://Ucluster");
    conf.set("dfs.nameservices", "Ucluster");
    conf.set("dfs.ha.namenodes.Ucluster", "xx,yy");
    conf.set("dfs.namenode.rpc-address.Ucluster.xx", "hostname:port");
    conf.set("dfs.namenode.rpc-address.Ucluster.yy", "hostname:port");
    conf.set("dfs.client.failover.proxy.provider.Ucluster",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    // Let the IOException propagate instead of swallowing it with
    // printStackTrace(). newInstance() returns a private FileSystem that is
    // safe to close in try-with-resources; FileSystem.get() would hand back
    // a cached instance shared with the rest of the process.
    try (FileSystem fs = FileSystem.newInstance(conf)) {
        fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
    }
}
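
Either way, copying the file only stages it; the import itself finishes with a Hive LOAD DATA statement. A minimal sketch over Hive JDBC, where the HiveServer2 endpoint (hostname:10000), database, user, and table name are placeholders of mine, not values from the article (requires the hive-jdbc dependency):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public static void loadIntoHive(String hdfsPath, String table) throws SQLException {
    // Placeholder HiveServer2 URL; modern hive-jdbc registers its driver
    // automatically, older versions need Class.forName("org.apache.hive.jdbc.HiveDriver").
    String url = "jdbc:hive2://hostname:10000/default";
    try (Connection conn = DriverManager.getConnection(url, "hadoop", "");
         Statement stmt = conn.createStatement()) {
        // Moves the staged HDFS file into the table's warehouse directory.
        stmt.execute("LOAD DATA INPATH '" + hdfsPath + "' INTO TABLE " + table);
    }
}

With the first approach the file sits on the gateway's local disk rather than in HDFS, so the statement would be LOAD DATA LOCAL INPATH instead; note that the LOCAL path is resolved on the host where HiveServer2 runs.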