nifi探索之ExecuteScript处理器(2)
程序员文章站
2022-07-06 16:20:26
...
Abstract
接上一篇,本节主要叙述flowfile相关的IO操作以及一些相对比较高级的处理方法
IO部分
- flowfile的读取 session.read(flowfile,InputStreamCallback), 其中InputStreamCallback为一个callback 对象,提供了InputStream对象能够从flowfile中读取数据
flowfile=session.get()
if(!flowfile)return
def text=''
session.read(flowfile,{inputStream-> text=IOUtils.toString(inputStream,StandardCharsets.UTF_8)
}as InputStreamCallback)
- flowfile的写入 session.write(flowfile,outputStreamCallback)
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
- flowfile内容更新 session.write(flowfile,streamCallback)
flowfile=session.get()
if(!flowfile)return
def text='Hello world'
flowfile=session.write(flowfile,{inputStream,outputStream->
text=IOUtils.toString(inputStream,StandardCharsets.UTF_8)
outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
}as StreamCallback)
session.transfer(flowfile,REL_SUCCESS)
- exception handling
flowfile=session.get()
if(!flowfile)return
try{
session.transfer(flowfile,REL_SUCCESS)
}catch(e){
log.error('Error: transfer flowfile ',e)
session.transfer(flowfile,REL_FAILURE)
}
高级操作
- Dynamic Properties
ExecuteScript允许用户定义写properties,nifi中的一些processor并不支持dynamic properties,但是ExecuteScript会将dynamic properties传递给变量,可以通过propertity name获取对应的取值
e.g. 在executeScript中添加了一个myProperty1属性,对应值为myValue1
def myValue1=myProperty1.value
- State Management,用于存储一些信息,比如QueryDatabaseTable中会记录特定列的最大值,当下一次再fetch 数据时,只会返回大于该值的数据。保存的信息分为两个level:local和cluster
import org.apache.nifi.components.state.Scope
def oldMap=context.stateManager.getState(Scope.LOCAL).toMap()
def stateManager=context.stateManger
def stateMap=stateManager.getState(Scope.CLUSTER)
def newMap=['myKey1':'myValue1']
if(stateMap.version==-1)
stateManager.setState(newMap,Scope.CLUSTER)
else
stateManager.replace(stateMap,newMap,Scope.CLUSTER)
clear state map
import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope.LOCAL)
上一篇: Hadoop的HA搭建
推荐阅读