NiFi 脚本执行器使用指南 (part 1)

NiFi脚本执行器-ExecuteScript 使用指南 (part 1)

ExecuteScript让NiFi能够执行脚原本完成数据流程任务,从而能够编写本身的任务节点而不只仅是采用已有的任务节点,具备很强的灵活性。html

本文是介绍使用ExecuteScript来完成任务的系列文章之一。例程包括 Groovy, Jython, Javascript (Nashorn), 以及 JRuby. 系列文章中的“菜谱” 包括:java

Part 1 - 介绍 NiFi API 和 FlowFilesapache

  • 从incoming queue获得flow file
  • 建立一个新的 flow files
  • 与 flow file attributes一块儿工做
  • 转换 flow file
  • 日志 Logging

Part 2 - FlowFile I/O 和 Error Handling编程

  • 从 flow file 中读取
  • 写入 flow file
  • 读/写 flow file
  • Error Handling

Part 3 - 高级特征数组

  • 使用动态属性
  • 添加模块
  • 状态管理
  • 存取控制器服务

介绍

ExecuteScript 是一个万能的处理器,容许用户使用编程语言定义本身的数据处理功能, 在每一次 ExecuteScript processor 触发时被调用。下面的变量绑定到脚本环境,以提供脚本中访问 NiFi 组件环境:ruby

session: 是对processor的ProcessSession属性的引用。session容许在 flow files 执行下面的操做: create(), putAttribute(), transfer(), 像 read() 和 write()同样。session

context: 是对 ProcessContext 的引用。能够用于检索 processor 的属性, 关系, Controller Services, 和 StateManager。数据结构

log: 是对ComponentLog的引用。用于 log 消息到 NiFi系统, 如 log.info('Hello world!')。并发

REL_SUCCESS: 这是对 "success" relationship 的引用。这是从父类 (ExecuteScript)的静态变量基础来的, 可是一些引擎(如 Lua)不容许引用静态成员, 只是一个为了方便的变量。框架

REL_FAILURE: 这是对 "failure" relationship 的引用。与 REL_SUCCESS 同样, 这是从父类 (ExecuteScript)的静态变量基础来的, 可是一些引擎(如 Lua)不容许引用静态成员, 只是一个为了方便的变量。

Dynamic Properties: 任何在ExecuteScript定义的动态属性都做为变量集合到 PropertyValue 对象,对应于dynamic property。容许得到property的 String 值 , 经过NiFi表达式进行求值,得到相应的类型 (如 Boolean, 等等)。 由于动态属性名称成为脚本里的变量名, 你须要了解所选的脚本引擎的变量命名属性。 例如, Groovy 不容许变量名中提供 (.) , 因此,若是 "my.property"做为动态属性将会报错。

与这些变量名的交互经过 NiFi Java API进行, 下面的每个案例将讨论相应的API调用。 下面的案例执行不一样的函数操做 flow files, 如 reading/writing 属性, 转换为 relationship, logging, 等等。须要注意,这里的例子只是一些片断。举例来讲, 若是使用session.get()从队列中获取 flow file , 必须转换为 relationship 或者移除, 要不将会引起错误。代码片断应该是平面化的并且保持清晰,没有容易引发混乱的代码,仅用于演示概念,从而让工做简单。 在之后的文章中,我将这些放到一块儿,造成完整的脚本,能够干一些有用的事情。

Recipes

Recipe: 从session中得到flow file

Use Case: 从队列中得到输入flow file,执行 ExecuteScript 并进行处理。

Approach: 使用 session对象的get(). 该方法返回FlowFile,是下一个最高优先级的FlowFile用于处理. 若是没有 FlowFile 用于处理, 该方法将返回 null. 注意, 若是一个持续的FlowFiles流进入processor,也可能会返回null. 这在多个并发任务处理时会发生,此时其余任务已得到了 FlowFiles. 若是脚本要求有一个 FlowFile才能继续处理, 若是是session.get()获得null应该当即返回。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return

Jython

flowFile = session.get()
if (flowFile != None):
# All processing code starts at this indent
# implicit return at the end

Javascript

var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
}

JRuby

flowFile = session.get()
if flowFile != nil
# All processing code goes here
end

 

Recipe: 获得多个 flow files

Use Case: 从queue(s)得到多个flow files用于ExecuteScript处理

Approach: 使用session对象的get(maxResults) 方法. 该方法从工做队列中返回最多 maxResults 个FlowFiles . 若是没有 FlowFiles 可用, 一个空的list 将被返回 (而不是返回 null).

注意: 若是多个输入队列存在, 在单个调用是否多个队列或单个队列将被拉去的行为是未指定的. 观察到的行为 (对于 NiFi 1.1.0+ 和之前的版本) 描述在 here.

Examples:

Groovy

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
    flowFileList.each { flowFile ->
    // Process each FlowFile here
    }
}

Jython

flowFileList = session.get(100)
if not flowFileList.isEmpty():
    for flowFile in flowFileList:
        # Process each FlowFile here

Javascript

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
    for each (var flowFile in flowFileList) {
        // Process each FlowFile here
    }
}

JRuby

flowFileList = session.get(100)
if !(flowFileList.isEmpty())
    flowFileList.each { |flowFile|
        # Process each FlowFile here}
end

 

Recipe: 建立一个新的FlowFile。

Use Case: 建立一个新的 FlowFile 发送到下一步的 processor

Approach: 使用session的 create() 方法. 该方法返回 FlowFile 对象, 以用于后续的处理操做。

Examples:

Groovy

flowFile = session.create()
// Additional processing here

Jython

flowFile = session.create()
# Additional processing here

Javascript

var flowFile = session.create();
// Additional processing here

JRuby

flowFile = session.create()
# Additional processing here

 

Recipe: 从父级FlowFile建立新的 FlowFile。

Use Case: You want to generate new FlowFile(s) based on an incoming FlowFile

Approach: 使用session的 create(parentFlowFile) 方法. 该方法得到父级 FlowFile 的引用,而后返回 and 新的派生 FlowFile 对象。新建立的 FlowFile 将继承父级的全部属性(除了 UUID). 该方法将自动建立一个 起源 FORK 事件或 起源 JOIN 事件, 取决于FlowFiles 是否从同一个parent建立,在 ProcessSession被提交的时候.

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
    newFlowFile = session.create(flowFile)
    // Additional processing here

Jython

flowFile = session.get()
if (flowFile != None):
    newFlowFile = session.create(flowFile)
    # Additional processing here

Javascript

var flowFile = session.get();
if (flowFile != null) {
    var newFlowFile = session.create(flowFile);
    // Additional processing here
}

JRuby

flowFile = session.get()
if flowFile != nil
    newFlowFile = session.create(flowFile)
    # Additional processing here
end

 

Recipe: 添加属性到 flow file

Use Case: 在已有的 flow file 上添加本身的属性。

Approach: 使用session对象的 putAttribute(flowFile, attributeKey, attributeValue) 方法。 该方法更新给定的 FlowFile's 属性,使用给出的 key/value 对来进行。

注意: 对象的 "uuid" 属性是固定的,而且不能修改; 若是key被命名为 "uuid", 将被忽略.

这里的FlowFile 对象是不可改变的; 这意味着,若是经过API更新了 FlowFile 的属性 (或其它的改变了) , 你将获得一个新版的FlowFile的新的引用。当转换FlowFiles到relationships时这是很是重要的。你必须保持对FlowFile的最新版本的引用, 你必须转换或者移除全部的FlowFiles的最后版本, 不然执行时将会获得错误信息。常常状况下, 该用于存储 FlowFile 引用变量将会被最后返回的版本覆盖 (中间的 FlowFile 应用将会被自动抛弃). 在这个例子中,你能够看到当添加属性时重用flowFile引用的技术。注意到当前的flowFile引用被传递给putAttribute() 方法. 这个结果FlowFile具备命名为 'myAttr'值为 'myValue'的属性。若是你有一个对象,能够序列化为String. 最终, 请注意若是你添加了多个属性, 最好建立一个Map,而后使用 putAllAttributes() 方法来进行赋值。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Jython

flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
    # implicit return at the end

Javascript

var flowFile = session.get();
if (flowFile != null) {
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}

JRuby

flowFile = session.get()
if flowFile != nil
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end

 

Recipe: 添加多个属性到一个flow file

Use Case: 向 flow file 添加自定义属性。

Approach: 使用 session对象的putAllAttributes(flowFile, attributeMap) 方法。该方法更新给定的FlowFile's 属性,以 key/value 对的方式存储在Map中返回。

注意:  "uuid" 对 FlowFile是固定的; 若是 key 被命名为 "uuid", 将会被忽略。

该技术建立了一个 Map (aka dictionary in Jython, hash in JRuby) 用于更新,而后调用putAllAttributes() 。这比对putAttribute() 对每个 key/value 遍历效率更高, 这将致使对每个属性调用时 FlowFile 都须要建立一个副本 (查看上面 FlowFile 不变性的讨论)。下面例子中的Map包含两个条目: myAttr1 和 myAttr2, 设为 '1' 而且第二个为 String (附着到方法签名,对key和value都要求 String)。 注意到session.transfer() 在这里并未指定 (所以下面的代码片断并不能工做), 查看下面的方法。

Examples:

Groovy

attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)

Jython

attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAllAttributes(flowFile, attrMap)
    # implicit return at the end

Javascript

var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get()

if (flowFile != null) {
    flowFile = session.putAllAttributes(flowFile, attrMap)
}

JRuby

attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get()
if flowFile != nil
flowFile = session.putAllAttributes(flowFile, attrMap)
end

 

Recipe: 从 flow file 获得属性

Use Case: 得到flow file 的属性。

Approach: 使用FlowFile对象getAttribute(attributeKey) 。 该方法对于给定的attributeKey返回一个字符串值 , 若是没有找到相应的key就返回null. 下面的例子演示返回FlowFile的 "filename" 属性。

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return
myAttr = flowFile.getAttribute('filename')

Jython

flowFile = session.get()

if (flowFile != None):
    myAttr = flowFile.getAttribute('filename')
    # implicit return at the end

Javascript

var flowFile = session.get()

if (flowFile != null) {
    var myAttr = flowFile.getAttribute('filename')
}

JRuby

flowFile = session.get()

if flowFile != nil
myAttr = flowFile.getAttribute('filename')
end

 

Recipe: 从 flow file获得全部的属性

Use Case: 从flow file获得全部的属性。

Approach: 使用FlowFile对象的getAttributes() 方法。 该方法返回 Map 数据结构,由字符串的 keys 和 values组成, 表明一个FlowFile的属性的 key/value 值对。 下面的显示如何递归显示FlowFile的全部属性的Map的值。

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return
flowFile.getAttributes().each { key,value ->
    // Do something with the key/value pair
}

Jython

flowFile = session.get()

if (flowFile != None):
    for key,value in flowFile.getAttributes().iteritems():
    # Do something with key and/or value

# implicit return at the end

Javascript

var flowFile = session.get()

if (flowFile != null) {
    var attrs = flowFile.getAttributes();
    for each (var attrKey in attrs.keySet()) {
        // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
    }
}

JRuby

flowFile = session.get()

if flowFile != nil
    flowFile.getAttributes().each 
    { |key,value|
        # Do something with key and/or value
    }
end

 

Recipe: 转移一个flow file 到 relationship

Use Case: 在处理完flow file (new or incoming)以后, 你但愿将flow file转移到 relationship ("success" or "failure"). 在这个简单的例子中,让咱们假定有一个变量叫作 "errorOccurred", 用于指示在哪一种 relationship下 FlowFile 将被转移。更多的错误处理技术在本系列文章的第二部分讨论。

Approach: 使用session对象的transfer(flowFile, relationship) 方法。基于给定的relationship,该方法将给定的FlowFile发送到适合的目标处理器队列。若是relationship通向不止一个目标,FlowFile的状态将被复制 ,从而每个目标都将收到一个 FlowFile的拷贝,所以也将具备惟一的标识符UUID。

注意: 最后,ExecuteScript将执行session.commit() 以进行操做的提交。你不须要在脚本内部执行session.commit() 来执行提交操做。

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return

// Processing occurs here
if(errorOccurred) {
    session.transfer(flowFile, REL_FAILURE)
}
else {
    session.transfer(flowFile, REL_SUCCESS)
}

Jython

flowFile = session.get()

if (flowFile != None):
    # All processing code starts at this indent
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end

Javascript

var flowFile = session.get();

if (flowFile != null) {
    // All processing code goes here
    if(errorOccurred) {
        session.transfer(flowFile, REL_FAILURE)
    }
    else {
        session.transfer(flowFile, REL_SUCCESS)
    }
}

JRuby

flowFile = session.get()
if flowFile != nil
    # All processing code goes here
    if errorOccurred
        session.transfer(flowFile, REL_FAILURE)
    else
        session.transfer(flowFile, REL_SUCCESS)
    end
end

 

Recipe: 发送消息到 log并制定日志级别

Use Case: 但愿报告一些事件、消息并经过日志框架写入。

Approach: 使用 log 的方法(), trace(), debug(), info(), 或 error() 完成。这些方法能够是单个的字符串或者字符串数组对象, 或字符串后面跟着Throwable的对象数组。第一个用于简单消息. 第二个用于一些动态对象(值)的log。在消息字符串中使用 "{}" 进行引用。这些用于对对象数组进行求值,当消息读到 "Found these things: {} {} {}" 而且 Object array 是 ['Hello',1,true], 那么logged 消息将是 "Found these things: Hello 1 true",第三种logging方法带一个 Throwable 参数, 这在例外被捕捉到而且但愿日志记录时使用。

Examples:

Groovy

log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

Jython

from java.lang import Object
from jarray import array

objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)

Javascript

var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);

objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)

JRuby

log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)

但愿这些代码片断可以演示 NiFi API 在不一样脚本语言中的用法以及 flow file 操做。在后续的文章中,我将把这些方法所有放到一块儿,从而构成一个完整的脚本. 对于更多的例子, 用例和解释,请参考 my blog. 在本序列的下一篇文章中, 我将讨论如何读写flow files的内容, 以及错误处理技术. 但愿能帮到你!

英:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

相关文章
相关标签/搜索