kettle插件二次开发
先把环境搭建好,这里可以参考我之前的博客,或者比较好的参考文献:
https://blog.csdn.net/tj_java/article/details/78765158
原理部分:
转换插件开发:
kettle转换步骤插件至少需要实现四个接口:
org.pentaho.di.trans.step.StepInterface 负责数据处理,转换和流转。这里面主要由processRow()方法来处理。
org.pentaho.di.trans.step.StepDataInterface 数据处理设计的具体数据,以及对数据的状态的设置和回收
org.pentaho.di.trans.step.StepMetaInterface 元数据的处理,加载xml(负责将Dialog数据加载到xml),校验,主要是对一个步骤的定义的基本数据。
org.pentaho.di.trans.step.StepDialogInterface 提供GUI/dialog编译步骤的元数据。
转换步骤插件各个类名规则:
StepInterface的实现类以插件的功能先关命名:*.java
StepDataInterface的实现类:*Data.java
StepMetaInterface实现类:*Meta.java
StepDialogInterface实现类:*Dialog.java
<?xml version="1.0" encoding="UTF-8"?>
<plugin
id="TemplatePlugin"
iconfile="icon.png"
description="Template Plugin"
tooltip="Only there for demonstration purposes"
category="Demon" <!--插件入所在分类-->
classname="plugin.template.TemplateStepMeta"> <!--插件入口类-->
<libraries>
<library name="TemplateStepPlugin.jar"/>
</libraries>
<localized_category>
<category locale="en_US">Demon</category>
<category locale="zh_CN">测试</category>
</localized_category>
<localized_description>
<description locale="en_US">Template step</description>
<description locale="en_US">Template step</description>
</localized_description>
<localized_tooltip>
<tooltip locale="en_US">Template step</tooltip>
<tooltip locale="en_US">Template step</tooltip>
</localized_tooltip>
</plugin>
这里对Meta后缀的类重点讲解(弄清楚这个类,其实已经解决了trans插件开发的都大部分问题):
注意:getXML是保存、loadXML是读取,这2个方法基本是属于模板类的代码,跟着其他项目一样写即可。本质使用xml来定义data数据格式
如果需要对插件进行错误处理步骤,实现错误分流(实现:主步骤输出、错误处理步骤),需要重写父类方法,返回true即可:
@Override
public boolean supportsErrorHandling() {
return true;
}
这里可以参考一个kettle已有插件的源码:(数据检验)
Dialog类:/kettle-7.1-eclipse/ui/org/pentaho/di/ui/trans/steps/validator/ValidatorDialog.java
其他类:/kettle-7.1-eclipse/engine/org/pentaho/di/trans/steps/validator/包下有:
KettleValidatorException.java
Validation.java
Validator.java
ValidatorData.java
ValidatorMeta.java
messages/
国际化相关:messages/:
messages_en_US.properties
messages_fr_FR.properties
messages_it_IT.properties
messages_ja_JP.properties
messages_ko_KR.properties
messages_zh_CN.properties
具体代码分析:
开始执行,调用:public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException
里面使用Object[]r = getRow(); // get row, set busy!获取一行记录
返回值是boolean,其中有个first来标记是否为第一行。
返回值:
- false 处理完毕
- true 循环调用
有个RunThread类内run方法通过while来不断循环调用processRow方法。
在processRow中使用:
- 正确记录输出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
- 错误数据输出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");
关于日志:BaseStep有如下方法,直接使用即可打印在kettle控制台:
logMinimal、logBasic、logDetailed、logDebug等
并在代码中判断级别:
if (log.isDetailed())
...
if (log.isDebug())
...
if (log.isRowlevel())
...
具体meta类中一些变量,可以调试查看:在*(按照kettlt命名规范)类中:
meta = (MetaDataPluginMeta) smi;
data = (MetaDataPluginData) sdi;
1,meta拿到了dialog中填写的值
2,调用getRow()方法,直接拿到对应的一行数据,这里input是表输入,这里直接拿到了数据库中第一行数据(数据,不是表头)
3,其中first是BaseStep中定义,当第一行时,first标记为true
4,获取表头信息调用: List<ValueMetaInterface> metaList = getInputRowMeta().getValueMetaList();
这里上面声明了一个data对象是用户存储数据的,data里面有字段来保存表头信息:data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone();
5.获取组件名称:getStepname()
6.getMetaStore()
7:关于:getFields方法:
- 已经过时的:
public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space )
- 非过时的:
public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space, Repository repository, IMetaStore metaStore )
- 非过时方法参数含义:
- inputRowMeta对应上个组件提供的变量,对数据库input就是sql里面具体的column name调用(RowMetaInterface) getInputRowMeta().clone()可以获取
- name 当天转换名称,getStepname()获取
- info 直接 null
- netStep 直接null
- space 变量的存储空间,类型map,来存你定义的内容
- getRepository()
- getMetaStore()获取
注:该方法在自己编写插件的时候,并没有重写,下一个组件就能直接拿到input的字段名
8.如果要添加新的或者修改一行数据,可以用kettlt提供的:RowDataUtil.addValueData来实现:
Object[] outputRow = RowDataUtil.addValueData(r, data.outputRowMeta.size() - 1, "dummy value");
Object[] addValueData( Object[] rowData, int length, Object extra )
参数含义:
rowData 构造出一行数据,r (getRow()获取),当前的行值。
length 要替换的值,所在r的下标
extra 新的值
该类提供了下列方法:
9.关于数据分流:
在processRow中使用:
- 正确记录输出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
- 错误数据输出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");
其中putError后面三个记录,需要在*Meta中重写,然后在图形化中鼠标右键,定义错误数、错误列名等等,就能在下个组件中拿到直接拿到:
@Override
public boolean supportsErrorHandling() {
return true;//返回true即可。
}
整体实现:一个自定义元数据,根据元数据中定义的数据,来检查input表中的数据是否满足,
- 满足,则正常导入到目标表。
- 不满足,则写入到错误表。
其中元数据索引表,存储表、表字段、元数据值code。
根据元数据code查询出一个list集合,该集合中的值,为元数据索引表中指定字段允许的值。
最终插件实现效果:
上一篇: 阿里图标库iconfont前端使用
下一篇: ICONFONT字体图标库使用