关于Java程序向Hive导入数据的问题解决方法
程序员文章站
2022-04-06 14:31:37
...
解决方法:
1、使用Linux命令,如下:
String tmpFile = "/tmp" + File.separator + TableUtil.TABLE_PREFIX_MAILING_MEMBER + "@" + mailListId + ".log"; String scp_command = String.format("scp %s x.x.x.x:%s", fileKey, tmpFile); logger.info("Current TaskId#" + taskId + " : " + scp_command); Process process = Runtime.getRuntime().exec(scp_command); logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : <OUTPUT>"); InputStreamReader inputStream = new InputStreamReader(process.getInputStream()); BufferedReader inputReader = new BufferedReader(inputStream); String line = null; while((line = inputReader.readLine()) != null) { logger.info(line); } logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : </OUTPUT>"); if (inputReader != null) { inputReader.close(); } if (inputStream != null) { inputStream.close(); } logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : <ERROR>"); InputStreamReader errorStream = new InputStreamReader(process.getErrorStream()); BufferedReader errorReader = new BufferedReader(errorStream); line = null; while((line = errorReader.readLine()) != null) { logger.info(line); } logger.info("Current TaskId#" + taskId + ", SCP_COMMAND : </ERROR>"); if (errorReader != null) { errorReader.close(); } if (errorStream != null) { errorStream.close(); } process.waitFor(); logger.info("Current TaskId#" + taskId + " : " + scp_command); Thread.sleep(5000); // 赋权限 String chown_command = String.format("%s '%s %s'", "ssh x.x.x.x", "chown hadoop:hadoop -R", tmpFile); logger.info("Current TaskId#" + taskId + " :" + chown_command); process = Runtime.getRuntime().exec(chown_command); logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : <OUTPUT>"); inputStream = new InputStreamReader(process.getInputStream()); inputReader = new BufferedReader(inputStream); line = null; while((line = inputReader.readLine()) != null) { logger.info(line); } logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : </OUTPUT>"); if (inputReader != null) { inputReader.close(); } if (inputStream != null) { inputStream.close(); } logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : <ERROR>"); errorStream = new InputStreamReader(process.getErrorStream()); errorReader = new BufferedReader(errorStream); line = null; while((line = errorReader.readLine()) != null) { logger.info(line); } logger.info("Current TaskId#" + taskId + ", CHOWN_COMMAND : </ERROR>"); if (errorReader != null) { errorReader.close(); } if (errorStream != null) { errorStream.close(); } process.waitFor(); logger.info("Current TaskId#" + taskId + " : " + chown_command);
2、使用如下方法:
public static void copyLocalFileHdfs(String localPath, String hdfsPath) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://Ucluster"); conf.set("dfs.nameservices", "Ucluster"); conf.set("dfs.ha.namenodes.Ucluster", "xx,yy"); conf.set("dfs.namenode.rpc-address.Ucluster.xx","hostname:port"); conf.set("dfs.namenode.rpc-address.Ucluster.yy","hostname:port"); conf.set("dfs.client.failover.proxy.provider.Ucluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); try { FileSystem fs = FileSystem.get(conf); Path src = new Path(localPath); Path dst = new Path(hdfsPath); fs.copyFromLocalFile(src, dst); } catch (Exception e) { e.printStackTrace(); } }
上一篇: cdr怎么给图片添加朦胧效果?