关于通过java调用datax,返回任务执行的方法
datax
datax 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 mysql、oracle、sqlserver、postgre、hdfs、hive、ads、hbase、tablestore(ots)、maxcompute(odps)、drds 等各种异构数据源之间高效的数据同步功能。
datax的详细介绍
引言
因为业务需要,需要使用到datax把数据从文本写入到数据库,原来的做法都是使用python通过datax.py去调用脚本,阿文为了能更好的管控datax的任务,阿文要求我们对datax进行改造,使用java集成的方式去调用datax,并返回任务执行的详细信息。
datax源码跟踪
从github下完源码开始改造,datax的启动类在datax-core包下engine类的entry方法,该方法是一个静态方法。
public static void entry(final string[] args) throws throwable { options options = new options(); options.addoption("job", true, "job config."); options.addoption("jobid", true, "job unique id."); options.addoption("mode", true, "job runtime mode."); basicparser parser = new basicparser(); commandline cl = parser.parse(options, args); string jobpath = cl.getoptionvalue("job"); // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1 string jobidstring = cl.getoptionvalue("jobid"); runtime_mode = cl.getoptionvalue("mode"); configuration configuration = configparser.parse(jobpath); long jobid; if (!"-1".equalsignorecase(jobidstring)) { jobid = long.parselong(jobidstring); } else { // only for dsc & ds & datax 3 update string dscjoburlpatternstring = "/instance/(\\d{1,})/config.xml"; string dsjoburlpatternstring = "/inner/job/(\\d{1,})/config"; string dstaskgroupurlpatternstring = "/inner/job/(\\d{1,})/taskgroup/"; list<string> patternstringlist = arrays.aslist(dscjoburlpatternstring, dsjoburlpatternstring, dstaskgroupurlpatternstring); jobid = parsejobidfromurl(patternstringlist, jobpath); } boolean isstandalonemode = "standalone".equalsignorecase(runtime_mode); if (!isstandalonemode && jobid == -1) { // 如果不是 standalone 模式,那么 jobid 一定不能为-1 throw dataxexception.asdataxexception(frameworkerrorcode.config_error, "非 standalone 模式必须在 url 中提供有效的 jobid."); } configuration.set(coreconstant.datax_core_container_job_id, jobid); //打印vminfo vminfo vminfo = vminfo.getvminfo(); if (vminfo != null) { log.info(vminfo.tostring()); } log.info("\n" + engine.filterjobconfiguration(configuration) + "\n"); log.debug(configuration.tojson()); configurationvalidate.dovalidate(configuration); engine engine = new engine(); engine.start(configuration); }
里面最后通过调用engine.start(configuration) 开始启动,我们点进去,最后会发现在里面是调用jobcontainer 的start() 方法。
@override public void start() { log.info("datax jobcontainer starts job."); boolean hasexception = false; boolean isdryrun = false; try { this.starttimestamp = system.currenttimemillis(); isdryrun = configuration.getbool(coreconstant.datax_job_setting_dryrun, false); if (isdryrun) { log.info("jobcontainer starts to do precheck ..."); this.precheck(); } else { userconf = configuration.clone(); log.debug("jobcontainer starts to do prehandle ..."); this.prehandle(); log.debug("jobcontainer starts to do init ..."); this.init(); log.info("jobcontainer starts to do prepare ..."); this.prepare(); log.info("jobcontainer starts to do split ..."); this.totalstage = this.split(); log.info("jobcontainer starts to do schedule ..."); this.schedule(); log.debug("jobcontainer starts to do post ..."); this.post(); log.debug("jobcontainer starts to do posthandle ..."); this.posthandle(); log.info("datax jobid [{}] completed successfully.", this.jobid); this.invokehooks(); } } catch (throwable e) { log.error("exception when job run", e); hasexception = true; if (e instanceof outofmemoryerror) { this.destroy(); system.gc(); } if (super.getcontainercommunicator() == null) { // 由于 containercollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containercollector 进行初始化 abstractcontainercommunicator tempcontainercollector; // standalone tempcontainercollector = new standalonejobcontainercommunicator(configuration); super.setcontainercommunicator(tempcontainercollector); } communication communication = super.getcontainercommunicator().collect(); // 汇报前的状态,不需要手动进行设置 // communication.setstate(state.failed); communication.setthrowable(e); communication.settimestamp(this.endtimestamp); communication tempcomm = new communication(); tempcomm.settimestamp(this.starttransfertimestamp); communication reportcommunication = communicationtool.getreportcommunication(communication, tempcomm, this.totalstage); super.getcontainercommunicator().report(reportcommunication); throw dataxexception.asdataxexception( frameworkerrorcode.runtime_error, e); } finally { if (!isdryrun) { this.destroy(); this.endtimestamp = system.currenttimemillis(); if (!hasexception) { //最后打印cpu的平均消耗,gc的统计 vminfo vminfo = vminfo.getvminfo(); if (vminfo != null) { vminfo.getdelta(false); log.info(vminfo.totalstring()); } log.info(perftrace.getinstance().summarizenoexception()); this.logstatistics(); } } } }
而我们需要的任务信息就在this.logstatistics() 中
private void logstatistics() { long totalcosts = (this.endtimestamp - this.starttimestamp) / 1000; long transfercosts = (this.endtransfertimestamp - this.starttransfertimestamp) / 1000; if (0l == transfercosts) { transfercosts = 1l; } if (super.getcontainercommunicator() == null) { return; } communication communication = super.getcontainercommunicator().collect(); communication.settimestamp(this.endtimestamp); communication tempcomm = new communication(); tempcomm.settimestamp(this.starttransfertimestamp); communication reportcommunication = communicationtool.getreportcommunication(communication, tempcomm, this.totalstage); // 字节速率 long bytespeedpersecond = communication.getlongcounter(communicationtool.read_succeed_bytes) / transfercosts; long recordspeedpersecond = communication.getlongcounter(communicationtool.read_succeed_records) / transfercosts; reportcommunication.setlongcounter(communicationtool.byte_speed, bytespeedpersecond); reportcommunication.setlongcounter(communicationtool.record_speed, recordspeedpersecond); super.getcontainercommunicator().report(reportcommunication); log.info(string.format( "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n", "任务启动时刻", dateformat.format(starttimestamp), "任务结束时刻", dateformat.format(endtimestamp), "任务总计耗时", string.valueof(totalcosts) + "s", "任务平均流量", strutil.stringify(bytespeedpersecond) + "/s", "记录写入速度", string.valueof(recordspeedpersecond) + "rec/s", "读出记录总数", string.valueof(communicationtool.gettotalreadrecords(communication)), "读写失败总数", string.valueof(communicationtool.gettotalerrorrecords(communication)) )); log.info("task-total-info:" + dateformat.format(starttimestamp) + "|" + dateformat.format(endtimestamp) + "|" + string.valueof(totalcosts) + "|" + strutil.stringify(bytespeedpersecond) + "|" + string.valueof(recordspeedpersecond) + "|" + string.valueof(communicationtool.gettotalreadrecords(communication)) + "|" + string.valueof(communicationtool.gettotalerrorrecords(communication)) ); if (communication.getlongcounter(communicationtool.transformer_succeed_records) > 0 || communication.getlongcounter(communicationtool.transformer_failed_records) > 0 || communication.getlongcounter(communicationtool.transformer_filter_records) > 0) { log.info(string.format( "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n", "transformer成功记录总数", communication.getlongcounter(communicationtool.transformer_succeed_records), "transformer失败记录总数", communication.getlongcounter(communicationtool.transformer_failed_records), "transformer过滤记录总数", communication.getlongcounter(communicationtool.transformer_filter_records) )); } }
改造开始
新增返回实体dataxresult (get、set省略)
public class dataxresult { //任务启动时刻 private long starttimestamp; //任务结束时刻 private long endtimestamp; //任务总时耗 private long totalcosts; //任务平均流量 private long bytespeedpersecond; //记录写入速度 private long recordspeedpersecond; //读出记录总数 private long totalreadrecords; //读写失败总数 private long totalerrorrecords; //成功记录总数 private long transformersucceedrecords; // 失败记录总数 private long transformerfailedrecords; // 过滤记录总数 private long transformerfilterrecords; //字节数 private long readsucceedbytes; //转换开始时间 private long endtransfertimestamp; //转换结束时间 private long starttransfertimestamp; //转换总耗时 private long transfercosts;
重写logstatistics方法,返回该实体。
private dataxresult logstatistics(dataxresult resultmsg) { long totalcosts = (this.endtimestamp - this.starttimestamp) / 1000; long transfercosts = (this.endtransfertimestamp - this.starttransfertimestamp) / 1000; if (0l == transfercosts) { transfercosts = 1l; } if (super.getcontainercommunicator() == null) { return resultmsg; } communication communication = super.getcontainercommunicator().collect(); long bytespeedpersecond = communication.getlongcounter(communicationtool.read_succeed_bytes) / transfercosts; long recordspeedpersecond = communication.getlongcounter(communicationtool.read_succeed_records) / transfercosts; return resultmsg.getresultmsg(starttimestamp, endtimestamp, totalcosts, bytespeedpersecond, recordspeedpersecond, communication.getlongcounter(communicationtool.transformer_succeed_records), communication.getlongcounter(communicationtool.transformer_failed_records), communication.getlongcounter(communicationtool.transformer_filter_records), communication.getlongcounter(communicationtool.transformer_failed_records), communication.getlongcounter(communicationtool.transformer_filter_records), communication.getlongcounter(communicationtool.read_succeed_bytes), this.endtransfertimestamp, this.starttransfertimestamp, transfercosts ); }
还需要重写jobcontainer的**start()**方法。
@override public dataxresult start(dataxresult dataxresult) { ... dataxresult result = new dataxresult(); result = logstatistics(dataxresult); ... return result; }
然后在engine 类中添加模拟测试方法mockentry
public dataxresult mockstart(configuration allconf) { ... dataxresult dataxresult = new dataxresult(); return container.start(dataxresult); }
开始测试
在com.alibaba.datax.core.util.container.coreconstant里修改datax_home 为本地路径
该datax_home路径下有以下几个目录
public class test { public static void main(string[] args) { string[] datxargs = {"-job", coreconstant.datax_home + "\\job\\job2.json", "-mode", "standalone", "-jobid", "-1"}; try { dataxresult dataxresult= engine.mockentry(datxargs); } catch (throwable e) { e.printstacktrace(); } } }
执行结果为
3
大功告成!
以上这篇关于通过java调用datax,返回任务执行的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
上一篇: jsp中调用java代码小结
下一篇: php5与php7的区别点总结