欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Sqoopjava接口将MySQL数据导入导出HDFS及BUG

程序员文章站 2022-07-03 20:20:24
先是试了一下sqoop2的接口,不知道为什么总是报错,搜了半天没找到解决办法于是又用回了 Sqoop 1.4.6 版本,也有点小bug,后面再说,记录一下。 Sqoop 2...

先是试了一下sqoop2的接口,不知道为什么总是报错,搜了半天没找到解决办法于是又用回了 Sqoop 1.4.6 版本,也有点小bug,后面再说,记录一下。

Sqoop 2 Demo: HDFS 是远程集群上,MySQL 是本地,没有成功,可能是环境问题

package com.kay.transfer;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

import java.util.Collection;
import java.util.UUID;

/**
 * Created by kay on 2017/12/12.
 */

public class SqoopTest {

    public static void sqoopTransfer() {
        //初始化
        String url = "https://192.168.1.200:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        Collection arr= client.getConnectors();
        for (MConnector m:arr) {
            System.out.println(m.getLinkConfig());
        }

        //创建一个源链接 JDBC   为链接创建一个占位符
        long fromConnectorId = 2;
        MLink fromLink = client.createLink("generic-jdbc-connector");
        fromLink.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 10));
        fromLink.setCreationUser("arcgis1009");

        //填入连接配置的值
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://192.168.1.28:3306/mydb");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("lk123456");
       // fromLinkConfig.getStringInput("dialect.identifierEnclose").setValue("`");

        //保存填充过的连接对象
        Status fromStatus = client.saveLink(fromLink);
        if(fromStatus.canProceed()) {
            System.out.println("创建JDBC Link成功,ID为: " + fromLink.getPersistenceId());
        } else {
            System.out.println("创建JDBC Link失败");
        }

        //创建一个目的地链接HDFS
        long toConnectorId = 3;
        MLink toLink = client.createLink("hdfs-connector");
        toLink.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 10));
        toLink.setCreationUser("arcgis1009");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://192.168.1.200:9000/");
        Status toStatus = client.saveLink(toLink);
        if(toStatus.canProceed()) {
            System.out.println("创建HDFS Link成功,ID为: " + toLink.getPersistenceId());
        } else {
            System.out.println("创建HDFS Link失败");
        }

        //创建一个任务
        long fromLinkId = fromLink.getPersistenceId();
        System.out.println("fromLinkId: "+fromLinkId);
        long toLinkId = toLink.getPersistenceId();
        System.out.println("toLinkId: "+toLinkId);

        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("kay-job" + UUID.randomUUID());
        job.setCreationUser("arcgis1009");

        //设置源链接任务配置信息 from
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("mydb");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("user");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");


        //to
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/tmp"+ UUID.randomUUID());
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
       // toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);

        //设置驱动配置-------如果是mapreduce,就是mapper的数量
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

        // 保存填充过的连接对象
        Status status = client.saveJob(job);
        if(status.canProceed()) {
            System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId());
        } else {
            System.out.println("JOB创建失败。");
        }


        //启动任务
        long jobId = job.getPersistenceId();
        System.out.println(jobId);


        MSubmission submission = client.startJob(jobId);
        System.out.println("JOB提交状态为 : " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
            //三秒报告一次进度
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("JOB执行结束... ...");
        System.out.println("Hadoop任务ID为 :" + submission.getExternalId());
        Counters counters = submission.getCounters();
        if(counters != null) {
            System.out.println("计数器:");
            for(CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for(Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if(submission.getExceptionInfo() != null) {
            System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo());
        }
        System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕");
    }

    public static void main(String[] args) throws Exception {
        sqoopTransfer();
    }
}

Sqoop 1.4.6 Demo: HDFS 远程,MySQL本地,测试成功
注意"--bindir","./src/main/resources", 是将数据库表生成的映射对象文件 放到的目录,之前没有这句命令,程序报找不到 user 类,查找之下被生成在了项目根路径下面,后来在*上看到要加上这句shell指令,但是还有一个小问题是,第一次执行还是会报 找不到 user class,原因是第一次还没生成,第二次运行目录下已存在user.class,user.java,user.jar 三个文件 ,则不会报错,也就是说要先生成这些映射文件。 不知道别人是怎么做,如果要动态生成加导入怎么解决的?

package com.kay.transfer;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;
/**
 * Created by kay on 2017/12/12.
 */
public class Test {
    private static int importDataFromMysql() throws Exception {
        String[] args = new String[] {
                "--bindir","./src/main/resources",
                "--connect","jdbc:mysql://localhost:3306/mydb",
                "--driver","com.mysql.jdbc.Driver",
                "-username","root",
                "-password","root",
                "--table","user",
                "-m","1",
                "--target-dir","java_import_user9"
        };

        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");

        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.200:9000");//设置HDFS服务地址
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);

        @SuppressWarnings("deprecation")
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        return Sqoop.runSqoop(sqoop, expandArguments);
    }

    public static void main(String[] args) throws Exception {
        importDataFromMysql();
    }
}