欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

关于通过java调用datax,返回任务执行的方法

程序员文章站 2022-06-29 13:22:41
datax datax 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 mysql、oracle、sqlserver、postgre、hdfs、hive、a...

datax

datax 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 mysql、oracle、sqlserver、postgre、hdfs、hive、ads、hbase、tablestore(ots)、maxcompute(odps)、drds 等各种异构数据源之间高效的数据同步功能。

datax的详细介绍

请参考 datax-introduction

引言

因为业务需要,需要使用到datax把数据从文本写入到数据库,原来的做法都是使用python通过datax.py去调用脚本,阿文为了能更好的管控datax的任务,阿文要求我们对datax进行改造,使用java集成的方式去调用datax,并返回任务执行的详细信息。

datax源码跟踪

从github下完源码开始改造,datax的启动类在datax-core包下engine类的entry方法,该方法是一个静态方法。

public static void entry(final string[] args) throws throwable {
 options options = new options();
 options.addoption("job", true, "job config.");
 options.addoption("jobid", true, "job unique id.");
 options.addoption("mode", true, "job runtime mode.");

 basicparser parser = new basicparser();
 commandline cl = parser.parse(options, args);

 string jobpath = cl.getoptionvalue("job");

 // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
 string jobidstring = cl.getoptionvalue("jobid");
 runtime_mode = cl.getoptionvalue("mode");

 configuration configuration = configparser.parse(jobpath);

 long jobid;
 if (!"-1".equalsignorecase(jobidstring)) {
  jobid = long.parselong(jobidstring);
 } else {
  // only for dsc & ds & datax 3 update
  string dscjoburlpatternstring = "/instance/(\\d{1,})/config.xml";
  string dsjoburlpatternstring = "/inner/job/(\\d{1,})/config";
  string dstaskgroupurlpatternstring = "/inner/job/(\\d{1,})/taskgroup/";
  list<string> patternstringlist = arrays.aslist(dscjoburlpatternstring,
   dsjoburlpatternstring, dstaskgroupurlpatternstring);
  jobid = parsejobidfromurl(patternstringlist, jobpath);
 }

 boolean isstandalonemode = "standalone".equalsignorecase(runtime_mode);
 if (!isstandalonemode && jobid == -1) {
  // 如果不是 standalone 模式,那么 jobid 一定不能为-1
  throw dataxexception.asdataxexception(frameworkerrorcode.config_error, "非 standalone 模式必须在 url 中提供有效的 jobid.");
 }
 configuration.set(coreconstant.datax_core_container_job_id, jobid);

 //打印vminfo
 vminfo vminfo = vminfo.getvminfo();
 if (vminfo != null) {
  log.info(vminfo.tostring());
 }

 log.info("\n" + engine.filterjobconfiguration(configuration) + "\n");

 log.debug(configuration.tojson());

 configurationvalidate.dovalidate(configuration);
 engine engine = new engine();
 engine.start(configuration);
 }

里面最后通过调用engine.start(configuration) 开始启动,我们点进去,最后会发现在里面是调用jobcontainer 的start() 方法。

@override
 public void start() {
 log.info("datax jobcontainer starts job.");

 boolean hasexception = false;
 boolean isdryrun = false;
 try {
  this.starttimestamp = system.currenttimemillis();
  isdryrun = configuration.getbool(coreconstant.datax_job_setting_dryrun, false);
  if (isdryrun) {
  log.info("jobcontainer starts to do precheck ...");
  this.precheck();
  } else {
  userconf = configuration.clone();
  log.debug("jobcontainer starts to do prehandle ...");
  this.prehandle();

  log.debug("jobcontainer starts to do init ...");
  this.init();
  log.info("jobcontainer starts to do prepare ...");
  this.prepare();
  log.info("jobcontainer starts to do split ...");
  this.totalstage = this.split();
  log.info("jobcontainer starts to do schedule ...");
  this.schedule();
  log.debug("jobcontainer starts to do post ...");
  this.post();

  log.debug("jobcontainer starts to do posthandle ...");
  this.posthandle();
  log.info("datax jobid [{}] completed successfully.", this.jobid);

  this.invokehooks();
  }
 } catch (throwable e) {
  log.error("exception when job run", e);

  hasexception = true;

  if (e instanceof outofmemoryerror) {
  this.destroy();
  system.gc();
  }


  if (super.getcontainercommunicator() == null) {
  // 由于 containercollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containercollector 进行初始化

  abstractcontainercommunicator tempcontainercollector;
  // standalone
  tempcontainercollector = new standalonejobcontainercommunicator(configuration);

  super.setcontainercommunicator(tempcontainercollector);
  }

  communication communication = super.getcontainercommunicator().collect();
  // 汇报前的状态,不需要手动进行设置
  // communication.setstate(state.failed);
  communication.setthrowable(e);
  communication.settimestamp(this.endtimestamp);

  communication tempcomm = new communication();
  tempcomm.settimestamp(this.starttransfertimestamp);

  communication reportcommunication = communicationtool.getreportcommunication(communication, tempcomm, this.totalstage);
  super.getcontainercommunicator().report(reportcommunication);

  throw dataxexception.asdataxexception(
   frameworkerrorcode.runtime_error, e);
 } finally {
  if (!isdryrun) {

  this.destroy();
  this.endtimestamp = system.currenttimemillis();
  if (!hasexception) {
   //最后打印cpu的平均消耗,gc的统计
   vminfo vminfo = vminfo.getvminfo();
   if (vminfo != null) {
   vminfo.getdelta(false);
   log.info(vminfo.totalstring());
   }

   log.info(perftrace.getinstance().summarizenoexception());
   this.logstatistics();
  }
  }
 }
 }

而我们需要的任务信息就在this.logstatistics() 中

private void logstatistics() {
 long totalcosts = (this.endtimestamp - this.starttimestamp) / 1000;
 long transfercosts = (this.endtransfertimestamp - this.starttransfertimestamp) / 1000;
 if (0l == transfercosts) {
  transfercosts = 1l;
 }

 if (super.getcontainercommunicator() == null) {
  return;
 }

 communication communication = super.getcontainercommunicator().collect();
 communication.settimestamp(this.endtimestamp);

 communication tempcomm = new communication();
 tempcomm.settimestamp(this.starttransfertimestamp);

 communication reportcommunication = communicationtool.getreportcommunication(communication, tempcomm, this.totalstage);

 // 字节速率
 long bytespeedpersecond = communication.getlongcounter(communicationtool.read_succeed_bytes)
  / transfercosts;

 long recordspeedpersecond = communication.getlongcounter(communicationtool.read_succeed_records)
  / transfercosts;

 reportcommunication.setlongcounter(communicationtool.byte_speed, bytespeedpersecond);
 reportcommunication.setlongcounter(communicationtool.record_speed, recordspeedpersecond);

 super.getcontainercommunicator().report(reportcommunication);


 log.info(string.format(
  "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
   + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
   + "%-26s: %19s\n",
  "任务启动时刻",
  dateformat.format(starttimestamp),

  "任务结束时刻",
  dateformat.format(endtimestamp),

  "任务总计耗时",
  string.valueof(totalcosts) + "s",
  "任务平均流量",
  strutil.stringify(bytespeedpersecond)
   + "/s",
  "记录写入速度",
  string.valueof(recordspeedpersecond)
   + "rec/s", "读出记录总数",
  string.valueof(communicationtool.gettotalreadrecords(communication)),
  "读写失败总数",
  string.valueof(communicationtool.gettotalerrorrecords(communication))
 ));

 log.info("task-total-info:" + dateformat.format(starttimestamp) + "|" +
  dateformat.format(endtimestamp) + "|" +
  string.valueof(totalcosts) + "|" +
  strutil.stringify(bytespeedpersecond) + "|" +
  string.valueof(recordspeedpersecond) + "|" +
  string.valueof(communicationtool.gettotalreadrecords(communication)) + "|" +
  string.valueof(communicationtool.gettotalerrorrecords(communication))
 );

 if (communication.getlongcounter(communicationtool.transformer_succeed_records) > 0
  || communication.getlongcounter(communicationtool.transformer_failed_records) > 0
  || communication.getlongcounter(communicationtool.transformer_filter_records) > 0) {
  log.info(string.format(
   "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
   "transformer成功记录总数",
   communication.getlongcounter(communicationtool.transformer_succeed_records),

   "transformer失败记录总数",
   communication.getlongcounter(communicationtool.transformer_failed_records),

   "transformer过滤记录总数",
   communication.getlongcounter(communicationtool.transformer_filter_records)
  ));
 }
 }

改造开始

新增返回实体dataxresult (get、set省略)

public class dataxresult {
 //任务启动时刻
 private long starttimestamp;
 //任务结束时刻
 private long endtimestamp;
 //任务总时耗
 private long totalcosts;
 //任务平均流量
 private long bytespeedpersecond;
 //记录写入速度
 private long recordspeedpersecond;
 //读出记录总数
 private long totalreadrecords;
 //读写失败总数
 private long totalerrorrecords;
 //成功记录总数
 private long transformersucceedrecords;
 // 失败记录总数
 private long transformerfailedrecords;
 // 过滤记录总数
 private long transformerfilterrecords;
 //字节数
 private long readsucceedbytes;
 //转换开始时间
 private long endtransfertimestamp;
 //转换结束时间
 private long starttransfertimestamp;
 //转换总耗时
 private long transfercosts;

重写logstatistics方法,返回该实体。

private dataxresult logstatistics(dataxresult resultmsg) {
 long totalcosts = (this.endtimestamp - this.starttimestamp) / 1000;
 long transfercosts = (this.endtransfertimestamp - this.starttransfertimestamp) / 1000;
 if (0l == transfercosts) {
  transfercosts = 1l;
 }
 if (super.getcontainercommunicator() == null) {
  return resultmsg;
 }
 communication communication = super.getcontainercommunicator().collect();
 long bytespeedpersecond = communication.getlongcounter(communicationtool.read_succeed_bytes)
  / transfercosts;
 long recordspeedpersecond = communication.getlongcounter(communicationtool.read_succeed_records)
  / transfercosts;
  
 return resultmsg.getresultmsg(starttimestamp,
  endtimestamp,
  totalcosts,
  bytespeedpersecond,
  recordspeedpersecond,
  communication.getlongcounter(communicationtool.transformer_succeed_records),
  communication.getlongcounter(communicationtool.transformer_failed_records),
  communication.getlongcounter(communicationtool.transformer_filter_records),
  communication.getlongcounter(communicationtool.transformer_failed_records),
  communication.getlongcounter(communicationtool.transformer_filter_records),
  communication.getlongcounter(communicationtool.read_succeed_bytes),
  this.endtransfertimestamp,
  this.starttransfertimestamp,
  transfercosts
 );


 }

还需要重写jobcontainer的**start()**方法。

@override
 public dataxresult start(dataxresult dataxresult) {
 ...
 dataxresult result = new dataxresult();
 result = logstatistics(dataxresult);
 ...
 return result;
 }

然后在engine 类中添加模拟测试方法mockentry

 public dataxresult mockstart(configuration allconf) {

 ...
 dataxresult dataxresult = new dataxresult();
 return container.start(dataxresult);
 }

开始测试

在com.alibaba.datax.core.util.container.coreconstant里修改datax_home 为本地路径

关于通过java调用datax,返回任务执行的方法

该datax_home路径下有以下几个目录

关于通过java调用datax,返回任务执行的方法

public class test {

 public static void main(string[] args) {
 string[] datxargs = {"-job", coreconstant.datax_home + "\\job\\job2.json", "-mode", "standalone", "-jobid", "-1"};
 try {
  dataxresult dataxresult= engine.mockentry(datxargs);
 } catch (throwable e) {
  e.printstacktrace();
 }

 }
}

执行结果为

3

大功告成!

以上这篇关于通过java调用datax,返回任务执行的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。