闲下来再回顾下spark 和 hive 的聚合函数 使用:java
spark自定义聚合函数类sql
class GroupConcatUDAF extends UserDefinedAggregateFunction{ /** * 指定输入字段的字段及类型 * group by 以后会有1到多个数据被归到一组,因此用Array()封装 */ override def inputSchema: StructType = { StructType(Array( StructField("str",StringType,true) )) } //聚合过程当中的中间结果集类型 override def bufferSchema: StructType ={ StructType(Array( StructField("strings",StringType,true) )) } //函数的返回类型 override def dataType: DataType = { StringType } override def deterministic: Boolean = { true } //为每一个分组的数据初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0)="" } //指的是,每一个分组,有新的值进来时,如何进行分组的聚合计算 //至关于map的combiner,buffer里面存放着累计的执行结果,input是当前的执行结果 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0)=buffer.getAs[String](0)+"|"+input.getAs[String](0) } //因为Spark是分布式的,因此一个分组的数据,可能会在不一样的节点上进行局部聚合,就是update //可是最后一个分组,在各节点上的聚合值,要进行Merge,也就是合并 //至关于reduce端的合并 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0)=buffer1.getAs[String](0) + buffer2.getAs[String](0) } //一个分组的聚合值,如何经过中间的聚合值,最后返回一个最终的聚合值 override def evaluate(buffer: Row): Any ={ buffer.getAs[String](0) } }
spark自定义聚合函数的调用:服务器
object 测试spark的聚合函数 extends App{ val spark=SparkSession.builder().appName("spark udaf").master("local[*]").getOrCreate() strCount() //1--测试strCount的使用+group_concat函数的使用 def strCount(): Unit ={ //导入隐式转化 import spark.implicits._ //构造用户的访问数据,并建立DataFrame val names=Array("张三","李四","王五","赵六","赵六","张三") val namesRDD: RDD[String] = spark.sparkContext.parallelize(names) //将RDD转换为DataFram val namesRowRDD=namesRDD.map(name=>Row(name)) val structType=StructType(Array( StructField("name",StringType,true) )) val namesDF=spark.sqlContext.createDataFrame(namesRowRDD,structType) //注册表 namesDF.createOrReplaceTempView("names") //定义和注册自定义函数 spark.sqlContext.udf.register("group_concat",new GroupConcatUDAF) //使用自定义函数 spark.sqlContext.sql("select name,concat_ws('|',collect_set(name)), concat_ws('|',collect_list(name)),group_concat(name) from names group by name").show() } }
下图为调用自定义聚合函数group_concat的结果,其实直接使用concat_ws()函数也能实现group_concat功能,不过若是须要保持顺序对应关系,则使用concat_ws('|',collect_list(name))。若须要去重则使用concat_ws('|',collect_set(name))。app
hive的自定义聚合函数---group_concat分布式
public class GroupConcat extends UDAF { public static class ConcatUDAFEvaluator implements UDAFEvaluator { //定义一个构造类,封装结果 public static class PartialResult{ String result; String delimiter; } private PartialResult partial; //init函数相似于构造函数,用于UDAF的初始化 public void init() { partial = null; } // iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean public boolean iterate(String value,String deli){ if (value == null){ return true; } if (partial == null){ partial = new PartialResult();//构造类 partial.result = new String("");//初始化值 if( deli == null || deli.equals("") ) { partial.delimiter = new String(",");//设置分隔符,没有设置默认使用 ',' } else { partial.delimiter = new String(deli);//设置分隔符 } } if ( partial.result.length() > 0 )//处理传入的值 { partial.result = partial.result.concat(partial.delimiter);//值 拼接 分隔符 } partial.result = partial.result.concat(value);//拼接每次传入的值 return true; } //terminatePartial无参数,其为iterate函数遍历结束后,返回轮转结果 public PartialResult terminatePartial(){ return partial; } //合并两个部分汇集值 public boolean merge(PartialResult other){ if (other == null){ return true; } if (partial == null){ partial = new PartialResult(); partial.result = new String(other.result); partial.delimiter = new String(other.delimiter); } else { if ( partial.result.length() > 0 ) { partial.result = partial.result.concat(partial.delimiter); } partial.result = partial.result.concat(other.result); } return true; } //terminate返回最终的汇集函数结果 * * @return public String terminate(){ return new String(partial.result); } } }
使用方法:ide
1.将程序打成jar包,上传至服务器。 2.进入hive客户端 3.添加jar包。 hive>add jar /opt/hive-1.0-SNAPSHOT.jar 4.建立临时函数 hive>create temporary function group_concat as 'GroupConcat'; 5.调用临时函数 hive>select group_concat (ykd018) as pdxCode from t_kc21k1 group by akc190;