近日因为工做所需,须要使用到Pig来分析线上的搜索日志数据,散仙本打算使用hive来分析的,但因为种种缘由,没有用成,而Pig(pig0.12-cdh)散仙一直没有接触过,因此只能临阵磨枪了,花了两天时间,大体看完了pig官网的文档,在看文档期间,也是边实战边学习,这样以来,对pig的学习,会更加容易,固然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,散仙打算介绍下如何在Pig中,使用用户自定义的UDF函数,关于学习经验,散仙会在后面的文章里介绍。
一旦你学会了UDF的使用,就意味着,你能够以更加灵活的方式来使用Pig,使它扩展一些为咱们的业务场景定制的特殊功能,而这些功能,在通用的pig里是没有的,举个例子:
你从HDFS上读取的数据格式,若是使用默认的PigStorage()来加载,存储可能只支持有限的数据编码和类型,若是咱们定义了一种特殊的编码存储或序列化方式,那么当咱们使用默认的Pig来加载的时候,就会发现加载不了,这时候咱们的UDF就派上用场了,咱们只须要自定义一个LoadFunction和一个StoreFunction就能够解决,这种问题。
html
本篇散仙根据官方文档的例子,来实战一下,并在hadoop集群上使用Pig测试经过:
咱们先来看下定义一个UDF扩展类,须要几个步骤:java
序号 步骤 说明node
1 在eclipse里新建一个java工程,并导入pig的核心包 java项目数据库
2 新建一个包,继承特定的接口或类,重写自定义部分 核心业务apache
3 编写完成后,使用ant打包成jar 编译时须要pig依赖,但不用把pig的jar包打入UDF中app
4 把打包完成后的jar上传到HDFS上 pig运行时候须要加载使用框架
5 在pig脚本里,注册咱们自定义的udf的jar包 注入运行时环境eclipse
6 编写咱们的核心业务pig脚本运行 测试是否运行成功ide
项目工程截图以下: 函数
核心代码以下:
package com.pigudf;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;
/**
* 自定义UDF类,对字符串转换大写
* @author qindongliang
* */
public class MyUDF extends EvalFunc<String> {
@Override
public String exec(Tuple input) throws IOException {
//判断是否为null或空,就跳过
if(input==null||input.size()==0){
return null;
}
try{
//获取第一个元素
String str=(String) input.get(0);
//转成大写返回
return str.toUpperCase();
}catch(Exception e){
throw WrappedIOException.wrap("Caught exception processing input row ",e);
}
}
}
关于打包的ant脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件必定要上传到HDFS上,除非你是local模式):
grunt> cat s.txt
zhang san,12
Song,34
long,34
abC,12
grunt>
咱们在看下,操做文件和jar包是放在一块儿的:
grunt> ls
hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3> 1295
hdfs://dnode1:8020/tmp/udf/s.txt<r 3> 36
grunt>
最后,咱们看下pig脚本的定义:
--注册自定义的jar包
REGISTER pudf.jar;
--加载测试文件的数据,逗号做为分隔符
a = load 's.txt' using PigStorage(',');
--遍历数据,对name列转成大写
b = foreach a generate com.pigudf.MyUDF((chararray)$0);
--启动MapReduce的Job进行数据分析
dump b
最后,咱们看下结果,只要过程不出现异常和任务失败,就证实咱们的udf使用成功:
Counters:
Total records written : 4
Total bytes written : 64
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_1419419533357_0147
2014-12-30 18:10:24,394 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2014-12-30 18:10:24,395 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-12-30 18:10:24,396 [main] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2014-12-30 18:10:24,405 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-12-30 18:10:24,405 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(ZHANG SAN,12)
(SONG,34)
(LONG,34)
(ABC,12)
结果没问题,咱们的UDF加载执行成功,若是咱们还想将咱们的输出结果直接写入到HDFS上,能够在pig脚本的末尾,去掉dump命令,加入 store e into '/tmp/dongliang/result/'; 将结果存储到HDFS上,固然咱们能够自定义存储函数,将结果写入数据库,Lucene,Hbase等关系型或一些NOSQL数据库里。