Hive UDAF开发详解

说明

这篇文章是来自Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions:的不严格翻译,由于翻译的文章示例写得比较通俗易懂,此外,我把本身对于Hive的UDAF理解穿插到文章里面。html

udfa是Hive中用户自定义的汇集函数,hive内置UDAF函数包括有sum()与count(),UDAF实现有简单与通用两种方式,简单UDAF由于使用Java反射致使性能损失,并且有些特性不能使用,已经被弃用了;在这篇博文中咱们将关注Hive中自定义聚类函数-GenericUDAF,UDAF开发主要涉及到如下两个抽象类:java

[java] view plain copygit

  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver  github

  2. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator  sql

源码连接

博文中的全部的代码和数据能够在如下连接找到:hive examples
shell

示例数据准备

首先先建立一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。apache

[plain] view plain copyapi

  1. ~$ cat ./people.txt  跨域

  2.   

  3. John Smith  app

  4. John and Ann White  

  5. Ted Green  

  6. Dorothy  

把该文件上载到hdfs目录/user/matthew/people中:

[plain] view plain copy

  1. hadoop fs -mkdir people  

  2. hadoop fs -put ./people.txt people  

下面要建立hive外部表,在hive shell中执行


[sql] view plain copy

  1. CREATE EXTERNAL TABLE people (name string)  

  2. ROW FORMAT DELIMITED FIELDS   

  3.     TERMINATED BY '\t'   

  4.     ESCAPED BY ''   

  5.     LINES TERMINATED BY '\n'  

  6. STORED AS TEXTFILE   

  7. LOCATION '/user/matthew/people';  


相关抽象类介绍

建立一个GenericUDAF必须先了解如下两个抽象类:

[java] view plain copy

  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver   

[java] view plain copy

  1. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator  

为了更好理解上述抽象类的API,要记住hive只是mapreduce函数,只不过hive已经帮助咱们写好并隐藏mapreduce,向上提供简洁的sql函数,因此咱们要结合Mapper、Combiner与Reducer来帮助咱们理解这个函数。要记住在Hadoop集群中有若干台机器,在不一样的机器上Mapper与Reducer任务独立运行。

因此大致上来讲,这个UDAF函数读取数据(mapper),汇集一堆mapper输出到部分汇集结果(combiner),而且最终建立一个最终的汇集结果(reducer)。由于咱们跨域多个combiner进行汇集,因此咱们须要保存部分汇集结果。

AbstractGenericUDAFResolver

Resolver很简单,要覆盖实现下面方法,该方法会根据sql传人的参数数据格式指定调用哪一个Evaluator进行处理。

[java] view plain copy

  1. <span style="background-color: rgb(255, 255, 255);"><span style="font-size:14px;">public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;</span></span>  

GenericUDAFEvaluator

UDAF逻辑处理主要发生在Evaluator中,要实现该抽象类的几个方法。

在理解Evaluator以前,必须先理解objectInspector接口与GenericUDAFEvaluator中的内部类Model。


ObjectInspector

做用主要是解耦数据使用与数据格式,使得数据流在输入输出端切换不一样的输入输出格式,不一样的Operator上使用不一样的格式。能够参考这两篇文章:first post on Hive UDFsHive中ObjectInspector的做用,里面有关于objectinspector的介绍。

Model

Model表明了UDAF在mapreduce的各个阶段。

[java] view plain copy

  1. public static enum Mode {  

  2.     /** 

  3.      * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合 

  4.      * 将会调用iterate()和terminatePartial() 

  5.      */  

  6.     PARTIAL1,  

  7.         /** 

  8.      * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合: 

  9.      * 将会调用merge() 和 terminatePartial()  

  10.      */  

  11.     PARTIAL2,  

  12.         /** 

  13.      * FINAL: mapreduce的reduce阶段:从部分数据的聚合到彻底聚合  

  14.      * 将会调用merge()和terminate() 

  15.      */  

  16.     FINAL,  

  17.         /** 

  18.      * COMPLETE: 若是出现了这个阶段,表示mapreduce只有map,没有reduce,因此map端就直接出结果了:从原始数据直接到彻底聚合 

  19.       * 将会调用 iterate()和terminate() 

  20.      */  

  21.     COMPLETE  

  22.   };  

通常状况下,完整的UDAF逻辑是一个mapreduce过程,若是有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),若是还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些状况下的mapreduce,只有mapper,而没有reducer,因此就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

GenericUDAFEvaluator的方法

[java] view plain copy

  1. // 肯定各个阶段输入输出参数的数据格式ObjectInspectors  

  2. public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;  

  3.   

  4. // 保存数据汇集结果的类  

  5. abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;  

  6.   

  7. // 重置汇集结果  

  8. public void reset(AggregationBuffer agg) throws HiveException;  

  9.   

  10. // map阶段,迭代处理输入sql传过来的列数据  

  11. public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;  

  12.   

  13. // map与combiner结束返回结果,获得部分数据汇集结果  

  14. public Object terminatePartial(AggregationBuffer agg) throws HiveException;  

  15.   

  16. // combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。  

  17. public void merge(AggregationBuffer agg, Object partial) throws HiveException;  

  18.   

  19. // reducer阶段,输出最终结果  

  20. public Object terminate(AggregationBuffer agg) throws HiveException;  

图解Model与Evaluator关系


Model各阶段对应Evaluator方法调用




Evaluator各个阶段下处理mapreduce流程

实例

下面将讲述一个汇集函数UDAF的实例,咱们将计算people这张表中的name列字母的个数。

下面的函数代码是计算指定列中字符的总数(包括空格)

代码

[java] view plain copy

  1. @Description(name = "letters", value = "_FUNC_(expr) - 返回该列中全部字符串的字符总数")  

  2. public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {  

  3.   

  4.     @Override  

  5.     public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)  

  6.             throws SemanticException {  

  7.         if (parameters.length != 1) {  

  8.             throw new UDFArgumentTypeException(parameters.length - 1,  

  9.                     "Exactly one argument is expected.");  

  10.         }  

  11.           

  12.         ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);  

  13.           

  14.         if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){  

  15.             throw new UDFArgumentTypeException(0,  

  16.                             "Argument must be PRIMITIVE, but "  

  17.                             + oi.getCategory().name()  

  18.                             + " was passed.");  

  19.         }  

  20.           

  21.         PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;  

  22.           

  23.         if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){  

  24.             throw new UDFArgumentTypeException(0,  

  25.                             "Argument must be String, but "  

  26.                             + inputOI.getPrimitiveCategory().name()  

  27.                             + " was passed.");  

  28.         }  

  29.           

  30.         return new TotalNumOfLettersEvaluator();  

  31.     }  

  32.   

  33.     public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {  

  34.   

  35.         PrimitiveObjectInspector inputOI;  

  36.         ObjectInspector outputOI;  

  37.         PrimitiveObjectInspector integerOI;  

  38.           

  39.         int total = 0;  

  40.   

  41.         @Override  

  42.         public ObjectInspector init(Mode m, ObjectInspector[] parameters)  

  43.                 throws HiveException {  

  44.               

  45.             assert (parameters.length == 1);  

  46.             super.init(m, parameters);  

  47.              

  48.              //map阶段读取sql列,输入为String基础数据格式  

  49.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {  

  50.                 inputOI = (PrimitiveObjectInspector) parameters[0];  

  51.             } else {  

  52.             //其他阶段,输入为Integer基础数据格式  

  53.                 integerOI = (PrimitiveObjectInspector) parameters[0];  

  54.             }  

  55.   

  56.              // 指定各个阶段输出数据格式都为Integer类型  

  57.             outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,  

  58.                     ObjectInspectorOptions.JAVA);  

  59.             return outputOI;  

  60.   

  61.         }  

  62.   

  63.         /** 

  64.          * 存储当前字符总数的类 

  65.          */  

  66.         static class LetterSumAgg implements AggregationBuffer {  

  67.             int sum = 0;  

  68.             void add(int num){  

  69.                 sum += num;  

  70.             }  

  71.         }  

  72.   

  73.         @Override  

  74.         public AggregationBuffer getNewAggregationBuffer() throws HiveException {  

  75.             LetterSumAgg result = new LetterSumAgg();  

  76.             return result;  

  77.         }  

  78.   

  79.         @Override  

  80.         public void reset(AggregationBuffer agg) throws HiveException {  

  81.             LetterSumAgg myagg = new LetterSumAgg();  

  82.         }  

  83.           

  84.         private boolean warned = false;  

  85.   

  86.         @Override  

  87.         public void iterate(AggregationBuffer agg, Object[] parameters)  

  88.                 throws HiveException {  

  89.             assert (parameters.length == 1);  

  90.             if (parameters[0] != null) {  

  91.                 LetterSumAgg myagg = (LetterSumAgg) agg;  

  92.                 Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);  

  93.                 myagg.add(String.valueOf(p1).length());  

  94.             }  

  95.         }  

  96.   

  97.         @Override  

  98.         public Object terminatePartial(AggregationBuffer agg) throws HiveException {  

  99.             LetterSumAgg myagg = (LetterSumAgg) agg;  

  100.             total += myagg.sum;  

  101.             return total;  

  102.         }  

  103.   

  104.         @Override  

  105.         public void merge(AggregationBuffer agg, Object partial)  

  106.                 throws HiveException {  

  107.             if (partial != null) {  

  108.                   

  109.                 LetterSumAgg myagg1 = (LetterSumAgg) agg;  

  110.                   

  111.                 Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);  

  112.                   

  113.                 LetterSumAgg myagg2 = new LetterSumAgg();  

  114.                   

  115.                 myagg2.add(partialSum);  

  116.                 myagg1.add(myagg2.sum);  

  117.             }  

  118.         }  

  119.   

  120.         @Override  

  121.         public Object terminate(AggregationBuffer agg) throws HiveException {  

  122.             LetterSumAgg myagg = (LetterSumAgg) agg;  

  123.             total = myagg.sum;  

  124.             return myagg.sum;  

  125.         }  

  126.   

  127.     }  

  128. }  


代码说明

这里有一些关于combiner的资源,Philippe Adjiman 讲得不错。


AggregationBuffer 容许咱们保存中间结果,经过定义咱们的buffer,咱们能够处理任何格式的数据,在代码例子中字符总数保存在AggregationBuffer 。


[java] view plain copy

  1. /** 

  2. * 保存当前字符总数的类 

  3. */  

  4. static class LetterSumAgg implements AggregationBuffer {  

  5.     int sum = 0;  

  6.     void add(int num){  

  7.         sum += num;  

  8.     }  

  9. }  


这意味着UDAF在不一样的mapreduce阶段会接收到不一样的输入。Iterate读取咱们表中的一行(或者准确来讲是表),而后输出其余数据格式的汇集结果。

artialAggregation合并这些汇集结果到另外相同格式的新的汇集结果,而后最终的reducer取得这些汇集结果真后输出最终结果(该结果或许与接收数据的格式不一致)。

在init()方法中咱们指定输入为string,结果输出格式为integer,还有,部分汇集结果输出格式为integer(保存在aggregation buffer中);terminate()terminatePartial()二者输出一个integer


[java] view plain copy

  1. // init方法中根据不一样的mode指定输出数据的格式objectinspector  

  2. if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {  

  3.     inputOI = (PrimitiveObjectInspector) parameters[0];  

  4. else {  

  5.     integerOI = (PrimitiveObjectInspector) parameters[0];  

  6. }  

  7.   

  8. // 不一样model阶段的输出数据格式  

  9. outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,  

  10.                     ObjectInspectorOptions.JAVA);  


iterate()函数读取到每行中列的字符串,计算与保存该字符串的长度

[java] view plain copy

  1. public void iterate(AggregationBuffer agg, Object[] parameters)  

  2.     throws HiveException {  

  3.     ...  

  4.     Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);  

  5.     myagg.add(String.valueOf(p1).length());  

  6.     }  

  7. }  


Merge函数增长部分汇集总数到AggregationBuffer

[java] view plain copy

  1. public void merge(AggregationBuffer agg, Object partial)  

  2.         throws HiveException {  

  3.     if (partial != null) {  

  4.                   

  5.         LetterSumAgg myagg1 = (LetterSumAgg) agg;  

  6.                   

  7.         Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);  

  8.                   

  9.         LetterSumAgg myagg2 = new LetterSumAgg();  

  10.                   

  11.         myagg2.add(partialSum);  

  12.         myagg1.add(myagg2.sum);  

  13.     }  

  14. }  


Terminate()函数返回AggregationBuffer中的内容,这里产生了最终结果。

[java] view plain copy

  1. public Object terminate(AggregationBuffer agg) throws HiveException {  

  2.     LetterSumAgg myagg = (LetterSumAgg) agg;  

  3.     total = myagg.sum;  

  4.     return myagg.sum;  

  5. }  

使用自定义函数

[plain] view plain copy

  1. ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;  

  2. CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';  

  3.   

  4. SELECT letters(name) FROM people;  

  5. OK  

  6. 44  

  7. Time taken: 20.688 seconds  

相关文章
相关标签/搜索