Sqoop Java 接口将 MySQL 数据导入导出 HDFS 及 BUG
程序员文章站
2022-07-03 20:20:24
先是试了一下 Sqoop 2 的接口，但不知为何总是报错，搜了半天也没找到解决办法，于是又换回了 Sqoop 1.4.6 版本。该版本也有点小 bug，后面再说，这里先记录一下。
Sqoop 2...
先是试了一下 Sqoop 2 的接口，但不知为何总是报错，搜了半天也没找到解决办法，于是又换回了 Sqoop 1.4.6 版本。该版本也有点小 bug，后面再说，这里先记录一下。
Sqoop 2 Demo：HDFS 部署在远程集群上，MySQL 在本地。运行没有成功，可能是环境问题。
package com.kay.transfer; import org.apache.sqoop.client.SqoopClient; import org.apache.sqoop.model.*; import org.apache.sqoop.submission.counter.Counter; import org.apache.sqoop.submission.counter.CounterGroup; import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.validation.Status; import java.util.Collection; import java.util.UUID; /** * Created by kay on 2017/12/12. */ public class SqoopTest { public static void sqoopTransfer() { //初始化 String url = "https://192.168.1.200:12000/sqoop/"; SqoopClient client = new SqoopClient(url); Collection arr= client.getConnectors(); for (MConnector m:arr) { System.out.println(m.getLinkConfig()); } //创建一个源链接 JDBC 为链接创建一个占位符 long fromConnectorId = 2; MLink fromLink = client.createLink("generic-jdbc-connector"); fromLink.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 10)); fromLink.setCreationUser("arcgis1009"); //填入连接配置的值 MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig(); fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://192.168.1.28:3306/mydb"); fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver"); fromLinkConfig.getStringInput("linkConfig.username").setValue("root"); fromLinkConfig.getStringInput("linkConfig.password").setValue("lk123456"); // fromLinkConfig.getStringInput("dialect.identifierEnclose").setValue("`"); //保存填充过的连接对象 Status fromStatus = client.saveLink(fromLink); if(fromStatus.canProceed()) { System.out.println("创建JDBC Link成功,ID为: " + fromLink.getPersistenceId()); } else { System.out.println("创建JDBC Link失败"); } //创建一个目的地链接HDFS long toConnectorId = 3; MLink toLink = client.createLink("hdfs-connector"); toLink.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 10)); toLink.setCreationUser("arcgis1009"); MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig(); toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://192.168.1.200:9000/"); Status toStatus = 
client.saveLink(toLink); if(toStatus.canProceed()) { System.out.println("创建HDFS Link成功,ID为: " + toLink.getPersistenceId()); } else { System.out.println("创建HDFS Link失败"); } //创建一个任务 long fromLinkId = fromLink.getPersistenceId(); System.out.println("fromLinkId: "+fromLinkId); long toLinkId = toLink.getPersistenceId(); System.out.println("toLinkId: "+toLinkId); MJob job = client.createJob(fromLinkId, toLinkId); job.setName("kay-job" + UUID.randomUUID()); job.setCreationUser("arcgis1009"); //设置源链接任务配置信息 from MFromConfig fromJobConfig = job.getFromJobConfig(); fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("mydb"); fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("user"); fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id"); //to MToConfig toJobConfig = job.getToJobConfig(); toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/tmp"+ UUID.randomUUID()); toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE"); toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE"); // toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true); //设置驱动配置-------如果是mapreduce,就是mapper的数量 MDriverConfig driverConfig = job.getDriverConfig(); driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1); // 保存填充过的连接对象 Status status = client.saveJob(job); if(status.canProceed()) { System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId()); } else { System.out.println("JOB创建失败。"); } //启动任务 long jobId = job.getPersistenceId(); System.out.println(jobId); MSubmission submission = client.startJob(jobId); System.out.println("JOB提交状态为 : " + submission.getStatus()); while(submission.getStatus().isRunning() && submission.getProgress() != -1) { System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100)); //三秒报告一次进度 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("JOB执行结束... 
..."); System.out.println("Hadoop任务ID为 :" + submission.getExternalId()); Counters counters = submission.getCounters(); if(counters != null) { System.out.println("计数器:"); for(CounterGroup group : counters) { System.out.print("\t"); System.out.println(group.getName()); for(Counter counter : group) { System.out.print("\t\t"); System.out.print(counter.getName()); System.out.print(": "); System.out.println(counter.getValue()); } } } if(submission.getExceptionInfo() != null) { System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo()); } System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕"); } public static void main(String[] args) throws Exception { sqoopTransfer(); } }
Sqoop 1.4.6 Demo：HDFS 在远程，MySQL 在本地，测试成功。
注意 "--bindir","./src/main/resources" 这一参数，它指定由数据库表生成的映射类文件的存放目录。

一开始没有加这个参数，程序报找不到 user 类，查找后发现这些文件被生成在了项目根路径下。后来在 Stack Overflow 上看到要加上这个参数。

但还有一个小问题：第一次执行仍然会报找不到 user 类，原因是第一次运行时这些文件还没有生成。第二次运行时，目录下已存在 user.class、user.java、user.jar 三个文件，就不会报错了。也就是说，需要先生成这些映射文件。

不知道别人是怎么做的：如果要在同一次运行中动态生成映射文件并完成导入，该怎么解决？
package com.kay.transfer; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.Sqoop; import org.apache.sqoop.tool.SqoopTool; import org.apache.sqoop.util.OptionsFileUtil; /** * Created by kay on 2017/12/12. */ public class Test { private static int importDataFromMysql() throws Exception { String[] args = new String[] { "--bindir","./src/main/resources", "--connect","jdbc:mysql://localhost:3306/mydb", "--driver","com.mysql.jdbc.Driver", "-username","root", "-password","root", "--table","user", "-m","1", "--target-dir","java_import_user9" }; String[] expandArguments = OptionsFileUtil.expandArguments(args); SqoopTool tool = SqoopTool.getTool("import"); Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.200:9000");//设置HDFS服务地址 Configuration loadPlugins = SqoopTool.loadPlugins(conf); @SuppressWarnings("deprecation") Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins); return Sqoop.runSqoop(sqoop, expandArguments); } public static void main(String[] args) throws Exception { importDataFromMysql(); } }
上一篇: 谷歌即将让用户通过网络浏览器访问Chrome远程桌面
下一篇: 开源引擎Docker单机安装教程