Pig是一种数据流编程语言,由一系列操做和变换构成,每个操做或者变换都对输入进行处理,而后产生输出结果,总体操做表示一个数据流。Pig的执行环境将数据流翻译为可执行的内部表示,在Pig内部,这些变换操做被转换为一系列的MapReduce做业。java
Pig自身有许多个方法,有时候须要咱们本身定制特定的处理方法即UDF。apache
UDF具体的步骤以下:编程
第一步,继承计算类或者过滤类或者加载类或者存储类,重写里面的须要实现的方法,将写好的类进行打包生成jar文件。诸如命名为example.jarless
第二步,进入Pig的grunt中,利用register将打包的文件注册进入Pig中。进入Pig的grunt中,当前本地路径就是用户输入Pig时候所在的路径。打包文件必定要加上它所在的路径。如register example.jar。编程语言
第三步,直接使用该自定义的UDF,在使用的过程当中须要加上该类的权限定包名,若是这里example.jar的包结构为com.whut.FilterFunct。则引用的时候就是com.whut.FilterFunct(参数)。注意类的名称就是使用时候的方法名,必需要区分大小写。ide
第四步,为本身的UDF定义别名,这样使用的时候就不准要加包名了,如函数
define Goog com.whut.FilterFunct()。这样使用的时候就直接利用Goog了。grunt
自定义过滤UDF:oop
过滤UDF须要继承FilterFunc。实现其exec方法。该方法返回的是boolean型。在对温度统计的时候,就能够利用过滤UDF来过滤是否正确的气温。this
package whut; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.pig.FilterFunc; import org.apache.pig.FuncSpec; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; //删除记录中不符合要求的记录 //pig的自定义函数,过滤函数 public class IsGoodQuality extends FilterFunc{ @Override public Boolean exec(Tuple tuple) throws IOException { // TODO Auto-generated method stub if(tuple ==null ||tuple.size()==0) return false; try{ Object obj=tuple.get(0); if(obj==null) return false; //这里强制转换为一个××× int i=(Integer)obj; return i==0 ||i==1 || i==2 || i==3; }catch(ExecException e) { throw new IOException(e); } } }
这里的参数是一个元组,能够包含多个输入参数,在方法中直接利用get(索引位置)来直接获取。
自定义加载函数UDF
在Pig中常常会使用到加载外部文件,通常使用Load进行加载,如Load 'input/tempdata' as (a:chararray,b:int) 。这里默认使用了内部加载存储函数,PigStorage。
即Load 'input/tempdata' using PigStorage() as (a:chararray,b:int)。这里PigStorage默认的每一行的字段分割符是制表符,固然也能够传递一个本身的字段分割符号。有时候每一行是一串字符串,想从中取出某一个字段,则就须要本身定义一个加载函数。如下面这个文件为例子。
aaaaa1990aaaaaa0039a bbbbb1991bbbbbb0045a ccccc1992cccccc0011c ddddd1993dddddd0043d eeeee1994eeeeee0047e aaaaa1990aaaaaa0037a bbbbb1991bbbbbb0027a ccccc1992cccccc0032c ddddd1993dddddd0090d eeeee1994eeeeee0091e aaaaa1980aaaaaa0041a bbbbb1981bbbbbb0050a ccccc1992cccccc0020c ddddd1993dddddd0033d eeeee1984eeeeee0061e aaaaa1980aaaaaa0054a bbbbb1991bbbbbb0075a ccccc1982cccccc0011c ddddd1993dddddd0003d eeeee1974eeeeee0041e aaaaa1990aaaaaa0039a bbbbb1961bbbbbb0041a ccccc1972cccccc0070c ddddd1993dddddd0042d eeeee1974eeeeee0043e aaaaa1990aaaaaa0034a bbbbb1971bbbbbb0025a ccccc1992cccccc0056c ddddd1993dddddd0037d eeeee1984eeeeee0038e aaaaa1990aaaaaa0049a bbbbb1991bbbbbb0011a ccccc1962cccccc0012c ddddd1993dddddd0023d eeeee1984eeeeee0031e aaaaa1980aaaaaa0094a bbbbb1971bbbbbb0045a ccccc1992cccccc0041c ddddd1993dddddd0003d eeeee1984eeeeee0081e aaaaa1960aaaaaa0099a bbbbb1971bbbbbb0050a ccccc1952cccccc0055c ddddd1963dddddd0043d eeeee1994eeeeee0041e aaaaa1990aaaaaa0031a bbbbb1991bbbbbb0020a ccccc1952cccccc0030c ddddd1983dddddd0013d eeeee1974eeeeee0061e aaaaa1980aaaaaa0071a bbbbb1961bbbbbb0060a ccccc1992cccccc0080c ddddd1953dddddd0033d eeeee1964eeeeee0051e aaaaa1960aaaaaa0024a bbbbb1951bbbbbb0035a ccccc1952cccccc0048c ddddd1953dddddd0053d eeeee1954eeeeee0048e
为了从中取出年份和温度,则就须要本身定义加载函数,这里每一列序号以0开始。自定义加载函数须要继承LoadFunc。具体的代码以下。
package whut; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; class Range { //列的索引以0开始 //字段分割的列的位置 private int start; private int end; //根据输入来解析 //字符串格式必须是(2~3,5~6) public static List<Range> parse(String cutStr)throws Exception { List<Range> rangeList=new ArrayList<Range>(); //首先要判断是否格式正确 boolean state=cutStr.matches("\\d+~\\d+(,\\d+~\\d+)*"); if(!state) { throw new Exception("InputForat Error:\n" + "Usage:number~number,number~number;Such 2~7,10~19"); } //先截取几个字段的列起止位置如2~8 String[] splits=cutStr.split(","); //遍历长度设置Range for(int i=0;i<splits.length;i++) { Range range=new Range(); String sub=splits[i]; String[] subSplits=sub.split("~"); int subStart=Integer.parseInt(subSplits[0]); int subEnd=Integer.parseInt(subSplits[1]); if(subStart>subEnd) throw new Exception("InputForat Error:\n" + "Detail:first number must less than second number"); range.setStart(subStart); range.setEnd(subEnd); rangeList.add(range); } return rangeList; } public int getStart() { return start; } public void setStart(int start) { this.start = start; } public int getEnd() { return end; } public void setEnd(int end) { this.end = end; } public String getSubString(String inStr) { String res=inStr.substring(start, end); return res; } } //定义加载函数,从每一行字符串提出年份,温度 public class LineLoadFunc extends LoadFunc{ private static final Log LOG=LogFactory.getLog(LineLoadFunc.class); //负责产生元组的各个字段 private final TupleFactory tupleFactory=TupleFactory.getInstance(); //负责读取输入记录 private RecordReader reader; //存每一个字段的集合 private List<Range> ranges; //传递参数设置列的位置分割 public LineLoadFunc(String cutPattern)throws Exception { ranges=Range.parse(cutPattern); } //设置文件的加载位置 @Override public void setLocation(String location, Job job) throws IOException { FileInputFormat.setInputPaths(job, location); } //设置加载文件的输入文件格式 //为每个分片创建一个RecordReader @Override public InputFormat getInputFormat() throws IOException { return new TextInputFormat(); } @Override public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { this.reader=reader; } @Override public Tuple getNext() throws IOException { // TODO Auto-generated method stub try{ if(!reader.nextKeyValue()) return null; //TextInputFormat //key:LongWritable,value:Text Text value=(Text)reader.getCurrentValue(); String line=value.toString(); //设置每个元组有几个字段 Tuple tuple=tupleFactory.newTuple(ranges.size()); for(int i=0;i<ranges.size();i++) { Range range=ranges.get(i); if(range.getEnd()>line.length()) { throw new ExecException("InputFormat:Error\n" + "field length more than total length"); } //必须使用DataByteArray来构造字段的类型 tuple.set(i, new DataByteArray(range.getSubString(line))); } return tuple; }catch(InterruptedException e) { throw new ExecException(); } } }
具体使用的方法就是按照刚才所说的步骤进行的。