In the previous posts we saw that implementing UDFs, UDTFs, and UDAFs is not exactly simple, and it requires a good command of Java, whereas Hive was designed precisely to be usable by non-Java people. For this reason Hive offers another way to process data: Streaming, which removes the need to write Java code; in fact, Streaming works with many languages. The trade-off is that Streaming usually runs less efficiently than an equivalent UDF or a custom InputFormat, because serializing data into the pipe and deserializing it back out is generally slow, and the resulting program is also harder to debug in the usual ways.
Hive exposes Streaming through several syntactic forms, including:
MAP()
REDUCE()
TRANSFORM()
Note, however, that MAP() does not actually force the Streaming process to run in the mapper phase, just as REDUCE() does not force it into the reducer phase. For the same functionality, TRANSFORM() is therefore the recommended form, since it avoids that confusion.
Streaming is implemented with the TRANSFORM() function and the USING keyword: TRANSFORM() takes the table's column names as its arguments, and USING specifies the script to run. This section continues to use the employee table from Hive UDF Tutorial (1).
Example 1: Streaming with a Linux command
First, let's have Streaming call the Linux cat command directly to query the table. cat.q is the HiveQL file, with the following contents:
SELECT TRANSFORM(e.name, e.salary) USING '/bin/cat' AS name, salary FROM employee e;
Execution result:
hive (mydb)> SOURCE cat.q;
OK
Time taken: 0.044 seconds
Query ID = root_20160120000909_2de2d4f9-b50c-4ed1-a876-768c0127f067
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1453275977382_0001, Tracking URL = http://master:8088/proxy/application_1453275977382_0001/
Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job -kill job_1453275977382_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2016-01-20 00:10:16,258 Stage-1 map = 0%, reduce = 0%
2016-01-20 00:10:22,942 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.12 sec
MapReduce Total cumulative CPU time: 1 seconds 120 msec
Ended Job = job_1453275977382_0001
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Cumulative CPU: 1.12 sec HDFS Read: 1040 HDFS Write: 139 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 120 msec
OK
John Doe 100000.0
Mary Smith 80000.0
Todd Jones 70000.0
Bill King 60000.0
Boss Man 200000.0
Fred Finance 150000.0
Stacy Accountant 60000.0
Time taken: 24.758 seconds, Fetched: 7 row(s)
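Hive hands each row to the streaming process on stdin as tab-separated column values terminated by a newline, and it parses the process's stdout the same way; /bin/cat works here simply because it echoes every line untouched. As a minimal sketch, an equivalent Python script (the file name identity.py is made up for illustration) would be:

#!/usr/bin/env python
# Stand-in for /bin/cat: Hive streams each row to stdin as a
# tab-separated, newline-terminated line; echo it back unchanged.
import sys

for line in sys.stdin:
    sys.stdout.write(line)

To use it instead of /bin/cat, the file would first be shipped to the cluster with ADD FILE and then referenced as USING 'python identity.py'.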
Example 2: Streaming with a Python script
Now let's compare Hive's built-in sum() function with a sum.py Python script. First, the built-in sum():
hive (mydb)> SELECT sum(salary) FROM employee;
Query ID = root_20160120012525_1abf156b-d44b-4f1c-b2c2-3604e4c1bba0
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1453281391968_0002, Tracking URL = http://master:8088/proxy/application_1453281391968_0002/
Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job -kill job_1453281391968_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-01-20 01:25:20,364 Stage-1 map = 0%, reduce = 0%
2016-01-20 01:25:31,620 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.55 sec
2016-01-20 01:25:42,394 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.73 sec
MapReduce Total cumulative CPU time: 2 seconds 730 msec
Ended Job = job_1453281391968_0002
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.73 sec HDFS Read: 1040 HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 730 msec
OK
720000.0
Time taken: 33.891 seconds, Fetched: 1 row(s)
Then the Streaming version. The sum.py script:
#!/usr/bin/env python
import sys

# Add one salary value to the running total.
def sum(arg):
    global total
    total += arg

if __name__ == "__main__":
    total = 0.0
    # Hive streams one salary per line on stdin.
    for arg in sys.stdin:
        sum(float(arg))
    print total
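Since sum.py only reads stdin, it can be smoke-tested locally before involving Hive at all, for example (sample values made up): printf '100\n200\n' | python sum.py should print 300.0.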
The HiveQL script, sum.q:
SELECT TRANSFORM(salary) USING 'python /root/experiment/hive/sum.py' AS total FROM employee;
Finally, the execution result:
hive> source sum.q;
OK
Time taken: 0.022 seconds
Query ID = root_20160120002626_0ced0b93-e4e8-4f3a-91d0-f2aaa06b5f11
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1453278047512_0002, Tracking URL = http://master:8088/proxy/application_1453278047512_0002/
Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job -kill job_1453278047512_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2016-01-20 00:26:28,341 Stage-1 map = 0%, reduce = 0%
2016-01-20 00:26:36,185 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.4 sec
MapReduce Total cumulative CPU time: 1 seconds 400 msec
Ended Job = job_1453278047512_0002
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Cumulative CPU: 1.4 sec HDFS Read: 1040 HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 400 msec
OK
720000.0
Time taken: 17.048 seconds, Fetched: 1 row(s)
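Two caveats worth noting. First, the job above has no reduce stage, so the single mapper's printed total happens to equal the full sum; with more than one mapper, each would emit its own partial total. Second, float() will crash on NULL salaries, because by default Hive serializes NULL to the streaming process as the literal string \N. A slightly hardened sketch of the script (assuming that default marker, and written for Python 2 like the original sum.py) might look like:

#!/usr/bin/env python
# NULL-tolerant variant: Hive streams NULL as the literal string "\N"
# by default, which float() cannot parse, so such rows are skipped.
import sys

total = 0.0
for line in sys.stdin:
    value = line.strip()
    if value == "" or value == "\\N":
        continue  # skip NULL/empty values instead of crashing
    total += float(value)
print total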
When using TRANSFORM you cannot select other columns in the same query. If you also need some column a, pass it to TRANSFORM as an extra argument and have the script output it unchanged, for example:
add file /home/xxx/udf/python/xxx.py;
select transform(m, p, consume, cnt) using 'python xxx.py' as (mid, pid, trans_at, total_cnt) from xxx_table;
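As a sketch of what such a pass-through script can look like (the real contents of xxx.py are not shown above, so this logic is purely illustrative): it reads the four tab-separated input columns, leaves m and p untouched, and writes all four back as one tab-separated output line.

#!/usr/bin/env python
# Illustrative pass-through: read tab-separated columns from stdin,
# echo m and p unchanged, and emit a tab-separated output row.
import sys

for line in sys.stdin:
    m, p, consume, cnt = line.rstrip('\n').split('\t')
    # consume and cnt could be transformed here; this sketch
    # simply passes every column through.
    print '\t'.join([m, p, consume, cnt])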