ExecuteScript让NiFi能够执行脚原本完成数据流程任务,从而能够编写本身的任务节点而不只仅是采用已有的任务节点,具备很强的灵活性。html
本文是介绍使用ExecuteScript来完成任务的系列文章之一。例程包括 Groovy, Jython, Javascript (Nashorn), 以及 JRuby. 系列文章中的“菜谱” 包括:java
Part 1 - 介绍 NiFi API 和 FlowFilespython
Part 2 - FlowFile I/O 和 Error Handlingsql
Part 3 - 高级特征apache
本系列的前两篇文章涵盖了 flow file 的基本操做, 如读写属性和内容, 以及使用"session" 变量 ( ProcessSession对象)获取和转移 flow files . ExecuteScript还有不少其余的能力,这里对一部分做简要介绍.编程
其中一个能力叫作 dynamic properties, 或者称为用户定义属性. processor 的一些属性能够由用户设置 property name 和 value. 不是全部的processors 都支持和使用动态属性, 在 ExecuteScript 将传递动态属性做为变量,改变了引用 PropertyValue 对象,对应于property's value. 这里有两个重要事须要了解:api
Use Case: 在脚本中获得 dynamic property(如, 配置参数).缓存
Approach: 使用变量的PropertyValue对象的getValue() 方法. 该方法返回其字符串表明 dynamic property. 注意,若是Expression Language包含在字符串中, getValue() 将不会对其求值(参加下一个方法实现求职功能).ruby
Examples:服务器
Groovy
def myValue1 = myProperty1.value
Jython
myValue1 = myProperty1.getValue()
Javascript
var myValue1 = myProperty1.getValue()
JRuby
myValue1 = myProperty1.getValue()
Use Case: 使用脚本中的动态属性 dynamic property, 在输入 flow file 中引用attribute(s) .
Approach: 从变量的PropertyValue对象使用 evaluateAttributeExpressions(flowFile) 方法. 该方法接着调用getValue(), 返回动态属性的字符串表示,并且对 Expression Language constructs 进行了求值. 若是 flow file不可用, 可是变量在环境或Variable Registry被定义, 你能够无参数调用 evaluateAttributeExpressions() 。
Examples:
Groovy
def myValue1 = myProperty1.value def myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).value
Jython
myValue1 = myProperty1.getValue() myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
Javascript
var myValue1 = myProperty1.getValue() var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
JRuby
myValue1 = myProperty1.getValue() myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
ExecuteScript 的另外一个特征是具备添加外部模块到 classpath 的能力, 这将容许使用大量的第三方库、脚本等加强能力. 可是,每个脚本引擎处理模块的方法都是不同的, 所以须要分开讨论。整体上说, 主要有两种类型的模块, Java libraries (JARs) 和 scripts (以在 ExecuteScript中的同一种语言编写. 这里将讨论和显示不一样 script engines 如何进行处理:
Groovy
Groovy script engine (至少在 ExecuteScript中) 不支持导入其余的 Groovy scripts, 可是容许 JARs 添加到 classpath. 所以,对于外部Groovy projects, 考虑编译为bytecode,而后指向 classes 目录或者包装为 JAR.
当使用 Groovy, 这个 Module Directory 属性设为 comma-separated 的文件列表 (JARs) 和 folders. 若是folder 被指定, ExecuteScript 将发现该目录全部的 JARs 并添加进去. 这容许你添加第三方软件,哪怕包含不少个 JARs. Groovy的例子参见 this blog post.
Jython
Jython script engine (在 ExecuteScript) 目前仅支持导入纯 Python 模块, 不包含natively-compiled modules (如CPython),如 numpy 或 scipy. 目前也暂不支持 JARs, 这在未来版本中也许会考虑. 查看 this HCC post 获得更多细节. 在Module Directory property在执行前须要加载, 使用"import sys" 跟着 "sys.path.append" 对每个指定的模块位置进行加载.
若是 Python 已经安装, 能够将全部的安装好的纯 Python modules 添加进来,经过将 site-packages 目录加到Module Directory 属性便可, 如:
/usr/local/lib/python2.7/site-packages
而后,你的脚本就能 import 各类软件包了,如:
from user_agents import parse
Javascript
Javascript script engine (在 ExecuteScript), 容许一样的 JARs/folders设置,与 Groovy engine同样. 将查找JARs 以及指定的folder.
JRuby
JRuby script engine (在 ExecuteScript) 目前只容许单个的 JARs指定, 若是 folder 被指定,其中必定要有classes ( java compiler 须要能看见), 若是folder 包含 JARs将不会自动加入。目前, 没有pure Ruby 模块能被导入.
我但愿未来可以改进全部这些脚本引擎, 从而具备更为强大的功能和一致的用户体验.
NiFi (如0.5.0 ) 提供了为 Processors 和其余 NiFi 组件持久化一些信息从而实现组件的状态管理功能. 例如, QueryDatabaseTable processor 保存对大数据集的跟踪, 当下次运行时, 将只会获取哪些比原来(存储在 State Manager)更大的行的数据.
状态管理的一个重要概念是Scope. NiFi 组件能够选择存储它的状态在集群级别仍是本地级别. 注意,在独立的 NiFi 实例中, "cluster scope" 与 "local scope"是同样的. 这个 scope 选择的区别在于在一个数据流中,每一个结点的处理器是否须要共享状态信息. 若是集群中的实例不须要共享状态,就使用local scope. 在 Java,这些选项做为一个 enum变量 Scope提供, 所以,当引用 Scope.CLUSTER 和 Scope.LOCAL, 就意味着是集群模式或本地模式.
为了探究ExecuteScript (语言独立的例子以下)状态管理的特征 , 你能够得到 StateManager的引用,经过调用 ProcessContext的 getStateManager() 方法实现 (recall that each engine gets a variable named "context" with an instance of ProcessContext). 而后调用 StateManager 对象的下面方法:
void setState(Map<String, String> state, Scope scope) - 在给定的scope更新组件状态的值, 设置为给定的值. 注意,这个值是 Map 数据结构; 概念 "component state" 全部的 key/value键值对的 Map. 该 Map被一次所有更新,从而提供原子性.
StateMap getState(Scope scope) - 返回组件在给定scope的当前状态. 该方法永不会返回 null; 对于 StateMap 对象,若是 state没有被设置, StateMap's 版本将是 -1, 而 map的值将是 empty. 常常,一个新的 Map<String,String> 被建立来存储更新的值,而后setState()或 replace() 被调用.
boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) - 更新组件的状态值 (在给定的 scope)为新的值,仅在当前值与给定的 oldValue同样时执行. 若是 state 被更新为新的值, 返回true; 不然返回 false,若是state's value 不等于oldValue.
void clear(Scope scope) - 从给定的scope下,清除组件状态全部的键值.
Use Case: 脚本从状态管理器获得当前的 key/value 对,而后在 script 中使用(如更新等).
Approach: 使用ProcessContext的getStateManager()方法, 而后从 StateManager调用 getStateMap() , 再 toMap() 转换为Map<String,String>形式的key/value对. 注意,StateMap 也有 get(key) 方法去简化得到 value的方法, 可是不如 Map用的广泛。必须在 StateManager 一次性设置完毕.
Examples:
Groovy
import org.apache.nifi.components.state.Scope def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
Jython
from org.apache.nifi.components.state import Scope oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
Javascript
var Scope = Java.type('org.apache.nifi.components.state.Scope'); var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();
JRuby
java_import org.apache.nifi.components.state.Scope oldMap = context.stateManager.getState(Scope::LOCAL).toMap()
注意: 只有 Scope class 被明确地在脚本中引用, 所以这是惟一被imported的. 若是你引用了 StateManager, StateMap, 等等,你须要 import 这些 classes.
Use Case: 脚本但愿经过新的包含key/value的映射值对来更新 state map.
Approach: 为了获得当前的 StateMap 对象, 再次用ProcessContext调用 getStateManager() 方法, 而后 StateManager调用getStateMap() . 例子中假定为新的 Map, 可是使用上面的配方 (经过 toMap() 方法), 你可使用存在的值建立新的 Map, 而后用于更新想要的记录. 注意,若是没有当前map (i.e. the StateMap.getVersion() returns -1),replace() 将不会工做, 所以例子中检查并相应地调用 setState() 或 replace(). 当从ExecuteScript的新实例运行时,该StateMap 版本将会是 -1, 当单次执行后, 若是鼠标右键 ExecuteScript processor,而后选择 View State, 将看到以下所示的信息:
Examples:
Groovy
import org.apache.nifi.components.state.Scope def stateManager = context.stateManager def stateMap = stateManager.getState(Scope.CLUSTER) def newMap = ['myKey1': 'myValue1'] if (stateMap.version == -1) { stateManager.setState(newMap, Scope.CLUSTER); } else { stateManager.replace(stateMap, newMap, Scope.CLUSTER); }
Jython
from org.apache.nifi.components.state import Scope stateManager = context.stateManager stateMap = stateManager.getState(Scope.CLUSTER) newMap = {'myKey1': 'myValue1'} if stateMap.version == -1: stateManager.setState(newMap, Scope.CLUSTER) else: stateManager.replace(stateMap, newMap, Scope.CLUSTER)
Javascript
var Scope = Java.type('org.apache.nifi.components.state.Scope'); var stateManager = context.stateManager; var stateMap = stateManager.getState(Scope.CLUSTER); var newMap = {'myKey1': 'myValue1'}; if (stateMap.version == -1) { stateManager.setState(newMap, Scope.CLUSTER); } else { stateManager.replace(stateMap, newMap, Scope.CLUSTER); }
JRuby
java_import org.apache.nifi.components.state.Scope stateManager = context.stateManager stateMap = stateManager.getState(Scope::CLUSTER) newMap = {'myKey1'=> 'myValue1'} if stateMap.version == -1 stateManager.setState(newMap, Scope::CLUSTER) else stateManager.replace(stateMap, newMap, Scope::CLUSTER) end
Use Case: 清空 state map全部的e key/value 值.
Approach: 使用ProcessContext的getStateManager()方法, 而后调用StateManager的clear(scope)方法。
Examples:
Groovy
import org.apache.nifi.components.state.Scope context.stateManager.clear(Scope.LOCAL)
Jython
from org.apache.nifi.components.state import Scopecontext.state Manager.clear(Scope.LOCAL)
Javascript
var Scope = Java.type('org.apache.nifi.components.state.Scope'); context.stateManager.clear(Scope.LOCAL);
JRuby
java_import org.apache.nifi.components.state.Scope context.stateManager.clear(Scope::LOCAL)
在 NiFi ARchive (NAR) 结构中, Controller Services-控制器服务被暴露为 interfaces, 在 API JAR中. 例如 , DistributedCacheClient 是一个从 ControllerService扩展来的接口, 位于 nifi-distributed-cache-client-service-api JAR中, 在 nifi-standard-services-api-nar NAR. 其余的 NARs 若是想要引用interfaces (去建立新的 client implementation, e.g.) 必须指定 nifi-standard-services-api-nar 做为父级 NAR, 而后在processor的子模块提供 API JARs 的实例。
这是一些底层的细节,可能须要的以提高 Controller Services的使用, 我提到这些主要是两个缘由:
Processors 老是倾向于使用 Controller Service 实例建立 property (如PropertyDescriptor 对象) 而且调用 identifiesControllerService(class) . 当 UI component被渲染时, 将会发现全部的实现了指望接口的 Controller Services , component's ID 被使用, 友好显示名称被显示给用户.
对于ExecuteScript, 咱们可让用户选择Controller Service 实例,经过让他指定名称或者 ID 来实现. 若是咱们容许用户指定name, 脚本将不得不执行一个查询Controller Service实例列表去找到匹配名称的元素。 这在上面的博客中提到了, 这里再也不重复. 若是用户输入实例的ID, 而后 (在 NiFi 1.0.0) 将会更加容易滴匹配对象并存取,在下面将会看到. 这个例子将使用DistributedMapCacheClientService 实例为 "distMapClient", 链接到DistributedMapCacheServer 实例 (在标准的缺省配置下, localhost:4557), 这里 client instance 的ID为 93db6734-0159-1000-b46f-78a8af3b69ed:
在ExecuteScript 配置中, dynamic property被建立, 名为 "clientServiceId" 而且设为 93db6734-0159-1000-b46f-78a8af3b69ed:
而后咱们使用clientServiceId.asControllerService(DistributedMapCacheClient), 这里参数是对DistributedMapCacheClient类对象的引用. 例如, 我有一个预先填充的缓存,字符串 key 'a' 设为字符串值 'hello'. 让 Groovy script 使用 DistributedMapCacheServer进行工做, 请查看 my article here.
一旦咱们有了一个 DistributedMapCacheClient 实例, 而后就能够调用get(key, serializer, deserializer)去获取值. 在这个例子中,由于keys 和 values 都是Strings, 咱们只须要一个 Serializer<String> 和 Deserializer<String> 实例传给 get() 方法. 该方法对于全部语言都是同样的,经过 StreamCallback 实例的建立(在本系列文章的 Part 2). 这个将从预先填充的服务器获得 key 'a' 的值,而且记录值("Result = hello")。
Use Case: 用户发布值到 DistributedMapCacheServer (如配置数据, e.g.),而后使用脚本进行访问.
Approach: 使用上面描述的方法,建立一个StringSerializer 和 StringDeserializer 对象, 而后经过ID获得DistributedMapCacheClientService 实例, 而后调用服务的 get() 方法. 记录下结果到日志,方便后面查看.
Examples:
Groovy
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient import org.apache.nifi.distributed.cache.client.Serializer import org.apache.nifi.distributed.cache.client.Deserializer import java.nio.charset.StandardCharsets def StringSerializer = {value, out -> out.write(value.getBytes(StandardCharsets.UTF_8))} as Serializer<String> def StringDeserializer = { bytes -> new String(bytes) } as Deserializer<String> def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient) def result = myDistClient.get('a', StringSerializer, StringDeserializer) log.info("Result = $result")
Jython
from org.python.core.util import StringUtil from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer # Define a subclass of Serializer for use in the client's get() method class StringSerializer(Serializer): def __init__(self): pass def serialize(self, value, out): out.write(value) # Define a subclass of Deserializer for use in the client's get() method class StringDeserializer(Deserializer): def __init__(self): pass def deserialize(self, bytes): return StringUtil.fromBytes(bytes) myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient) result = myDistClient.get('a', StringSerializer(), StringDeserializer()) log.info('Result = ' + str(result))
Javascript
var DistributedMapCacheClient = Java.type('org.apache.nifi.distributed.cache.client.DistributedMapCacheClient'); var Serializer = Java.type('org.apache.nifi.distributed.cache.client.Serializer'); var Deserializer = Java.type('org.apache.nifi.distributed.cache.client.Deserializer'); var StandardCharsets = Java.type('java.nio.charset.StandardCharsets'); var StringSerializer = new Serializer(function(value, out) { out.write(value.getBytes(StandardCharsets.UTF_8)); }) var StringDeserializer = new Deserializer(function(arr) { // For some reason I had to build a string from the character codes in the "arr" array var s = ""; for(var i = 0; i < arr.length; i++) { s = s + String.fromCharCode(arr[i]); } return s; }) var myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.class); var result = myDistClient.get('a', StringSerializer, StringDeserializer); log.info("Result = "+ result);
JRuby
java_import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient java_import org.apache.nifi.distributed.cache.client.Serializer java_import org.apache.nifi.distributed.cache.client.Deserializer java_import java.nio.charset.StandardCharsets # Define a subclass of Serializer for use in the client's get() method class StringSerializer include Serializer def serialize(value, out) out.write(value.to_java.getBytes(StandardCharsets::UTF_8)) end end # Define a subclass of Deserializer for use in the client's get() method class StringDeserializer include Deserializer def deserialize(bytes) bytes.to_s end end myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.java_class) result = myDistClient.get('a', StringSerializer.new, StringDeserializer.new) log.info('Result = ' + result)
本文包含了更为复杂的一些例子,描述了如何使用支持的多种语言与NiFi API进行交互. 我可能添加一些其余的内容到本序列, 关于 scripting processors, 一些额外的特征可能被改进或添加进来 (如 ScriptedReportingTask将很快可用!). 我一向地欢迎任何问题、建议、说明等等. 但愿你能喜欢!