NiFi 脚本执行器使用指南 (part 2)

NiFi 脚本执行器使用指南 (part 2)

简介

ExecuteScript让NiFi能够执行脚原本完成数据流程任务,从而能够编写本身的任务节点而不只仅是采用已有的任务节点,具备很强的灵活性。html

本文是介绍使用ExecuteScript来完成任务的系列文章之一。例程包括 Groovy, Jython, Javascript (Nashorn), 以及 JRuby. 系列文章中的“菜谱” 包括:java

Part 1 - 介绍 NiFi API 和 FlowFilesapache

  • 从incoming queue获得flow file
  • 建立一个新的 flow files
  • 与 flow file attributes一块儿工做
  • 转换 flow file
  • 日志 Logging

Part 2 - FlowFile I/O 和 Error Handlingruby

  • 从 flow file 中读取
  • 写入 flow file
  • 读/写 flow file
  • Error Handling

Part 3 - 高级特征session

  • 使用动态属性
  • 添加模块
  • 状态管理
  • 存取控制器服务

FlowFile I/O简介

NiFi 的 Flow files 由两个主要部件组成:attributes 和 content. Attributes 是关于 content / flow file的元数据, 咱们在 Part 1 看到了如何使用 ExecuteScript 来操纵这个属性. flow file 的内容, 核心是一个 bytes集合,没有继承的 structure, schema, format, 等等. 不一样的 NiFi processors 假定输入的 flow files 具备特定的 schema/format (或者从 attributes肯定如 "mime.type" 或者经过其余的方法). 这些 processors 而后按照假定的格式对内容进行处理 (将返回 "failure" 到relationship,若是不是的话). 常常 processors 将输出  flow files 以特定的格式, 这在 processors' 描述中有相应的说明( NiFi documentation).app

flow files 的 Input 和 Output (I/O) 经过 ProcessSession API 提供,经过 ExecuteScript (查看 Part 1 获得更多的信息) 的"session" 变量来访问。一个机制是传递一个 callback 对象到session.read() 或 session.write()的调用。对于FlowFile将建立一个 InputStream 和/或 OutputStream, 这个callback 对象将被激活,使用相应的 callback 接口, 而后这个InputStream 和/或 OutputStream 的引用被传递到 callback函数使用. 这里有三个 callback 接口, 每个有本身的应用环境:函数

InputStreamCallbackspa

这个 interface 用在 session.read( flowFile, inputStreamCallback) 方法中,提供一个 InputStream,用于读取 flow file的内容. 该 interface 有一个单一方法:.net

void process(InputStream in) throws IOException

该 interface 提供一个被管理的 input stream. 这个input stream自动打开和关闭,也能够手动关闭. 这是从 flow file读取的方法, 而且不能被写回去。日志

一个例子就是当你但愿处理一个输入 flow file, 可是建立了多个输出output flow files, 好比 SplitText processor 那样.

OutputStreamCallback

该 interface 被用于session.write( flowFile, outputStreamCallback) 方法,提供 OutputStream写入内容到 flow file. 该 interface 具备单一的方法:

void process(OutputStream out) throws IOException

该 interface 提供被管理的 output stream. 这个output stream 被自动打开和关闭,也能够手动关闭。 - 重要的一点是,若是任何 streams 包装了这个 streams,全部打开的资源应该被清理.

例如, 在ExecuteScript中被建立数据 , 来自于外部文件, 而不是一个 flow file. 而后你可使用 session.create() 去建立一个新的FlowFile, 而后 session.write( flowFile, outputStreamCallback) 用于添加内容.

StreamCallback

该 interface 用于 session.write( flowFile, streamCallback) 方法,提供 InputStream 和 OutputStream,为 flow file提供内容的读取和写入. 该 interface 有一个单一的方法:

void process(InputStream in, OutputStream out) throws IOException

该 interface 提供被管理的 output stream. 这个output stream 被自动打开和关闭,也能够手动关闭。 - 重要的一点是,若是任何 streams 包装了这个 streams,全部打开的资源应该被清理.

例如,你想处理一个传入 flow file 而且使用新的内容覆盖, 如 EncryptContent processor 那样.

 

由于这些  callbacks 是 Java objects, 脚本将建立一个而且传入 session 方法, 下面的方法将使用不一样的脚本语言进行演示. 而且,这里还有其余的读写 flow files方法, 包括:

  • 使用 session.read(flowFile) 返回 InputStream. 取代 InputStreamCallback, 将返回 InputStream 用于读取. 你必须 (close, e.g.) 手动管理 InputStream.
  • 使用 session.importFrom(inputStream, flowFile) 从 InputStream 写入到 FlowFile. 这将替代 借助OutputStreamCallback的session.write() 的使用.

嗯,下面是一些具体的方法:

Recipes

Recipe: 在callback中读取输入flow file的内容

Use Case: 具备一个传入链接执行 ExecuteScript ,而且从队列中获得 flow file 的内容进行处理.

Approach: 使用session的read(flowFile, inputStreamCallback) 方法。一个 InputStreamCallback 对象须要被传入 read() 方法. 注意到,由于InputStreamCallback 是一个对象, 内容只在该对象中可见。 若是你须要在 read() 方法以外访问, 须要使用更为全局化的变量. 这里的例子讲来自flow file的所有内容存储到 String (使用 Apache Commons' IOUtils class).

注意: 对于大的 flow files, 这并非最好的技术方法; 应该只读取须要的数据,并按照适应的方法处理。好比 SplitText, 你应该一次读一行而且在 InputStreamCallback中处理, 或者 session.read(flowFile) 方法 获得 InputStream 的引用,从而在 callback以外处理.

Examples:

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()

if(!flowFile)return

def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

// Do something with text here
} as InputStreamCallback)

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
def __init__(self):
    pass
def process(self, inputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

# Do something with text here
# end class

flowFile = session.get()
if(flowFile != None):
session.read(flowFile, PyInputStreamCallback())
# implicit return at the end

Javascript

var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

var flowFile = session.get();

if(flowFile != null) {
// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,new InputStreamCallback(function(inputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        // Do something with text here
    }));
}

JRuby

java_import org.apache.commons.io.IOUtils
java_import org.apache.nifi.processor.io.InputStreamCallback

# Define a subclass of InputStreamCallback for use in session.read()
class JRubyInputStreamCallback
include InputStreamCallback

def process(inputStream)
    text = IOUtils.toString(inputStream)
    # Do something with text here
    end
end

jrubyInputStreamCallback = JRubyInputStreamCallback.new
flowFile = session.get()
if flowFile != nil
    session.read(flowFile, jrubyInputStreamCallback)
end

 

Recipe: 使用callback写入内容到输出 flow file

Use Case: 为输出的 flow file建立内容.

Approach: 使用session的write(flowFile, outputStreamCallback) 方法。一个OutputStreamCallback 对象须要传递给 write() 方法. 注意,由于 OutputStreamCallback 是一个对象, 所以内容之灾对象内部可见. 若是你须要在 write() 方法以外访问, 使用更为全局化变量. 西面的例子写入 String 到 flowFile.

Examples:

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return

def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
        outputStream.write(text.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback

# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
def __init__(self):
    pass

def process(self, outputStream):
    outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class

flowFile = session.get()

if(flowFile != None):
    flowFile = session.write(flowFile, PyOutputStreamCallback())
# implicit return at the end

Javascript

var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();

if(flowFile != null) {
// Create a new OutputStreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,new OutputStreamCallback(function(outputStream) {
        outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
    }));
}

JRuby

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.OutputStreamCallback

# Define a subclass of OutputStreamCallback for use in session.write()
class JRubyOutputStreamCallback
include OutputStreamCallback

def process(outputStream)
    outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8))
end
end

jrubyOutputStreamCallback = JRubyOutputStreamCallback.new
flowFile = session.get()
if flowFile != nil
    flowFile = session.write(flowFile, jrubyOutputStreamCallback)
end

 

Recipe: 使用回调进行内容覆盖输入 flow file

Use Case: 重用输入 flow file可是但愿修改内容并传递到输出的 flow file.

Approach: 使用session的write(flowFile, streamCallback) 方法。一个StreamCallback 对象须要传递给 write() 方法. StreamCallback 同时提供了InputStream (从输入的 flow file) 和 outputStream (下一版本的 flow file), 所以你可使用InputStream去取得 flow file的当前内容, 而后修改他们而且写会到 flow file. 这将覆盖 flow file 的内容, 所以对于追加内容要采用读入内容添加的方式, 或者使用不一样的方法 (使用 session.append() 而不是session.write() ).

注意,由于 StreamCallback 是一个对象, 所以内容之灾对象内部可见. 若是你须要在 write() 方法以外访问, 使用更为全局化变量.

这个例子将反转输入flowFile (假定为 String) 的内容,并将反转后的字符串写入到新版的 flowFile.

Examples:

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return

def text = 'Hello world!'
// Cast a closure with an inputStream and outputStream parameter to StreamCallback

flowFile = session.write(flowFile, {inputStream, outputStream ->
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
pass

def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8')))
# end class

flowFile = session.get()

if(flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())

# implicit return at the end

Javascript

var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();

if(flowFile != null) {
// Create a new StreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,new StreamCallback(function(inputStream, outputStream) {
    var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
    }));
}

JRuby

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.StreamCallback

# Define a subclass of StreamCallback for use in session.write()

class JRubyStreamCallback
include StreamCallback
def process(inputStream, outputStream)
    text = IOUtils.toString(inputStream)
    outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8))
end
end

jrubyStreamCallback = JRubyStreamCallback.new
flowFile = session.get()
if flowFile != nil
    flowFile = session.write(flowFile, jrubyStreamCallback)
end

 

Recipe: 处理脚本处理期间的错误

Use Case: 在 script ( data validation 或者出现一个 exception)运行时出现错误, 而且你但愿可以优雅滴处理.

Approach: 对于exceptions, 使用脚本语言的exception-handling 机制  (通常是try/catch 代码块). 对于 data validation, 可使用相似的方法, 可是定义一个boolean 变量,如 "valid" 以及 if/else 语句,而不是try/catch 语句. ExecuteScript 定义了 "success" and "failure" relationships; 通常状况下,你的处理将转移 "good" flow files 到 success,而 "bad" flow files 到 failure (记录错误在后续的操做中).

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return
try {
    // Something that might throw an exception here
    // Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
}

Jython

flowFile = session.get()

if(flowFile != None):
try:
    # Something that might throw an exception here
    # Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
except:
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)

# implicit return at the end

Javascript

var flowFile = session.get();
if(flowFile != null) {
try {
    // Something that might throw an exception here
    // Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
}
}

JRuby

flowFile = session.get()

if flowFile != nil
begin
    # Something that might raise an exception here
    # Last operation is transfer to success (failures handled in the rescue block)
    session.transfer(flowFile, REL_SUCCESS)
    rescue Exception => e
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
end
end

但愿这里介绍的基本的FlowFile I/O 和 错误处理有用, 欢迎任何的建议和改进! 在下一篇文章中,我将讨论一些高级的特征,如动态属性, 模块, 状态管理, 和 Controller Services的存取. 但愿对你有用!

英:https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html

相关文章
相关标签/搜索