Workflow Scheduling Engine: Oozie

An Oozie Tutorial

I. Introduction to Oozie

Apache Oozie is a workflow scheduling engine for the Hadoop platform.

  1. Purpose

- Provides unified scheduling for common Hadoop tasks: launching MapReduce jobs, HDFS operations, shell scripts, Hive jobs, and so on.

- Complex dependencies, time-based triggers, and event-based triggers are expressed in XML, which improves development efficiency.

- A group of tasks is represented as a single DAG, so the flow logic is expressed graphically and is easier to follow.

- Supports many kinds of task scheduling and can cover most Hadoop processing needs.

- Workflow definitions support EL constants and functions, making them more expressive.

  2. Architecture

In outline, the Oozie server is a Java web application running in a servlet container; it stores workflow, coordinator, and bundle job state in a database, exposes an HTTP API to clients, and executes each action on the Hadoop cluster through a map-only launcher job.

  3. Access

- Via a web browser: http://master:11000/oozie/

- Via HUE

  4. Concepts

- workflow: a workflow definition, a DAG of action nodes

- coordinator: multiple workflows can be combined into a coordinator; the output of the preceding workflows can be used as the input of the next one, and trigger conditions (for example, time-based triggers) can be defined to run workflows on a schedule

- bundle: an abstraction over a set of coordinators

II. Oozie Operations

  1. Oozie Shell

- Write the job.properties file
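A minimal sketch of such a file, following the pattern of the properties files shown later in this article (the localhost addresses and the apps/shell path are assumptions; adjust them to your cluster):

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
# HDFS directory holding this example's workflow.xml (assumed layout)
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/shell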

 

 

- Write the workflow.xml file

<workflow-app xmlns="uri:oozie:workflow:0.4" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>echo</exec>
            <argument>my_output=Hello Oozie</argument>
            <capture-output/>
        </shell>
        <ok to="check-output"/>
        <error to="fail"/>
    </action>
    <decision name="check-output">
        <switch>
            <case to="end">
                ${wf:actionData('shell-node')['my_output'] eq 'Hello Oozie'}
            </case>
            <default to="fail-output"/>
        </switch>
    </decision>
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <kill name="fail-output">
        <message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- Run the Oozie CLI command
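A typical invocation, as a sketch (the server URL is the one from the Access section; job.properties is the local file written above):

oozie job -oozie http://master:11000/oozie -config job.properties -run
# check a submitted job later with:
# oozie job -oozie http://master:11000/oozie -info <job-id>

Setting the OOZIE_URL environment variable to the server address avoids repeating the -oozie flag.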

 

 

Running the command returns a job ID; the job's details can then be viewed on the Oozie web console or on the Hue page.

The job's DAG: start → shell-node → check-output (decision) → end, with the fail and fail-output kill nodes on the error paths.

  2. Oozie FS

- Write the job.properties file
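A minimal sketch, on the same pattern as the shell example (the apps/fs path is an assumption):

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
oozie.wf.application.path=${nameNode}/user/${user.name}/apps/fs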

 

 

- Write the workflow.xml file

<workflow-app xmlns="uri:oozie:workflow:0.2" name="fs">
    <start to="fs-node"/>
    <action name="fs-node">
      <fs>
       <delete path='/home/kongc/oozie'/>
         <mkdir path='/home/kongc/oozie1'/>
         <move source='/home/kongc/spark-application' target='/home/kongc/oozie1'/>
      </fs>
      <ok to="end"/>
      <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Fs action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- Run the Oozie CLI command

  3. Oozie Sqoop

- Write the job.properties file
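A sketch with assumed values; the workflow below references ${examplesRoot} and ${queueName}, and Sqoop actions normally need the share lib:

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/sqoop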

 

 

- Write the database configuration file (db.hsqldb.properties)

#HSQL Database Engine 1.8.0.5
#Tue Oct 05 11:20:19 SGT 2010
hsqldb.script_format=0
runtime.gc_interval=0
sql.enforce_strict_size=false
hsqldb.cache_size_scale=8
readonly=false
hsqldb.nio_data_file=true
hsqldb.cache_scale=14
version=1.8.0
hsqldb.default_table_type=memory
hsqldb.cache_file_scale=1
hsqldb.log_size=200
modified=no
hsqldb.cache_version=1.7.0
hsqldb.original_version=1.8.0
hsqldb.compatible_version=1.8.0

 

 

- Write the SQL script file (db.hsqldb.script)

CREATE SCHEMA PUBLIC AUTHORIZATION DBA
CREATE MEMORY TABLE TT(I INTEGER NOT NULL PRIMARY KEY,S VARCHAR(256))
CREATE USER SA PASSWORD ""
GRANT DBA TO SA
SET WRITE_DELAY 10
SET SCHEMA PUBLIC
INSERT INTO TT VALUES(1,'a')
INSERT INTO TT VALUES(2,'a')
INSERT INTO TT VALUES(3,'a')

 

 

- Write the workflow.xml file

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="sqoop-wf">
    <start to="sqoop-node"/>
 
    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/sqoop"/>
                <mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir /user/oozie/${examplesRoot}/output-data/sqoop -m 1</command>
            <file>db.hsqldb.properties#db.hsqldb.properties</file>
            <file>db.hsqldb.script#db.hsqldb.script</file>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>
 
    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- Run the Oozie CLI command

  4. Oozie Java

- Write the job.properties file
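A sketch with the variables this workflow references (the apps/java-main path is an assumption):

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
oozie.wf.application.path=${nameNode}/user/${user.name}/apps/java-main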

 

 

- Write the workflow.xml file

<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-kc">
    <start to="java-node"/>
    <action name="java-node">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>org.apache.oozie.example.DemoJavaMain</main-class>
            <arg>Hello</arg>
            <arg>Oozie!</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- Run the Oozie CLI command

  5. Oozie Hive

- Write the job.properties file
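A sketch; besides the usual variables, the hive2 action below consumes ${jdbcURL} (the HiveServer2 host and port here are assumptions):

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
# HiveServer2 endpoint for <jdbc-url> (assumed host/port)
jdbcURL=jdbc:hive2://localhost:10000/default
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/hive2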

 

 

- Write the workflow.xml file

<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf">
    <start to="hive2-node"/>
 
    <action name="hive2-node">
        <hive2 xmlns="uri:oozie:hive2-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/hive2"/>
                <mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <jdbc-url>${jdbcURL}</jdbc-url>
            <script>script.q</script>
            <param>INPUT=/user/oozie/${examplesRoot}/input-data/table</param>
            <param>OUTPUT=/user/oozie/${examplesRoot}/output-data/hive2</param>
        </hive2>
        <ok to="end"/>
        <error to="fail"/>
    </action>
 
    <kill name="fail">
        <message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- Write the Hive script (script.q)

INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT * FROM test_machine;

 

 

- Run the Oozie CLI command

  6. Oozie Impala

- Write the job.properties file
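A sketch; ${EXEC} names the shell script shipped alongside the workflow via <file> (the impala.sh name and the apps/shell-impala path are assumptions):

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
EXEC=impala.sh
oozie.wf.application.path=${nameNode}/user/${user.name}/apps/shell-impala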

 

 

- Write the workflow.xml file

<workflow-app name="shell-impala" xmlns="uri:oozie:workflow:0.4">
    <start to="shell-impala-invalidate"/>
    <action name="shell-impala-invalidate">
        <shell xmlns="uri:oozie:shell-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${EXEC}</exec>
            <file>${EXEC}#${EXEC}</file>
        </shell>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- Write the Impala shell script (the file referenced by ${EXEC})

#!/bin/bash
impala-shell -i slave2:21000 -q "
select count(*) from test_machine"
echo 'Hello Shell'

 

 

- Run the Oozie CLI command

  7. Oozie MapReduce

- Write the job.properties file
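A sketch; ${outputDir} is used by the <prepare> step and mapred.output.dir below (values assumed):

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
outputDir=map-reduce
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce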

 

 

- Write the workflow.xml file

<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wyl">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.SampleMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.SampleReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/oozie/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- Run the Oozie CLI command

  8. Oozie Spark

- Write the job.properties file
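A sketch; ${master} feeds the <master> element of the spark action (local[*] is an assumption for testing; on a cluster this would typically be yarn-cluster):

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
master=local[*]
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/spark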

 

 

- Write the workflow.xml file

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
    <start to='spark-node' />
 
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/spark"/>
            </prepare>
            <master>${master}</master>
            <name>Spark-FileCopy</name>
            <class>org.apache.oozie.example.SparkFileCopy</class>
            <jar>${nameNode}/user/oozie/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
            <arg>${nameNode}/user/oozie/${examplesRoot}/input-data/text/data.txt</arg>
            <arg>${nameNode}/user/oozie/${examplesRoot}/output-data/spark</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
 
    <kill name="fail">
        <message>Workflow failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name='end' />
</workflow-app>

 

 

- Run the Oozie CLI command

  9. Oozie Scheduled Jobs (Coordinator)

- Define the job.properties file

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
 
oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/aggregator/coordinator.xml
start=2010-01-01T01:00Z
end=2010-01-01T03:00Z

 

 

- Define the coordinator.xml file

<coordinator-app name="aggregator-coord" frequency="${coord:hours(1)}" start="${start}" end="${end}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
    <controls>
        <concurrency>1</concurrency>
    </controls>
 
    <datasets>
        <dataset name="raw-logs" frequency="${coord:minutes(20)}" initial-instance="2010-01-01T00:00Z" timezone="UTC">
            <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
        </dataset>
        <dataset name="aggregated-logs" frequency="${coord:hours(1)}" initial-instance="2010-01-01T01:00Z" timezone="UTC">
            <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/output-data/aggregator/aggregatedLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
        </dataset>
    </datasets>
 
    <input-events>
        <data-in name="input" dataset="raw-logs">
            <start-instance>${coord:current(-2)}</start-instance>
            <end-instance>${coord:current(0)}</end-instance>
        </data-in>
    </input-events>
 
    <output-events>
        <data-out name="output" dataset="aggregated-logs">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
 
    <action>
        <workflow>
            <app-path>${nameNode}/user/${coord:user()}/${examplesRoot}/apps/aggregator</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>inputData</name>
                    <value>${coord:dataIn('input')}</value>
                </property>
                <property>
                    <name>outputData</name>
                    <value>${coord:dataOut('output')}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
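To make the materialization concrete: with frequency ${coord:hours(1)}, start 2010-01-01T01:00Z, and end 2010-01-01T03:00Z, this coordinator creates actions at 01:00Z and 02:00Z. For the 01:00Z action, ${coord:current(0)} on the 20-minute raw-logs dataset (initial-instance 00:00Z) resolves to the 01:00Z instance, so the input range current(-2)..current(0) expands to three URIs:

${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/2010/01/01/00/20
${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/2010/01/01/00/40
${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/2010/01/01/01/00

while ${coord:dataOut('output')} resolves to ${nameNode}/user/${coord:user()}/${examplesRoot}/output-data/aggregator/aggregatedLogs/2010/01/01/01.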

 

 

III. An Oozie Example

  1. Design the workflow

The demo flow below starts with a cleanup fs node, forks into a Pig node and a streaming MapReduce node that run in parallel, joins, runs an aggregating MapReduce node, and finally uses a decision node to move the result to a final HDFS directory when the output exists.

  2. Write the job.properties file

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
streamingMapper=/bin/cat
streamingReducer=/usr/bin/wc
 
oozie.use.system.libpath=true
 
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/demo
 

 

  3. Define the nodes in workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.2" name="demo-wf">
 
    <start to="cleanup-node"/>
 
    <action name="cleanup-node">
        <fs>
            <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo"/>
        </fs>
        <ok to="fork-node"/>
        <error to="fail"/>
    </action>
 
    <fork name="fork-node">
        <path start="pig-node"/>
        <path start="streaming-node"/>
    </fork>
 
    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/pig-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.map.output.compress</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <param>INPUT=/user/oozie/${examplesRoot}/input-data/text</param>
            <param>OUTPUT=/user/oozie/${examplesRoot}/output-data/demo/pig-node</param>
        </pig>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>
 
    <action name="streaming-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/streaming-node"/>
            </prepare>
            <streaming>
                <mapper>${streamingMapper}</mapper>
                <reducer>${streamingReducer}</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
 
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/oozie/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>
 
    <join name="join-node" to="mr-node"/>
    
    
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/mr-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
 
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.DemoMapper</value>
                </property>
                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.DemoReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/demo/pig-node,/user/oozie/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/demo/mr-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="decision-node"/>
        <error to="fail"/>
    </action>
 
    <decision name="decision-node">
        <switch>
            <case to="hdfs-node">${fs:exists(concat(nameNode, '/user/oozie/examples/output-data/demo/mr-node')) == "true"}</case>
            <default to="end"/>
        </switch>
    </decision>
 
    <action name="hdfs-node">
        <fs>
            <move source="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/mr-node"
                  target="/user/oozie/${examplesRoot}/output-data/demo/final-data"/>
        </fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>
 
    <kill name="fail">
        <message>Demo workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
 
    <end name="end"/>
 
</workflow-app>

 

 

  4. Submit the job

The job can be submitted from the Hue page, or from the command line with the same oozie job -config job.properties -run invocation used earlier.

  5. Check the execution status

The job list and the status of each action can be followed on the Oozie web console or in Hue.

IV. Summary

  1. EL Functions

- Basic EL functions

String firstNotNull(String value1, String value2)

String concat(String s1, String s2)

String replaceAll(String src, String regex, String replacement)

String appendAll(String src, String append, String delimiter)

String trim(String s)

String urlEncode(String s)

String timestamp()

String toJsonStr(Map) (since Oozie 3.3)

String toPropertiesStr(Map) (since Oozie 3.3)

String toConfigurationStr(Map) (since Oozie 3.3)

 

- Workflow EL functions

String wf:id() – returns the job ID of the current workflow

String wf:name() – returns the application name of the current workflow

String wf:appPath() – returns the application path of the current workflow

String wf:conf(String name) – returns the value of the given property in the current workflow's configuration

String wf:user() – returns the user who started the current job

String wf:callback(String stateVar) – returns the callback URL for the current action node, where the parameter is the exit state reported by the action

int wf:run() – returns the run number of the workflow; normally 0

Map wf:actionData(String node) – returns the output data (key/value pairs) emitted by the given node on completion

int wf:actionExternalStatus(String node) – returns the external status of the given node

String wf:lastErrorNode() – returns the name of the last node that exited with an ERROR state

String wf:errorCode(String node) – returns the error code of the job executed by the given node, or an empty string if there is none

String wf:errorMessage(String message) – returns the error message of the job executed by the given node, or an empty string if there is none

- HDFS EL functions

boolean fs:exists(String path)

boolean fs:isDir(String path)

long fs:dirSize(String path) – if the path is a directory, returns the total size in bytes of the files directly under it; otherwise returns -1

long fs:fileSize(String path) – if the path is a file, returns its size in bytes; otherwise returns -1

long fs:blockSize(String path) – if the path is a file, returns its block size in bytes; otherwise returns -1

  2. Notes

- The job.properties file does not have to be uploaded to HDFS; it is read from the local Linux path given to -config when you run oozie job ... -config.

- The workflow.xml file must be uploaded to the HDFS directory that oozie.wf.application.path in job.properties points to.

- oozie.use.system.libpath=true in job.properties tells Oozie to use the system share lib directory.

- oozie.libpath=${nameNode}/user/${user.name}/apps/mymr in job.properties can point at the directory holding the jars exported for an MR job; without it you may hit class-not-found errors.

- When Oozie schedules a job, it does so by launching a MapReduce launcher job; the queue name set in workflow.xml is the queue for that launcher job. If you want the actual job to run in a particular queue, specify the queue in the MR or Hive configuration itself.
