欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

etl工具nifi使用系列(六):使用ExecuteScript执行脚本(1)

程序员文章站 2022-07-06 16:19:44
...

介绍

本文介绍了如何使用NiFi处理器ExecuteScript完成特定任务的各种方法,并给出了GroovyJython,Javascript(Nashorn)和JRuby中给出的示例。

  • 第1部分 - 介绍NiFi API和FlowFiles
    • 从传入队列获取流文件
    • 创建新的流文件
    • 使用流文件属性
    • 传输流文件
    • 记录
  • 第2部分 - FlowFile I / O和错误处理

    • 从流文件中读取
    • 写入流文件
    • 从流文件读取和写入
    • 错误处理
  • 第3部分 - 高级功能

    • 使用动态属性
    • 添加模块
    • 国家管理
    • 访问控制器服务

介绍
ExecuteScript是一个多功能处理器,允许用户使用编程语言编写自定义逻辑,每次触发ExecuteScript处理器时都会执行该编程语言。以下变量绑定被提供给脚本以允许访问NiFi组件:

session(会话):这是对分配给处理器的ProcessSession的引用。会话允许您对流文件(如create()putAttribute()transfer()以及read()write()()进行操作。

context(上下文):这是对处理器的ProcessContext的引用。它可以用来检索处理器属性,关系,Controller服务和StateManager。

log:这是对处理器ComponentLog的引用。用它来记录消息给NiFi,比如log.info('Hello world!')

REL_SUCCESS:这是对处理器定义的“成功”关系的引用。它也可以通过引用父类(ExecuteScript)的静态成员来继承,但是一些引擎(如Lua)不允许引用静态成员,所以这是一个方便的变量。这也节省了必须使用关系的完全合格的名称。

REL_FAILURE:这是对处理器定义的“失败”关系的引用。和REL_SUCCESS一样,它也可以通过引用父类(ExecuteScript)的静态成员来继承,但是一些引擎(如Lua)不允许引用静态成员,所以这是一个方便的变量。这也节省了必须使用关系的完全合格的名称。

Dynamic Properties : 在ExecuteScript中定义的任何动态属性都将作为设置为与动态属性对应的PropertyValue对象的变量传递给脚本引擎。这允许您获取属性的String值,还可以针对NiFi表达式语言评估该属性,将该值作为适当的数据类型(例如布尔值)等进行转换。由于动态属性名称会成为脚本的变量名称,您必须知道所选脚本引擎的变量命名属性。例如,Groovy不允许在变量名称中使用句点(。),因此如果“my.property”是一个动态属性名称,则会发生错误。

与这些变量的交互是通过NiFi Java API完成的,下面的每个配方将在引入时讨论相关的API调用。以下部分的配方对流文件执行各种功能,例如读取/写入属性,转移到关系,记录等。请注意,这些示例是片段,不会按原样运行。例如,如果使用session.get()从队列中检索到流文件,则必须将其转移到关系或删除,否则会发生错误。这些代码片段只是为了说明概念,没有添加样板代码来使其成为实例。在后面的文章中,我将把它们放在一起,以显示执行有用任务的完整工作脚本。

需求:从会话中获取传入的流文件

use case:您有连接到ExecuteScript的连接,并且想要从队列中检索一个流文件进行处理。

方法:使用会话对象中的get()方法。 此方法返回要处理的次最高优先级FlowFile的FlowFile。 如果没有FlowFile进行处理,则该方法将返回null。 请注意,即使处理器中存在稳定的FlowFiles流,也可能返回null。 如果处理器有多个并发任务,并且其他任务已经检索到FlowFiles,则可能发生这种情况。 如果脚本需要一个FlowFile继续处理,那么它应该立即返回,如果从session.get()返回null
Examples:
Groovy

flowFile = session.get()
if(!flowFile) return

jython

flowFile = session.get() 
if (flowFile != None):
    # All processing code starts at this indent
# implicit return at the end

Javascript

var flowFile = session.get();
if (flowFile != null) {
   // All processing code goes here
}

JRuby

flowFile = session.get()
if flowFile != nil
   # All processing code goes here
end

需求:从会话中获取多个传入的流文件

Use Case: 你有连接到ExecuteScript的连接,并想从队列中检索多个流文件进行处理。

方法:使用会话对象中的get(maxResults)方法。此方法返回到来自工作队列的maxResults FlowFiles。如果没有FlowFiles可用,则返回一个空列表(该方法不返回null)。注意:如果存在多个传入队列,则根据一次调用是否轮询所有队列或仅调用一个队列,行为是未指定的。话虽如此,观察到的行为(NiFi 1.1.0+和以前)在这里描述。

Examples:

Groovy

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
   flowFileList.each { flowFile -> 
       // Process each FlowFile here
   }
}

Jython

flowFileList = session.get(100)
if not flowFileList.isEmpty():
    for flowFile in flowFileList: 
         # Process each FlowFile here

Javascript

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
  for each (var flowFile in flowFileList) { 
       // Process each FlowFile here
  }
}

JRuby

flowFileList = session.get(100)
if !(flowFileList.isEmpty())
   flowFileList.each { |flowFile| 
       # Process each FlowFile here
   }
end

需求: 创建一个新的FlowFile

Use Case: 想要生成一个新的FlowFile发送到下一个处理器

Approach:使用会话对象的create()方法。此方法返回一个新的FlowFile对象,您可以在其上执行进一步的处理

Examples:

Groovy

flowFile = session.create()
// Additional processing here

Jython

flowFile = session.create() 
# Additional processing here

Javascript

var flowFile = session.create();
// Additional processing here

JRuby

flowFile = session.create()
# Additional processing here

需求: 从父级FlowFile创建一个新的FlowFile

Use Case: 基于传入的FlowFile生成新的FlowFile

Approach:使用会话对象的create(parentFlowFile)方法。此方法采用父级FlowFile引用,并返回一个新的子FlowFile对象。新创建的FlowFile将继承除UUID之外的所有父级属性。此方法将自动生成Provenance FORK事件或Provenance JOIN事件,具体取决于在提交ProcessSession之前是否从同一父级生成了其他FlowFiles。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
newFlowFile = session.create(flowFile)
// Additional processing here

Jython

flowFile = session.get() 
if (flowFile != None):
    newFlowFile = session.create(flowFile) 
    # Additional processing here

Javascript

var flowFile = session.get();
if (flowFile != null) {
  var newFlowFile = session.create(flowFile);
  // Additional processing here
}

JRuby

flowFile = session.get()
if flowFile != nil
  newFlowFile = session.create(flowFile)
  # Additional processing here
end

需求:为流文件添加一个属性

Use Case: 想要添加自定义属性的流文件。

方法:使用会话对象中的putAttribute(flowFile,attributeKey,attributeValue)方法。 此方法使用给定的键/值对更新给定的FlowFile属性。 注意:“uuid”属性对于FlowFile是固定的,不能修改; 如key被命名为“uuid”,它将被忽略。

这也是一个很好的提及FlowFile对象是不可变的;这意味着如果您通过API更新FlowFile的属性(或以其他方式更改),则会获得新版本的FlowFile的新参考。将FlowFiles传输到关系时,这是非常重要的。您必须保留对最新版本FlowFile的引用,并且必须传输或删除从会话中检索或创建的所有FlowFiles的最新版本,否则在执行时会出现错误。大多数情况下,用于存储FlowFile引用的变量将被从改变FlowFile的方法返回的最新版本覆盖(中间FlowFile引用将自动丢弃)。在这些示例中,您将看到添加属性时重新使用flowFile引用的这种技术。请注意,对FlowFile的当前引用被传递给putAttribute()方法。生成的FlowFile具有名为“myAttr”的属性,其值为“myValue”。另请注意,该方法需要一个字符串的值;如果你有一个对象,你将不得不将它序列化为一个字符串。最后,请注意,如果您要添加多个属性,最好创建一个Map并使用putAllAttributes()来代替(请参阅下面以了解详细信息)。
Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Jython

flowFile = session.get() 
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end

Javascript

var flowFile = session.get();
if (flowFile != null) {
   flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}

JRuby

flowFile = session.get()
if flowFile != nil
   flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end

Recipe: 将多个属性添加到流文件

Use Case: 想要添加自定义属性的流文件。

Approach:使用会话对象中的putAllAttributes(flowFile,attributeMap)方法。此方法使用给定Map中的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改;如果**被命名为“uuid”,它将被忽略。

这里的技术是创建一个你想更新的属性键/值对的Map(Jython中的字典,JRuby中的Hash),然后调用putAllAttributes()。这比为每个键/值对调用putAttribute()要高效得多,因为后一种情况会导致框架为添加的每个属性创建一个临时版本的FlowFile(请参阅上面关于FlowFile不变性的讨论)。这些示例显示了两个条目myAttr1和myAttr2的映射,设置为“1”,将数字2的语言特定的强制转换为字符串(以符合key和value均需要字符串值的方法签名)。请注意,session.transfer()没有在这里指定(所以下面的代码片段不工作),请参阅下面的配方。

Examples:

Groovy

attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)

Jython

attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get() 
if (flowFile != None):
    flowFile = session.putAllAttributes(flowFile, attrMap)
# implicit return at the end

Javascript

var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get() 
if (flowFile != null) {
    flowFile = session.putAllAttributes(flowFile, attrMap)
}

JRuby

attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get() 
if flowFile != nil
    flowFile = session.putAllAttributes(flowFile, attrMap)
end

Recipe: 从流文件中获取属性

Use Case: 想要检查属性的流文件。

Approach:使用FlowFile对象的getAttribute(attributeKey)方法。 此方法返回给定attributeKey的String值,如果找不到attributeKey,则返回null。 这些例子显示了检索“filename”属性的值。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
myAttr = flowFile.getAttribute('filename')

Jython

flowFile = session.get() 
if (flowFile != None):
    myAttr = flowFile.getAttribute('filename')
# implicit return at the end

Javascript

var flowFile = session.get() 
if (flowFile != null) {
    var myAttr = flowFile.getAttribute('filename')
}

JRuby

flowFile = session.get() 
if flowFile != nil
    myAttr = flowFile.getAttribute('filename')
end

Recipe: 从流文件获取所有属性

Use Case: You have a flow file from which you’d like to retrieve its attributes.

3202/5000
食谱:从流文件获取所有属性

用例:你有一个你想从中获取属性的流文件。

方法:使用FlowFile对象的getAttributes()方法。此方法返回一个带有String键和String值的Map,表示流文件的属性的键/值对。示例显示了对FlowFile的所有属性的Map的迭代。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
flowFile.getAttributes().each { key,value ->
  // Do something with the key/value pair
}

Jython

flowFile = session.get() 
if (flowFile != None):
    for key,value in flowFile.getAttributes().iteritems():
       # Do something with key and/or value
# implicit return at the end

Javascript

var flowFile = session.get() 
if (flowFile != null) {
    var attrs = flowFile.getAttributes();
    for each (var attrKey in attrs.keySet()) { 
       // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
  }
}

JRuby

flowFile = session.get() 
if flowFile != nil
    flowFile.getAttributes().each { |key,value| 
       # Do something with key and/or value
   }
end

Recipe: 将流文件转移到关系

Use Case: 在处理流文件(新建或传入)之后,您要将流文件转换为关系(“成功”或“失败”)。在这种简单的情况下,让我们假设有一个名为“errorOccurred”的变量,指出FlowFile应该传送到哪个关系。其他错误处理技术将在本系列的第2部分中讨论。
方法:使用会话对象的transfer(flowFile,relationship)方法。从文档:此方法根据给定的关系将给定的FlowFile传送到适当的目标处理器工作队列。如果关系导致多于一个目的地,则FlowFile的状态被复制,使得每个目的地都接收到FlowFile的精确副本,尽管每个目的地将具有其自己的唯一标识。

注:ExecuteScript将在每次执行结束时执行session.commit()以确保操作已被提交。不需要(也不应该)在脚本中执行session.commit()。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
// Processing occurs here
if(errorOccurred) {
  session.transfer(flowFile, REL_FAILURE)
}
else {
  session.transfer(flowFile, REL_SUCCESS)
}

Jython

flowFile = session.get() 
if (flowFile != None):
    # All processing code starts at this indent
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end

Javascript

var flowFile = session.get();
if (flowFile != null) {
   // All processing code goes here
   if(errorOccurred) {
     session.transfer(flowFile, REL_FAILURE)
   }
   else {
     session.transfer(flowFile, REL_SUCCESS)
   }
}

JRuby

flowFile = session.get()
if flowFile != nil
   # All processing code goes here
   if errorOccurred
     session.transfer(flowFile, REL_FAILURE)
   else
     session.transfer(flowFile, REL_SUCCESS)
   end
end

Recipe: 以指定的日志记录级别向日志发送消息

Use Case: 想要将处理过程中发生的事件报告给日志框架。

Approach:使用带有warn(),trace(),debug(),info()或error()方法的log变量。这些方法可以采用单个字符串,或者一个字符串,后跟一个对象数组,或者一个字符串,后跟一个Throwable对象数组。第一个用于简单的消息。当你有一些你想记录的动态对象/值的时候使用第二种。要在消息字符串中引用这些消息,请在消息中使用“{}”。这些是按照外观顺序对Object数组进行评估的,所以如果消息的内容是“Found these things:{} {} {}”,而Object数组是[‘Hello’,1,true],那么记录的消息将会是“找到这些东西:你好1true”。这些日志记录方法的第三种形式也需要一个Throwable参数,并且在发生异常并且想要记录它时非常有用。

Examples:

Groovy

log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

Jython

from java.lang import Object
from jarray import array
objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)

Javascript

var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);
objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)

JRuby

log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)

希望这些片段有助于在各种脚本语言和流文件操作的背景下说明NiFi API的各个部分。我将在后面的文章中将这些配方放在一起,以展示端到端脚本的一些示例。有关更多示例,用例和解释,请查看我的博客。在本系列的下一篇文章中,我将讨论读取和写入流文件的内容,以及讨论错误处理技术。

原文链接:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html