欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

nifi探索之ExecuteScript处理器(2)

程序员文章站 2022-07-06 16:20:26
...

Abstract

接上一篇,本节主要叙述flowfile相关的IO操作以及一些相对比较高级的处理方法

IO部分

  1. flowfile的读取 session.read(flowfile,InputStreamCallback), 其中InputStreamCallback为一个callback 对象,提供了InputStream对象能够从flowfile中读取数据
flowfile=session.get()
if(!flowfile)return
def text=''
session.read(flowfile,{inputStream->   text=IOUtils.toString(inputStream,StandardCharsets.UTF_8)
 }as InputStreamCallback)
  1. flowfile的写入 session.write(flowfile,outputStreamCallback)
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
  outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
  1. flowfile内容更新 session.write(flowfile,streamCallback)
flowfile=session.get()
if(!flowfile)return
def text='Hello world'
flowfile=session.write(flowfile,{inputStream,outputStream->
    text=IOUtils.toString(inputStream,StandardCharsets.UTF_8)
    outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
}as StreamCallback)
session.transfer(flowfile,REL_SUCCESS)
  1. exception handling
flowfile=session.get()
if(!flowfile)return
try{
    session.transfer(flowfile,REL_SUCCESS)
}catch(e){
    log.error('Error: transfer flowfile ',e)
    session.transfer(flowfile,REL_FAILURE)
}

高级操作

  1. Dynamic Properties
    ExecuteScript允许用户定义写properties,nifi中的一些processor并不支持dynamic properties,但是ExecuteScript会将dynamic properties传递给变量,可以通过propertity name获取对应的取值
    e.g. 在executeScript中添加了一个myProperty1属性,对应值为myValue1
def myValue1=myProperty1.value
  1. State Management,用于存储一些信息,比如QueryDatabaseTable中会记录特定列的最大值,当下一次再fetch 数据时,只会返回大于该值的数据。保存的信息分为两个level:local和cluster
import org.apache.nifi.components.state.Scope
def oldMap=context.stateManager.getState(Scope.LOCAL).toMap()
def stateManager=context.stateManger
def stateMap=stateManager.getState(Scope.CLUSTER)
def newMap=['myKey1':'myValue1']
if(stateMap.version==-1)
    stateManager.setState(newMap,Scope.CLUSTER)
else
    stateManager.replace(stateMap,newMap,Scope.CLUSTER)

clear state map

import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope.LOCAL)
相关标签: nifi processor