nifi探索之ExecuteScript处理器
程序员文章站
2022-07-06 16:20:14
...
Abstract
本文的主要内容是介绍ExecuteScript这个功能强大的处理器,用户可以根据自己的需求提供脚本代码,然后ExecuteScript就可以根据脚本来处理flowfile。ExectuteScript支持多种语言Groovy/JPython等等,本文的代码采用Groovy。
常用变量
- session变量:类型:ProcessSession,能够完成对flowfile的一些操作,例如create,putAttribute,transfer,read,write
- context变量:类型:ProcessContext,能够获取处理器的一些properties,relationship,controllerService和StateManager等内容
- log变量:用于写日志
- REL_SUCCESS: Success Relationship的引用
- REL_FAILURE: Failure Relationship的引用
常用的方法
- session.get():获取输入的flowfile,从队列中获取优先级最高的flowfile或者返回null
flowfile=session.get()
if(!flowfile)return
//对flowfile进行处理
- session.get(maxNum):获取包含0到maxNum条flowfile组成的list,如果输入的flowfile有多重来源,那么获取的flowfile list中的flowfile可能来源于同一个flowfile queue也可能是多个 flowfile queue
flowfileList=session.get(100)
if(!flowfileList.isEmpty()){
flowfileList.each{flowfile->//对每个flowfile的处理方法
}
- session.create():获取一个新的flowfile对象
flowfile=session.create()
- session.create(parentFlowfile):根据现有的parentFlowfile创建一个新的flowfile,新创建的flowfile将会继承parentFlowfile的属性以及其他内容(除了UUID)
flowfile=session.get()
if(!flowfile)return
newFlowfile=session.create(flowfile)
- session.putAttribute():给flowfile添加或者更新属性,其中UUID这个属性是不可以更改的,如果试图更改,则忽略
注意:flowfile是不可更改的,因此每次putAttribute时,将会创建一个新的flowfile,将旧的flowfile舍弃
flowfile=session.get()
if(!flowfile)return
flowfile=session.putAttribute(flowfile,'myAttr','myValue')
- session.putAllAttribute(attrMap):一次性添加或者更改多个属性
attrMap=['myAttr1':'1','myAttr2':Integer.toString(2)]
flowfile=session.get()
if(!flowfile)return
flowfile=session.putAllAttribute(flowfile,attrMap)
- session.getAttribute(‘attrName’):返回string value或者null(如果该attrname的属性不存在)
flowfile=session.get()
if(!flowfile)return
myAttr=flowfile.getAttribute('filename')
- session.getAllAttributes():获取flowfile所有的属性,返回的是一个map
flowfile=session.get()
if(!flowfile)return
flowfile.getAllAttributes().each{
key,value->//处理操作
}
-session.transfer(flowfile,relationship):将处理之后的flowfile transfer到指定的relationship
flowfile=session.get()
if(!flowfile)return
if(errorOccured)
session.transfer(flowfile,REL_FAILURE)
else
session.transfer(flowfile,REL_SUCCESS)
- log.info(…):日志操作,分为多个等级 warn/trace/debug/info/error
//无参数
log.info('字符串')
//有参数
log.info('字符串:{}{}{}',[参数1,参数2,参数3]as Object[])
原文来源自:https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html
下一篇: NiFi 初识