欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kettle插件二次开发

程序员文章站 2024-02-14 09:54:52
...

先把环境搭建好,这里可以参考我之前的博客,或者比较好的参考文献:

https://blog.csdn.net/tj_java/article/details/78765158

 

原理部分:

转换插件开发:

kettle转换步骤插件至少需要实现四个接口:

org.pentaho.di.trans.step.StepInterface 负责数据处理,转换和流转。这里面主要由processRow()方法来处理。

org.pentaho.di.trans.step.StepDataInterface 数据处理设计的具体数据,以及对数据的状态的设置和回收

org.pentaho.di.trans.step.StepMetaInterface 元数据的处理,加载xml(负责将Dialog数据加载到xml),校验,主要是对一个步骤的定义的基本数据。

org.pentaho.di.trans.step.StepDialogInterface 提供GUI/dialog编译步骤的元数据。

转换步骤插件各个类名规则:

StepInterface的实现类以插件的功能先关命名:*.java

StepDataInterface的实现类:*Data.java

StepMetaInterface实现类:*Meta.java

StepDialogInterface实现类:*Dialog.java

<?xml version="1.0" encoding="UTF-8"?>
<plugin
   id="TemplatePlugin"
   iconfile="icon.png"
   description="Template Plugin"
   tooltip="Only there for demonstration purposes"
   category="Demon"                                <!--插件入所在分类-->
   classname="plugin.template.TemplateStepMeta">   <!--插件入口类-->
	<libraries>
    	<library name="TemplateStepPlugin.jar"/>
	</libraries>
		    
   <localized_category>
     <category locale="en_US">Demon</category>
     <category locale="zh_CN">测试</category>
   </localized_category>
   <localized_description>
     <description locale="en_US">Template step</description>
     <description locale="en_US">Template step</description>
   </localized_description>
   <localized_tooltip>
     <tooltip locale="en_US">Template step</tooltip>
     <tooltip locale="en_US">Template step</tooltip>
   </localized_tooltip>
</plugin>

这里对Meta后缀的类重点讲解(弄清楚这个类,其实已经解决了trans插件开发的都大部分问题):

注意:getXML是保存、loadXML是读取,这2个方法基本是属于模板类的代码,跟着其他项目一样写即可。本质使用xml来定义data数据格式

 

 

kettle插件二次开发

 

如果需要对插件进行错误处理步骤,实现错误分流(实现:主步骤输出、错误处理步骤),需要重写父类方法,返回true即可:

@Override
	public boolean supportsErrorHandling() {
		return true;
	}

这里可以参考一个kettle已有插件的源码:(数据检验)

Dialog类:/kettle-7.1-eclipse/ui/org/pentaho/di/ui/trans/steps/validator/ValidatorDialog.java

其他类:/kettle-7.1-eclipse/engine/org/pentaho/di/trans/steps/validator/包下有:

KettleValidatorException.java

Validation.java

Validator.java

ValidatorData.java

ValidatorMeta.java

messages/

国际化相关:messages/:

messages_en_US.properties

messages_fr_FR.properties

messages_it_IT.properties

messages_ja_JP.properties

messages_ko_KR.properties

messages_zh_CN.properties

具体代码分析:

开始执行,调用:public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException

里面使用Object[]r = getRow(); // get row, set busy!获取一行记录

返回值是boolean,其中有个first来标记是否为第一行。

返回值:

  • false 处理完毕
  • true 循环调用

有个RunThread类内run方法通过while来不断循环调用processRow方法。

在processRow中使用:

  • 正确记录输出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
  • 错误数据输出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");

 

关于日志:BaseStep有如下方法,直接使用即可打印在kettle控制台:

logMinimal、logBasic、logDetailed、logDebug等

并在代码中判断级别:

if (log.isDetailed())

...

if (log.isDebug())

...

if (log.isRowlevel())

...

 

 

具体meta类中一些变量,可以调试查看:在*(按照kettlt命名规范)类中:

meta = (MetaDataPluginMeta) smi;

data = (MetaDataPluginData) sdi;

 

1,meta拿到了dialog中填写的值

 

 

2,调用getRow()方法,直接拿到对应的一行数据,这里input是表输入,这里直接拿到了数据库中第一行数据(数据,不是表头)

3,其中first是BaseStep中定义,当第一行时,first标记为true

4,获取表头信息调用: List<ValueMetaInterface> metaList = getInputRowMeta().getValueMetaList();

这里上面声明了一个data对象是用户存储数据的,data里面有字段来保存表头信息:data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone();

5.获取组件名称:getStepname()

6.getMetaStore()

7:关于:getFields方法:

  • 已经过时的:

public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space )

  • 非过时的:

public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space, Repository repository, IMetaStore metaStore )

  • 非过时方法参数含义:
  1. inputRowMeta对应上个组件提供的变量,对数据库input就是sql里面具体的column name调用(RowMetaInterface) getInputRowMeta().clone()可以获取
  2. name 当天转换名称,getStepname()获取
  3. info 直接 null
  4. netStep 直接null
  5. space 变量的存储空间,类型map,来存你定义的内容
  6. getRepository()
  7. getMetaStore()获取

注:该方法在自己编写插件的时候,并没有重写,下一个组件就能直接拿到input的字段名

 

8.如果要添加新的或者修改一行数据,可以用kettlt提供的:RowDataUtil.addValueData来实现:

Object[] outputRow = RowDataUtil.addValueData(r, data.outputRowMeta.size() - 1, "dummy value");

Object[] addValueData( Object[] rowData, int length, Object extra )

参数含义:

rowData 构造出一行数据,r (getRow()获取),当前的行值。

length 要替换的值,所在r的下标

extra 新的值

该类提供了下列方法:

 

9.关于数据分流:

在processRow中使用:

  • 正确记录输出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
  • 错误数据输出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");

其中putError后面三个记录,需要在*Meta中重写,然后在图形化中鼠标右键,定义错误数、错误列名等等,就能在下个组件中拿到直接拿到:

@Override

public boolean supportsErrorHandling() {

return true;//返回true即可。

}

 

整体实现:一个自定义元数据,根据元数据中定义的数据,来检查input表中的数据是否满足,

  • 满足,则正常导入到目标表。
  • 不满足,则写入到错误表。

其中元数据索引表,存储表、表字段、元数据值code。

根据元数据code查询出一个list集合,该集合中的值,为元数据索引表中指定字段允许的值。

最终插件实现效果:

kettle插件二次开发

kettle插件二次开发

 

 

 

 

 

相关标签: kettle