欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

NiFi Processors之ExecuteScript

程序员文章站 2022-03-05 10:21:29
...

NiFi是一个数据处理和分发系统,其中很重要的一部分是处理器(Processors)。一个处理器组合系统间的数据路由、转换或调解。处理器可以访问给定FlowFile的属性及其内容流。处理器可以在给定的工作单元中对零个或多个FlowFile进行操作,并提交该工作或回滚。

本文介绍NiFi处理器之一 :ExecuteScript

描述

试验性的——在给定流文件和进程会话的情况下执行脚本。该脚本负责处理传入的流文件(例如,转移到SUCCESS或删除)同时创建任何流文件。如果处理不完整或不正确,会话将回滚。

持续使用的影响尚未得到验证。

属性

这里只介绍我使用过的配置:

名称 默认值 允许值 描述 提示
Script Engine Clojure Clojure,ECMAScript,Groovy,lua,python,ruby 执行脚步的引擎
Script Body 脚本内容 这里直接写脚本代码

动态属性

动态属性允许用户指定属性名和属性值。

指定的值可以被脚本使用。

使用说明

ExecuteScript是一个多功能处理器,允许用户使用编程语言编写自定义逻辑,每次触发ExecuteScript处理器时都会执行该编程语言。为脚本提供以下变量绑定以启用对NiFi组件的访问:

session:这是对分配给处理器的ProcessSession的引用。该会话允许您对流文件执行操作,如create(),putAttribute(),和 transfer(),以及 read() 和 write()。

context:这是对处理器的ProcessContext的引用。它可用于检索处理器属性,关系,Controller Services和StateManager。

log:这是对处理器的ComponentLog的引用。使用它将消息记录到NiFi,例如 log.info(‘Hello world!’)。

REL_SUCCESS:这是对为处理器定义的“成功”关系的引用。它也可以通过引用父类的静态成员(ExecuteScript)来继承,但是某些引擎(如Lua)不允许引用静态成员,因此这是一个便利变量。它还节省了必须使用关系的完全限定名称。

REL_FAILURE:这是对为处理器定义的“失败”关系的引用。与REL_SUCCESS一样,它也可以通过引用父类的静态成员(ExecuteScript)来继承,但是某些引擎(如Lua)不允许引用静态成员,因此这是一个便利变量。它还节省了必须使用关系的完全限定名称。

动态属性:ExecuteScript中定义的任何动态属性都将作为设置为与动态属性对应的PropertyValue对象的变量传递给脚本引擎。这允许您获取属性的String值,还可以根据NiFi表达式语言评估属性,将值转换为适当的数据类型(例如布尔值等)等。因为动态属性名称变为脚本的变量名,您必须知道所选脚本引擎的变量命名属性。例如,Groovy不允许在变量名中使用句点(.),因此如果“my.property”是动态属性名称,则会发生错误。

Python代码示例:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
 
class Area(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    
    if float(obj['LAT']) > 41 and float(obj['LAT']) < 42 and float(obj['LON']) > 123 and float(obj['LON']) < 125:
        obj['W'] = "1"
    else:
        obj['W'] = "0"

    outputStream.write(bytearray(json.dumps(obj, indent=4).encode('utf-8')))
 
flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile, Area())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()

代码说明:接收的JSON数据含有经纬度信息,如果数据的经纬度信息在某个经纬度矩形区域内,为JSON数据添加W: "1",否则添加W: "0"。后面可以根据W的值对数据进行分流。

参考:

https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html

https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html

https://community.hortonworks.com/articles/35568/python-script-in-nifi.html

相关标签: 大数据 大数据

上一篇: TESS4J

下一篇: 2.搭建Hadoop HA