etl nifi ExecuteScript 一些 Groovy,Jython,Javascript(Nashorn)和JRuby 语言手法
目录
用create()搞一个新的FlowFile发送到下一个处理器
介绍
ExecuteScript 的 Groovy,Jython,Javascript(Nashorn)和JRuby 写法, ExecuteScript优势可言灵活的写出内容,在运行中使用脚本。
几个重要的玩意
- session(会话):这是对分配给处理器的ProcessSession的引用。会话允许您对流文件(如
create()
,putAttribute()
和transfer()
以及read()
和write()
()进行操作。 - context(上下文):这是对处理器的ProcessContext的引用。它可以用来检索处理器属性,关系,Controller服务和StateManager。
- log:这是对处理器ComponentLog的引用。用它来记录消息给NiFi,比如
log.info('Hello world!')
- REL_SUCCESS:这是对处理器定义的“成功”关系的引用。它也可以通过引用父类(ExecuteScript)的静态成员来继承,但是一些引擎(如Lua)不允许引用静态成员,所以这是一个方便的变量。这也节省了必须使用关系的完全合格的名称。
- REL_FAILURE:这是对处理器定义的“失败”关系的引用。和REL_SUCCESS一样,它也可以通过引用父类(ExecuteScript)的静态成员来继承,但是一些引擎(如Lua)不允许引用静态成员,所以这是一个方便的变量。这也节省了必须使用关系的完全合格的名称。
- Dynamic Properties : 在ExecuteScript中定义的任何动态属性都将作为设置为与动态属性对应的PropertyValue对象的变量传递给脚本引擎。这允许您获取属性的String值,还可以针对NiFi表达式语言评估该属性,将该值作为适当的数据类型(例如布尔值)等进行转换。由于动态属性名称会成为脚本的变量名称,您必须知道所选脚本引擎的变量命名属性。例如,Groovy不允许在变量名称中使用句点(。),因此如果“my.property”是一个动态属性名称,则会发生错误。
获取文件前提条件
- 从会话中获取传入的流文件
- 在ExecuteScript 的前一个节点要有个流文件 FlowFile ,大部分处理器都ok
- 用 session.get() 就可以获取
获取流file小李子
-
Groovy
flowFile = session.get()
if(!flowFile) return
- jython
flowFile = session.get()
if (flowFile != None):
# All processing code starts at this indent
# implicit return at the end
- Javascript
var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
}
- JRuby
flowFile = session.get()
if flowFile != nil
# All processing code goes here
end
从回话中获取多个流文件然后弄它
- 使用会话对象中的get(maxResults)此方法返回到来自工作队列的maxResults FlowFiles
- 如果没有FlowFiles可用,则返回一个空列表(该方法不返回null)
- 如果存在多个传入队列,则根据一次调用是否轮询所有队列或仅调用一个队列,行为是未指定的。
小李子
- Groovy
flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
flowFileList.each { flowFile ->
// Process each FlowFile here
}
}
- Jython
flowFileList = session.get(100)
if not flowFileList.isEmpty():
for flowFile in flowFileList:
# Process each FlowFile here
- Javascript
flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
for each (var flowFile in flowFileList) {
// Process each FlowFile here
}
}
- JRuby
flowFileList = session.get(100)
if !(flowFileList.isEmpty())
flowFileList.each { |flowFile|
# Process each FlowFile here
}
end
用create()搞一个新的FlowFile发送到下一个处理器
- 使用会话对象的create()方法。
- 此方法返回一个新的FlowFile对象,
- 在上执行进一步的处理
小李子
- Groovy
flowFile = session.create()
// Additional processing here
- Jython
flowFile = session.create()
# Additional processing here
- Javascript
var flowFile = session.create();
// Additional processing here
- JRuby
flowFile = session.create()
# Additional processing here
从基于传入的FlowFile生成新的FlowFile
- 使用会话对象的create(parentFlowFile)方法。
- 此方法采用父级FlowFile引用
- 并返回一个新的子FlowFile对象。
- 新创建的FlowFile将继承除UUID之外的所有父级属性
- 此方法将自动生成Provenance FORK事件或Provenance JOIN事件
- 具体取决于在提交ProcessSession之前是否从同一父级生成了其他FlowFiles
栗子
- Groovy
flowFile = session.get()
if(!flowFile) return
newFlowFile = session.create(flowFile)
// Additional processing here
- Jython
flowFile = session.get()
if (flowFile != None):
newFlowFile = session.create(flowFile)
# Additional processing here
- Javascript
var flowFile = session.get();
if (flowFile != null) {
var newFlowFile = session.create(flowFile);
// Additional processing here
}
- JRuby
flowFile = session.get()
if flowFile != nil
newFlowFile = session.create(flowFile)
# Additional processing here
end
想要添加自定义属性的流文件,为流文件添加一个属性
- 使用会话对象中的putAttribute(flowFile,attributeKey,attributeValue)方法
- 此方法使用给定的键/值对更新给定的FlowFile属性
- 注意:“uuid”属性对于FlowFile是固定的,不能修改; 如key被命名为“uuid”,它将被忽略。
FlowFile对象介绍
这也是一个很好的提及FlowFile对象是不可变的;这意味着如果您通过API更新FlowFile的属性(或以其他方式更改),则会获得新版本的FlowFile的新参考。将FlowFiles传输到关系时,这是非常重要的。您必须保留对最新版本FlowFile的引用,并且必须传输或删除从会话中检索或创建的所有FlowFiles的最新版本,否则在执行时会出现错误。大多数情况下,用于存储FlowFile引用的变量将被从改变FlowFile的方法返回的最新版本覆盖(中间FlowFile引用将自动丢弃)。在这些示例中,您将看到添加属性时重新使用flowFile引用的这种技术。请注意,对FlowFile的当前引用被传递给putAttribute()方法。生成的FlowFile具有名为“myAttr”的属性,其值为“myValue”。另请注意,该方法需要一个字符串的值;如果你有一个对象,你将不得不将它序列化为一个字符串。最后,请注意,如果您要添加多个属性,最好创建一个Map并使用putAllAttributes()来代替 看下边栗子
小李子
- Groovy
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
- Jython
flowFile = session.get()
if (flowFile != None):
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end
- Javascript
var flowFile = session.get();
if (flowFile != null) {
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}
- JRuby
flowFile = session.get()
if flowFile != nil
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end
想要添加自定义属性的流文件,将多个属性添加到流文件
- 使用会话对象中的putAllAttributes(flowFile,attributeMap)方法。
- 此方法使用给定Map中的键/值对更新给定的FlowFile属性。
- 注意:“uuid”属性对于FlowFile是固定的,不能修改;如果**被命名为“uuid”,它将被忽略。
Map介绍
这里的技术是创建一个你想更新的属性键/值对的Map(Jython中的字典,JRuby中的Hash),然后调用putAllAttributes()。这比为每个键/值对调用putAttribute()要高效得多,因为后一种情况会导致框架为添加的每个属性创建一个临时版本的FlowFile。这些示例显示了两个条目myAttr1和myAttr2的映射,设置为“1”,将数字2的语言特定的强制转换为字符串(以符合key和value均需要字符串值的方法签名)。请注意,session.transfer()没有在这里指定(所以下面的代码片段不工作)
示例
- Groovy
attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)
- Jython
attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get()
if (flowFile != None):
flowFile = session.putAllAttributes(flowFile, attrMap)
# implicit return at the end
- Javascript
var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get()
if (flowFile != null) {
flowFile = session.putAllAttributes(flowFile, attrMap)
}
- JRuby
attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get()
if flowFile != nil
flowFile = session.putAllAttributes(flowFile, attrMap)
end
从流文件中获取属性
- 使用FlowFile对象的getAttribute(attributeKey)方法。
- 此方法返回给定attributeKey的String值,如果找不到attributeKey,则返回null。 这些例子显示了检索“filename”属性的值。
栗子
- Groovy
flowFile = session.get()
if(!flowFile) return
myAttr = flowFile.getAttribute('filename')
- Jython
flowFile = session.get()
if (flowFile != None):
myAttr = flowFile.getAttribute('filename')
# implicit return at the end
- Javascript
var flowFile = session.get()
if (flowFile != null) {
var myAttr = flowFile.getAttribute('filename')
}
- JRuby
flowFile = session.get()
if flowFile != nil
myAttr = flowFile.getAttribute('filename')
end
从流文件获取所有属性 all
- 使用FlowFile对象的getAttributes()方法。
- 此方法返回一个带有String键和String值的Map,表示流文件的属性的键/值对。示例显示了对FlowFile的所有属性的Map的迭代。
栗子:
- Groovy
flowFile = session.get()
if(!flowFile) return
flowFile.getAttributes().each { key,value ->
// Do something with the key/value pair
}
- Jython
flowFile = session.get()
if (flowFile != None):
for key,value in flowFile.getAttributes().iteritems():
# Do something with key and/or value
# implicit return at the end
- Javascript
var flowFile = session.get()
if (flowFile != null) {
var attrs = flowFile.getAttributes();
for each (var attrKey in attrs.keySet()) {
// Do something with attrKey (the key) and/or attrs[attrKey] (the value)
}
}
- JRuby
flowFile = session.get()
if flowFile != nil
flowFile.getAttributes().each { |key,value|
# Do something with key and/or value
}
end
将流文件转移到关系 (“成功”或“失败”)
- 在处理流文件(新建或传入)之后,您要将流文件转换为关系(“成功”或“失败”)。
- 在这种简单的情况下,让我们假设有一个名为“errorOccurred”的变量,指出FlowFile应该传送到哪个关系。
- 使用会话对象的transfer(flowFile,relationship)方法。
- 此方法根据给定的关系将给定的FlowFile传送到适当的目标处理器工作队列。
- 从文档:如果关系导致多于一个目的地,则FlowFile的状态被复制,使得每个目的地都接收到FlowFile的精确副本,尽管每个目的地将具有其自己的唯一标识。
重点
- ExecuteScript将在每次执行结束时执行session.commit()以确保操作已被提交。不需要(也不应该)在脚本中执行session.commit()。
栗子
- Groovy
flowFile = session.get()
if(!flowFile) return
// Processing occurs here
if(errorOccurred) {
session.transfer(flowFile, REL_FAILURE)
}
else {
session.transfer(flowFile, REL_SUCCESS)
}
- Jython
flowFile = session.get()
if (flowFile != None):
# All processing code starts at this indent
if errorOccurred:
session.transfer(flowFile, REL_FAILURE)
else:
session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
- Javascript
var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
if(errorOccurred) {
session.transfer(flowFile, REL_FAILURE)
}
else {
session.transfer(flowFile, REL_SUCCESS)
}
}
- JRuby
flowFile = session.get()
if flowFile != nil
# All processing code goes here
if errorOccurred
session.transfer(flowFile, REL_FAILURE)
else
session.transfer(flowFile, REL_SUCCESS)
end
end
设置自己的日志和日志级别弄起来
- 使用带有warn(),trace(),debug(),info()或error()方法的log变量。
这些方法可以采用单个字符串,或者一个字符串,后跟一个对象数组,或者一个字符串,后跟一个Throwable对象数组。第一个用于简单的消息。当你有一些你想记录的动态对象/值的时候使用第二种。要在消息字符串中引用这些消息,请在消息中使用“{}”。这些是按照外观顺序对Object数组进行评估的,所以如果消息的内容是“Found these things:{} {} {}”,而Object数组是[‘Hello’,1,true],那么记录的消息将会是“找到这些东西:你好1true”。这些日志记录方法的第三种形式也需要一个Throwable参数,并且在发生异常并且想要记录它时非常有用。
栗子
- Groovy
log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])
- Jython
from java.lang import Object
from jarray import array
objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)
- Javascript
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);
objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)
- JRuby
log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)
ok
持续更新
上一篇: 扩充String类型的函数功能
下一篇: Java微信公众平台开发之扫码支付模式二