About the author
Chunmin, a logistics architect and team leader, conscientious and dedicated, who once "programmed facing the sea" while on vacation. Impressive, even if a little hard to fathom.
In Hive, users can define their own functions to extend the capabilities of HiveQL. Hive custom functions come in three main kinds: UDF (user-defined function), UDAF (user-defined aggregate function), and UDTF (user-defined table-generating function).
Hive's UDF mechanism asks the user to provide a Resolver and an Evaluator: the Resolver handles the input and dispatches to the Evaluator, and the Evaluator is where the actual functionality is implemented.
Hive provides a base class, org.apache.hadoop.hive.ql.exec.UDF, which holds a DefaultUDFMethodResolver object, an implementation of the UDFMethodResolver interface.
public class UDF {
    private UDFMethodResolver rslv;

    public UDF() {
        this.rslv = new DefaultUDFMethodResolver(this.getClass());
    }
    // ...
}
DefaultUDFMethodResolver provides a getEvalMethod method, through which the UDF's evaluate method is looked up and then invoked.
public class DefaultUDFMethodResolver implements UDFMethodResolver {
    private final Class<? extends UDF> udfClass;

    public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
        this.udfClass = udfClass;
    }

    public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
        return FunctionRegistry.getMethodInternal(this.udfClass, "evaluate", false, argClasses);
    }
}
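A practical consequence of this lookup (a minimal sketch with a hypothetical MyUpper UDF, not taken from the article): because getEvalMethod matches "evaluate" against the argument types used in the query, a UDF may offer several evaluate overloads and Hive picks the one whose signature fits.

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical example: both overloads are legal; the resolver selects one per query.
public class MyUpper extends UDF {
    // upper(name)
    public Text evaluate(Text s) {
        return s == null ? null : new Text(s.toString().toUpperCase());
    }

    // upper(name, n): upper-case only the first n characters
    public Text evaluate(Text s, IntWritable n) {
        if (s == null || n == null) {
            return null;
        }
        String v = s.toString();
        int k = Math.min(Math.max(n.get(), 0), v.length());
        return new Text(v.substring(0, k).toUpperCase() + v.substring(k));
    }
}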
A custom UDF is therefore implemented by extending org.apache.hadoop.hive.ql.exec.UDF and providing an evaluate method; that method is what the DefaultUDFMethodResolver object resolves and executes.
public class DAIsContainPoint extends UDF {
    public Boolean evaluate(Double longitude, Double latitude, String geojson) {
        // Returns true when the (longitude, latitude) point falls inside the GeoJSON polygon.
        Boolean isContained = false;
        try {
            Polygon polygon = JTSHelper.parse(geojson);
            Coordinate center = new Coordinate(longitude, latitude);
            GeometryFactory factory = new GeometryFactory();
            Point point = factory.createPoint(center);
            isContained = polygon.contains(point);
        } catch (Throwable e) {
            // Any parse or geometry error is swallowed and treated as "not contained".
            isContained = false;
        }
        return isContained;
    }
}
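Before packaging, a quick local sanity check can be useful (hypothetical values, not from the article; it assumes JTSHelper.parse accepts a standard GeoJSON Polygon string):

// Hypothetical local check: invoke evaluate() directly, just as Hive would via reflection.
DAIsContainPoint udf = new DAIsContainPoint();
String geojson = "{\"type\":\"Polygon\",\"coordinates\":"
        + "[[[121.0,31.0],[121.5,31.0],[121.5,31.5],[121.0,31.5],[121.0,31.0]]]}";
System.out.println(udf.evaluate(121.2, 31.2, geojson)); // expected: true (point lies inside the square)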
Once the code is written it needs to be packaged and compiled into a jar. Note that the final jar must bundle all of its dependencies; for a Maven build the maven-shade-plugin is recommended. With the shade goal bound to the package phase as configured below, running mvn package produces such a self-contained (shaded) jar.
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.2</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
The resulting jar file is then referenced in Hive SQL:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function is_in_polygon as 'me.ele.breat.hive.udf.DAIsContainPoint';
select lat, lng, geojson, is_in_polygon(lat, lng, geojson) as is_in from example;
Hive speeds up aggregation by running it as a MapReduce job, and the UDAF is the extension point for writing custom aggregate functions. Readers who need a refresher on MapReduce may want to review it first; to explain UDAF properly we need to keep the MapReduce flow in mind: mappers produce partial results, an optional combiner pre-aggregates them on the map side, and reducers merge all partial results into the final output.
Back to Hive: a UDAF implementation starts by extending org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver and implementing the org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 interface. It then builds a GenericUDAFEvaluator class that carries out the MapReduce-style computation, with three key methods:
iterate: takes each mapper's input rows and feeds them into merge
merge: the combiner step that merges the mappers' partial results
terminate: merges all the combiners' results and returns the final value

Finally, a class extending AbstractGenericUDAFResolver overrides its getEvaluator method to return an instance of this GenericUDAFEvaluator.
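The evaluator's init method in the code below also receives a GenericUDAFEvaluator.Mode. As a reading aid (based on Hive's GenericUDAFEvaluator documentation, not spelled out in the original text), the modes line up with the MapReduce stages as follows:

// PARTIAL1: map side             -- iterate() on raw rows, then terminatePartial()
// PARTIAL2: combiner             -- merge() partial results, then terminatePartial()
// FINAL:    reducer              -- merge() partial results, then terminate()
// COMPLETE: map-only (no reduce) -- iterate() on raw rows, then terminate()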
public class DAJoinV2 extends AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

    // Resolver2 entry point: returns the evaluator (no extra argument checking here).
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo genericUDAFParameterInfo) throws SemanticException {
        return new DAJoinStringEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] typeInfos) throws SemanticException {
        if (typeInfos.length != 1) {
            throw new UDFArgumentTypeException(typeInfos.length - 1,
                    "Exactly one argument is expected.");
        }
        if (typeInfos[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Only primitive type arguments are accepted but "
                            + typeInfos[0].getTypeName() + " is passed.");
        }
        switch (((PrimitiveTypeInfo) typeInfos[0]).getPrimitiveCategory()) {
            case STRING:
                return new DAJoinStringEvaluator();
            default:
                throw new UDFArgumentTypeException(0,
                        "Only string type arguments are accepted but "
                                + typeInfos[0].getTypeName() + " is passed.");
        }
    }

    public static class DAJoinStringEvaluator extends GenericUDAFEvaluator {
        private PrimitiveObjectInspector mInput;
        private Text mResult;

        // Aggregation buffer holding the geometry accumulated so far.
        static class PolygonAgg implements AggregationBuffer {
            Geometry geometry;
        }

        // Declares the UDAF's return type: DAJoin returns a Text value.
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);
            mResult = new Text();
            mInput = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        }

        // Creates the buffer that accumulates the running union across mapper, combiner and reducer.
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            PolygonAgg polygonAgg = new PolygonAgg();
            reset(polygonAgg);
            return polygonAgg;
        }

        @Override
        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
            PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
            GeometryFactory factory = new GeometryFactory();
            polygonAgg.geometry = factory.createPolygon(new Coordinate[]{});
        }

        // Map phase: each input row is handed to merge.
        @Override
        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
            assert (objects.length == 1);
            merge(aggregationBuffer, objects[0]);
        }

        // Combiner: emit the partial aggregation for this map-side group.
        @Override
        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
            return terminate(aggregationBuffer);
        }

        // Merge a partial result (a GeoJSON string) into the buffer.
        @Override
        public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
            if (partial != null) {
                try {
                    PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                    String geoJson = PrimitiveObjectInspectorUtils.getString(partial, mInput);
                    Polygon polygon = JTSHelper.parse(geoJson);
                    polygonAgg.geometry = polygonAgg.geometry.union(polygon);
                } catch (Exception e) {
                    // Ignore rows whose geometry cannot be parsed.
                }
            }
        }

        // Reducer: merge all combiner output and emit the final geometry.
        @Override
        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
            try {
                PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                Geometry buffer = polygonAgg.geometry.buffer(0);
                mResult.set(JTSHelper.convert2String(buffer.convexHull()));
            } catch (Exception e) {
                mResult.set("");
            }
            return mResult;
        }
    }
}
After packaging, use it in Hive SQL in the same way:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function da_join as 'me.ele.breat.hive.udf.DAJoinV2';
create table udaf_example as
select id, da_join(da_range) as da_union_polygon
from example
group by id;
A UDTF implementation extends org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implements the initialize, process and close methods:
initialize: returns a StructObjectInspector, which determines the names and types of the output columns
process: handles each input record, producing a new array that is passed to forward for output
close: the callback at the end of the whole invocation, used to clean up resources

public class S2SimpleRegionCoverV2 extends GenericUDTF {
    private final static int LEVEL = 16;

    // Declares a single output column named "s2cellid" of type bigint.
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        List<String> structFieldNames = Lists.newArrayList("s2cellid");
        List<ObjectInspector> structFieldObjectInspectors = Lists.<ObjectInspector>newArrayList(
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
        return ObjectInspectorFactory
                .getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    // For each input GeoJSON string, forward one row per covering S2 cell id.
    @Override
    public void process(Object[] objects) throws HiveException {
        String json = String.valueOf(objects[0]);
        List<Long> s2cellids = toS2CellIds(json);
        for (Long s2cellid : s2cellids) {
            forward(new Long[]{s2cellid});
        }
    }

    // Parses the GeoJSON polygon and computes a simple level-16 S2 covering of it.
    public static List<Long> toS2CellIds(String json) {
        GeometryFactory factory = new GeometryFactory();
        GeoJsonReader reader = new GeoJsonReader();
        Geometry geometry = null;
        try {
            geometry = reader.read(json);
        } catch (ParseException e) {
            geometry = factory.createPolygon(new Coordinate[]{});
        }
        List<S2Point> polygonS2Point = new ArrayList<S2Point>();
        for (Coordinate coordinate : geometry.getCoordinates()) {
            S2LatLng s2LatLng = S2LatLng.fromDegrees(coordinate.y, coordinate.x);
            polygonS2Point.add(s2LatLng.toPoint());
        }
        List<S2Point> points = polygonS2Point;
        if (points.size() == 0) {
            return Lists.newArrayList();
        }
        ArrayList<S2CellId> result = new ArrayList<S2CellId>();
        S2RegionCoverer
                .getSimpleCovering(new S2Polygon(new S2Loop(points)), points.get(0), LEVEL, result);
        List<Long> output = new ArrayList<Long>();
        for (S2CellId s2CellId : result) {
            output.add(s2CellId.id());
        }
        return output;
    }

    @Override
    public void close() throws HiveException {
    }
}
It is used together with lateral view; each input row is expanded into one output row per s2cellid forwarded by the UDTF:
add jar hdfs://bipcluster/data/upload/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/google_s2_udf.jar;
create temporary function da_cover as 'me.ele.breat.hive.udf.S2SimpleRegionCoverV2';
drop table if exists temp.cm_s2_id_cover_list;
create table temp.cm_s2_id_cover_list as
select tb_s2cellid.s2cellid, source.shop_id
from (
  select
    geometry,
    shop_id
  from
    example) source
lateral view da_cover(geometry) tb_s2cellid as s2cellid;