Apache NiFi 已经有一些远程命令的执行器,如 ExecuteProcess 和 ExecuteStreamCommand。能够经过操做系统的ssh命令远程执行脚本,还能够带上不一样的参数。一般,这经过一个基于bash脚本执行器来执行,能够带多个ssh命令。html
另一种方法,可使用 ExecuteScript 或者 InvokeScriptedProcessor, 经过第三方库经过ssh去执行远程命令。本文中, 我使用 Groovy 做为脚本语言而且使用 Sshoogr, Sshoogr用于Groovy的 SSH DSL。
Sshoogr 能够经过 SDK Manager 安装或下载 (例如从 Maven Central ). 关键须要全部的支持库 (包括Sshoogr 和依赖库)在同一个目录中。将 ExecuteScript's Module Directory 属性指向该目录, 而后全部的 JARs 都将可以在 Groovy script中使用。
为了使这个例子更加灵活,我采用了一些常规的作法。首先, 将要执行的commands放在流文件的每一行中。而后, 配置参数 (如hostname, username, password, 以及 port) 做为动态属性添加到ExecuteScript。 注意,password 属性在这里大小写不敏感; 若是愿意, 使用InvokeScriptedProcessor而且明确地添加这个属性 (指明 password 大小写敏感).git
为了 Sshoogr 可以工做 (至少在这个例子中), 须要远端节点的 RSA key 配置在NiFi 用户的 ~/.ssh/known_hosts 文件中。觉得操做系统的差别, 有些状况下 SSH 链接将失败,主要由于严格的 host key 检查, 所以在 script 咱们将在 Sshoogr中关闭这个功能。github
回到script. 我启动一个标准的代码来获取输入的 flow file, 而后使用session.read() 得到每一行,保存到一个命令的列表对象中:apache
def flowFile = session.get() if(!flowFile) return def commands = [] // Read in one command per line session.read(flowFile, {inputStream -> inputStream.eachLine { line -> commands << line } } as InputStreamCallback)
而后关闭严格host key 检查:bash
options.trustUnknownHosts = true
而后使用 Sshoogr DSL 去执行命令(上一步中,从 flow file读取的列表):session
def result = null remoteSession { host = sshHostname.value username = sshUsername.value password = sshPassword.value port = Integer.parseInt(sshPort.value) result = exec(showOutput: true, failOnError: false, command: commands.join('\n')) }
这里使用前述的 ExecuteScript 动态属性 (sshHostname, sshUsername, sshPassword, sshPort)。后面, 有一个截屏显示被加到了ExecuteScript的 配置之中.
这个 exec() 方法是Sshoogr的强大方法. 这里但愿命令表的输出放到outgoing的flowfile. 设置"failOnError" 为 false以让全部的命令都会尝试执行, 但你也许但愿设置为 true,这样命令失败时后续的将再也不执行. 这个最终的参数是"command". 若是你不使用 "named-parameter" (aka Map) 的exec() 版本, 而只是执行命令 commands (如,不设置 "showOutput" ), 那么 exec() 将接收一个集合:ssh
exec(commands)
在咱们, exec() 方法转换"command" 参数为字符串, 替代List "commands", 将其转换为换行符分割的字符串,使用 join('\n')进行转换。
script 的下一部分是复写传入的 flow file, 若是我没有建立一个新的. 若是使用ExecuteScript 建立了一个新的 flow file, 确保移除原来的. 若是使用 InvokeScriptedProcessor, 你须要定义 "original" 关系而且路由原来的 incoming flow file 到新的。maven
flowFile = session.write(flowFile, { outputStream -> result?.output?.eachLine { line -> outputStream.write(line.bytes) outputStream.write('\n'.bytes) } } as OutputStreamCallback)
最后, 使用 result.exitStatus, 咱们确认下路由到 flow file 是成功仍是失败:oop
session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
这个例子中所有的 script 以下:spa
import static com.aestasit.infrastructure.ssh.DefaultSsh.* def flowFile = session.get() if(!flowFile) return def commands = [] // Read in one command per line session.read(flowFile, {inputStream -> inputStream.eachLine { line -> commands << line } } as InputStreamCallback) options.trustUnknownHosts = true def result = null remoteSession { host = sshHostname.value username = sshUsername.value password = sshPassword.value port = Integer.parseInt(sshPort.value) result = exec(showOutput: true, failOnError: false, command: commands.join('\n')) } flowFile = session.write(flowFile, { outputStream -> result?.output?.eachLine { line -> outputStream.write(line.bytes) outputStream.write('\n'.bytes) } } as OutputStreamCallback) session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
搬到 ExecuteScript 的配置中:
这里我链接到 Hortonworks Data Platform (HDP) Sandbox来运行Hadoop commands, 缺省的设置如上所示,我将 script 黏贴进 Script Body, 将 Module Directory 指向个人Sshoogr下载目录. 我添加这些到一个简单数据流,该数据流建立了一个 flow file, 填充 commands, 在远端执行, 而后从session中返回的output输出到 logs :
这个例子中,我运行下面的两个命令:
hadoop fs -ls /user echo "Hello World!" > here_i_am.txt
在 log中 (after LogAttribute runs with payload included), 看见输出以下:
在sandbox,我看见文件已被建立:
如你所见,本文描述了使用 ExecuteScript 采用 Groovy 和 Sshoogr 去执行存储在输入流文件中的远程命令. 我建立了 Gist of the template, 不过这是用 Apache NiFi 1.0的beta版本建立的,有可能没法在其余版本中使用(如0.x). 但这里已经包含了全部的内容,你能够容易地建立本身的数据流。
原文参考:
http://funnifi.blogspot.jp/2016/08/executing-remote-commands-in-nifi-with.html