本章的模式有一个共同点:不会改变原来的记录。这种模式是找到一个数据的子集,或者更小,例如取前十条,或者很大,例如结果去重。这种过滤器模式跟前面章节的不一样是,从更小的粒度认识数据,例如特殊用户生成的记录,或文本中用得最多的前10个动词。简单的说,过滤器容许你更清楚的看清数据,像在显微镜下同样。也能够认为是搜索的一种形式。若是你对找出全部有着特殊信息的记录感兴趣,你就能够过滤出不匹配搜索条件的记录。java
抽样,一种通用的过滤程序,是指取出数据的一个样本,好比,取某字段值最高的几个,或随机取几个记录。取样能用来获得更小的,仍具备表明性的子数据集,在其上面进行分析,就没必要处理庞大的数据集。不少机器学习算法在大数据集上运行低效,因此须要建模使其可运行在更小的子数据集。web
子样本也能够用于开发目的。简单的抽取前一千条记录不是好的抽样方法,由于取的数据类似并不能表明整个数据集。一个均匀的抽样可以提供更好的数据的视图,并容许你的程序或分析能在现实数据上完成,甚至取样很是小。正则表达式
本章列出了四种这样的模式:filtering, Bloom filtering, top ten, and distinct。找出数据一部分的方法有不少,每种模式都有细微的差异,甚至都在作相同的事情。算法
咱们将会看到MapReduce中几个简明的使用。Filtering, Bloom filtering,和 simple random能够只使用map端完成,不用reducer。sql
最基本的模式,filtering,为其它模式充当了一种抽象形式。Filtering基于某种条件评估每一条记录,决定它的去留。数据库
过滤出咱们不感兴趣的数据并保存起来。apache
考虑用一个评估方法f,处理记录,若是返回true,保留,若是返回false,丢掉。api
你的数据集很大,你想更专一于数据的一部分并可能在上面作随后的分析。这个本身可能对数据集有表明意义或者就像在大海中捞针。无论用怎样的方式,你须要使用MapReduce并行处理数据,并找到须要的数据,过程可能有点费劲。缓存
例如,你可能只对含有Hadoop的记录感兴趣,指的是那些文本中有hadoop词,或有hadoop标签的记录。Filtering能找到跟hadoop相关的记录并保存,丢掉其它记录。服务器
像hadoop同样的大数据处理系统,一般是关于把全部数据拿到一个本地机器。Filtering就是这种把数据子集拉出来并发送到分析程序的方法。Filtering也用于放大匹配条件的你好奇的数据。对子数据集的探索可能会致使代价更昂贵,而且基于更小数据集上行为的分析更复杂。
Filtering有着普遍的应用。仅须要作的是用指定的规则解析出特定的“records”并决定是否保存。
过滤器模式多是这本书里最简单的。图3-1展现告终构图。
map(key, record):
if we want to keep record then
emit key,value
Figure 3-1. The structure of the filter pattern
Filtering是MapReduce中惟一不须要reduce的。由于它不处理聚合运算。每一条记录单独处理评估去留。
Mapper对每条输入记录应用评估方法。输出键值对跟输入同样,由于记录不会改变。若是评估方法返回true就输出。
Job的输出是经过了选择条件的子记录集。若是格式不变,在大数据集上跑的job也能在过滤后的数据集上跑。
Closer view of data
准备个特殊的记录有共性或有趣的子数据集,作进一步检查。例如,马里兰的电话局可能只关心国际通话中马里兰的去电。
Tracking a thread of events
抽取一系列连续事件做为大数据集的案例研究。例如,你能够经过分析apache web服务器的日志了解特殊用户怎样与你的网站交互。一个特殊用户的行为可能遍及于其它行为中,因此很难找出发生了什么。靠根据用户的ip过滤,就可以获得特殊用户行为更好的视图。
Distributed grep
Grep,一个使用正则表达式找出感兴趣的文本行的强大的工具,很容易并行应用正则表达式匹配每一行并输出。
Data cleansing
数据有时是脏的,或者很难理解,未完成,和错误的格式。数据可能丢失了字段,date类型的字段不能格式化成date,或二进制数据的随即字节可能存在。Filtering能检验每条记录是否结构良好,并去除任何垃圾数据。
Simple random samping
若是你想获得数据集的一个简单随机采样,可使用filtering,用一个评估方法随即返回true或false。在简单随即采样中,数据集中的每条记录都有相同的几率被选中。能够根据源数据的数量算出百分比,获得要返回的记录的数量。例如,数据集有一万亿,你想获得一百万数据,那么评估方法应该每一百万次返回一次true,由于一百万个一百万是一万亿。
Removing low scoring data
若是你能根据排序给数据评分,你能够根据不知足某一临界条件来过滤数据。若是以前已经知道有些数据对分析也没意义,能够给这些记录评比较低的分。这跟随后讲到的top ten模式有相同的目的,除了数据量。
Sql:过滤器模式跟select语句中使用where条件是等同的。记录保持不变,一些被简单的过滤,例如:
SELECT * FROM table WHERE value < 3;
Pig:filter是关键词
b = FILTER a BY value < 3;
这种模式基本上同MapReduce等效率,由于job只有map。下面是map-only job高效的缘由:
·因为没有reducer,就少了数据在map和reduce之间传输数据的阶段。全部的map任务处理本地数据,而后放回本地磁盘。
·因为没有reducer,排序阶段和reduce阶段时没有的。一般不会占用太多时间,但聚沙成塔。
须要注意的一件事情是:输出文件的数量和大小。由于job只有mapper,获得的输出文件都是以part-m-为前缀的。你会发现若是过滤掉不少数据,输出文件的数据量会不多,这会在一段时间后NameNode的可扩展性上带来问题。
若是你顾虑这些小文件,但容忍job运行骚微长一点,可使用identity reducer收集结果集。这样,mapper会把所有输出数据给reducer,但reducer不会作任何处理,仅仅每一个reducer输出一个文件。合适的reducer数量取决于将被写到文件系统的数据量的大小和你想处理多少小文件。
Grep 做为流行的文本过滤工具能够追溯到unix和类unix系统中的使用。对一个文件进行行扫描,匹配指定的样式就输出。咱们要在大文本数据上并行作正则表达式搜索。在这个例子中,咱们将展现在MapReduce中怎样应用正则表达式。
Mapper code。Mapper很简单,由于使用java内建api处理正则表达式。若是文本行匹配样式,就输出这一行。不然忽略这一行。咱们使用setup方法获取job配置的正则。
public static class GrepMapper extends Mapper<Object, Text, NullWritable, Text> { private String mapRegex = null; public void setup(Context context) throws IOException, InterruptedException { mapRegex = context.getConfiguration().get("mapregex"); } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { if (value.toString().matches(mapRegex)) { context.write(NullWritable.get(), value); } } }
只有map,没有combiner,没有reducer。全部输出记录被写到本地文件系统。
在srs中,咱们想抽取大数据的每条记录都同等几率被选择的子数据集,这样有效减少数据集,并在更易于管理的部分数据上作有表明性的分析工做。
实现srs做为过滤操做不是filtering模式的一种程序,但结构是相同的。取代以记录内容为过滤条件的方法,这里生成一个随机数,用来对应一个值,保留对应的记录。
Mapper code。在mapper代码里,从setup方法里获取过滤器的百分率配置值,在map方法里会用到。
Map中,检查随机数的生成。随机数在0到1之间,因此根据与临界值的比较,能够决定是否保留记录。
public static class SRSMapper extends Mapper<Object, Text, NullWritable, Text> { private Random rands = new Random(); private Double percentage; protected void setup(Context context) throws IOException, InterruptedException { // Retrieve the percentage that is passed in via the configuration // like this: conf.set("filter_percentage", .5); // for .5% String strPercentage = context.getConfiguration().get("filter_percentage"); percentage = Double.parseDouble(strPercentage) / 100.0; } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { if (rands.nextDouble() < percentage) { context.write(NullWritable.get(), value); } } }
在这个只有map的job里,没有combiner或reducer。全部输出记录被写到本地文件系统。当使用一个小的百分比,你会发现文件数据量小且文件数量多。若是是这样,就把reducer数量设为1,但不指定reducer类。这样会告诉MapReduce框架执行默认的identity reducer把map的输出收集为一个文件。随后能够用hadoop fs –cat查看文件。
Bloom filtering跟前面的模式作一样的事情,但对每条记录的评估方法不同。
这种过滤器是指咱们保存预先定义的值得集合。若是输出由一点错误是不要紧的,由于咱们还打算进一步检查。这些预先定义的值叫热点值。
对每条记录,抽取一种特色。若是这种特色是预约义的值集合里面的成员,保留,不然丢掉,或反之。
Bloom filtering在查看每一条记录和决定是否保留问题上跟通用的filtering类似。然而,有两个主要不一样点区别于通用filtering。首先,咱们想使用热点值,基于某种集合的隶属关系过滤数据。例如:若是用户字段是预先定义的用户列表里的,咱们保留或去除这条记录。其次,集合的隶属关系用bloom filter来评估,在附录A有描述。在某种意义上,bloom filtering是一种不关心join右边数据值得join操做。(左链接)
这种模式跟第五章的replicated join模式有点相像。拿一个列表跟另外一个比较,作一些join逻辑的排序,仅使用map任务。取代replicated join中用分布式缓存复制热点值列表到各处,咱们发送一个bloom filter对象到分布式缓存。这样就容许使用的bloom filter对象取代列表自己,这容许执行更大的数据集的操做,由于bloom filter更简洁。并且不存在列表大小受内存限制的状况,只受bloom filter定义的feature limitations限制。
使用bloom filter,在这种状况下计算集合隶属关系有一种后果:有时会获得一种错的判断。就是说,一个值被判断为集合的元素而实际上不是。若是bloom filter判断出一个值不是集合的成员,咱们必须保证它的正确性。关于为何这种状况发生的更多信息,参考附录 A。然而,在一些状况下,这不是大问题。这章最后的一个例子代码中,咱们将会收集至关大量的有趣的单词,若是一条记录中包含了有趣单词中的一个,保留该条记录。咱们作这个的目的是想靠去掉不感兴趣的内容而使咱们的数据更有意义。若是使用bloom filter表明单词列表,有时一个单词可能成为列表的成员,虽然列表不该该有它。这种状况下,若是咱们意外保存了一些记录,咱们任然要达到咱们过滤掉大多数垃圾数据的目的。
下面是使用bloom filtering的条件:
·数据能被分割为记录,就像filtering里的。
·能从每条记录抽取的特性都在热点值里。
·要有一个预先设定的热点值得条目的集合。
·能容忍一些错误。(例如不该该经过检查的经过了)
Figure 3-2. The structure of the Bloom filtering pattern
图3-2展现了bloom filtering的结构,和它是怎样分红两个主要组件的。首先,要训练出值得列表。结果被存在hdfs上。下面是filtering的MapReduce job,跟本章前一个模式相同,除了用到分布式缓存。由于记录被一条条分析而且没有聚合要作,因此没有reducer。
第一阶段训练值得列表。即从存储的地方load数据并把每一个条目加到Bloom filter。训练好的bloom filter对象存储到已知的hdfs目录。
第二步,作具体的过滤。Map任务启动后,从分布式缓存加载bloom filter。而后,在map方法里,迭代记录检查时候知足隶属热点值列表。每条记录或者经过或者没经过隶属关系的检查。
当数据改变的时候,bloom filter须要从新训练。所以使用懒加载模式设置bloom filter是合适的。
Job的输出是那些经过了bloom filter资格测试的子数据集。你应该预料到输出数据中的一些记录可能并不在热点值中,由于bloom filter有必定概率出错。
Removing most of the nonwatched values
最简单的例子是去除不经常使用的值。例如,你只对有含有10000个单词的列表里的单词感兴趣,用hadoop处理。你拿到这个数据列表,训练出bloom filter,而后检查文本数据,看看每条记录是否命中其中的一个单词,命中保存继续执行,没命中不保存。虽然可能获得一些错误的记录,但也没多大问题,由于你已经去掉了大多数数据。
Prefiltering a data set for an expensive set membership check
有时,检查某个值是不是集合的成员的代价是昂贵的。例如,你可能作涉及到webservice或外部数据库去检验值是否在集合中。这种情形可能很是稀少,但可能忽然出现。替代周期性的把列表放到集群,你能够在数据源所在系统产生一个bloom filter并放进去。一旦在适当的位置部署了bloom filter并过滤掉大部分数据,你能够对数据的来源作第二次检查。若是bloom filter能去掉95%以上的数据,你将看到在外部只须要命中剩下的5%。使用这种途径,能够达到100%的准确率,并不会对外部系统带来大量查询的性能问题。
随后的第五章,咱们会看到一种模式叫“Reduce Side Join with Bloom Filtering”,就是用bloom filter减小发送到reducers的数据量。提早决定记录是不是咱们想要的,能显著减小网络带宽。
Bloom filter在数据分析领域相对较新,极可能由于在大数据性能方面特别收益于这种以前方法论里没有的东西。在hive sql和pig里,bloom filter能够实现为用户自定义的方法,但做为本书写做时,并无当即可用的原生功能。
这种模式的性能很是相似于简单filtering。从分布式缓存加载bloom filter花不了多少时间,由于数据相对较小。根据bloom filter检查值也是相对轻松的操做,由于每次都执行常数级别的时间。
bloom filter一个最基本的应用正如它所设计之目的:描述数据集。做为本例,bloom filter用一些热点关键词列表训练。咱们使用bloom filter测试评论里的每一个单词是否在热点列表里。若是返回true,整个记录输出。不然忽略。这里咱们不关心bloom filter产生的不可避免的false误报为true的状况。下一个例子详细说明一种使用HBase验证positive bloom filter的方法。
问题:给出用户评论数据,过滤出绝大多数不包含指定关键词的评论。
Bloom filter training。为了演示怎样使用hadoopbloom filters,下面的代码段生成预先决定的单词的集合。这是一个通用的程序,输入参数为一个gzip文件或含有gzip文件的目录,文件里元素的数量,但愿的误报率,最终输出文件名。
public classBloomFilterDriver {
public staticvoid main(String[]args) throws Exception{
// Parse command line arguments
Path inputFile= newPath(args[0]);
intnumMembers =Integer.parseInt(args[1]);
floatfalsePosRate =Float.parseFloat(args[2]);
Path bfFile= newPath(args[3]);
// Calculate our vector size and optimal K value based on approximations
intvectorSize =getOptimalBloomFilterSize(numMembers,falsePosRate);
intnbHash =getOptimalK(numMembers,vectorSize);
// Create new Bloom filter
BloomFilter filter= newBloomFilter(vectorSize,nbHash,
Hash.MURMUR_HASH);
System.out.println("Training Bloom filter of size " + vectorSize
+" with " + nbHash + " hash functions, "+ numMembers
+" approximate number of records, and " + falsePosRate
+" false positive rate");
// Open file for read
String line= null;
intnumElements =0;
FileSystem fs= FileSystem.get(newConfiguration());
for(FileStatus status: fs.listStatus(inputFile)) {
BufferedReader rdr= newBufferedReader(newInputStreamReader(
newGZIPInputStream(fs.open(status.getPath()))));
System.out.println("Reading " + status.getPath());
while((line= rdr.readLine()) !=null) {
filter.add(newKey(line.getBytes()));
++numElements;
}
rdr.close();
}
System.out.println("Trained Bloom filter with " + numElements
+" entries.");
System.out.println("Serializing Bloom filter to HDFS at " + bfFile);
FSDataOutputStream strm= fs.create(bfFile);
filter.write(strm);
strm.flush();
strm.close();
System.exit(0);
}
}
代码中,一个新的bloomfilter对象由最优矢量大小和基于输入参数的最优数量的hash方法(k)。liststatus返回的每一个文件按行读,每一行都用来训练bloom filter。全部输入文件处理完后,bloom filter以输入参数为名字序列化到文件中。由于bloomfilter也是writable对象,因此序列化不值一提。简单的使用FileSystem对象建立一个流 FSDataOutputStream,把这个流传给过滤器的写方法,而后刷新并关闭流。
随后bloom filter能很容易得从hdfs反序列化回来。仅仅使用FileSystem对象打开文件并传给bloomfilter的readfilelds方法。Bloom filter的反序列化会在下面map代码的setup方法里演示。
Mapper code。Hadoop框架为每一个mapper在执行大量的map调用以前,调用一次setup方法。这里,在map方法使用以前,bloom filter从分布式缓存反序列化回来。分布式缓存是hadoop通用功能,可以保证hdfs上的文件也会在须要这个文件的每一个task的本地文件系统也存在。Bloom filter就被填充了一个热点单词列表。
在map方法里,评论从每一个输入记录中提取。评论被标记为单词,每一个单词清洗掉无关字符。清洗后的单词就可以使用bloom filter测试。
Notice:bloom filter是在单词的字节层次上训练。单词相同但大小写不一样,会被认为不一样。除非你的逻辑须要大小写敏感,最好训练和测试以前转换成小写。
public static classBloomFilteringMapper extends
Mapper<Object,Text, Text, NullWritable> {
privateBloomFilter filter =new BloomFilter();
protectedvoid setup(Context context)throws IOException,
InterruptedException{
// Get file from the DistributedCache
URI[]files = DistributedCache.getCacheFiles(context
.getConfiguration());
System.out.println("Reading Bloom filter from: "
+files[0].getPath());
// Open local file for read.
DataInputStream strm= newDataInputStream(newFileInputStream(
files[0].getPath()));
// Read into our Bloom filter.
filter.readFields(strm);
strm.close();
}
publicvoid map(Object key,Text value,Context context)
throwsIOException,InterruptedException {
Map<String,String> parsed = transformXmlToMap(value.toString());
// Get the value for the comment
String comment= parsed.get("Text");
StringTokenizer tokenizer= newStringTokenizer(comment);
// For each word in the comment
while(tokenizer.hasMoreTokens()) {
// If the word is in the filter, output the record and break
String word= tokenizer.nextToken();
if(filter.membershipTest(newKey(word.getBytes()))) {
context.write(value,NullWritable.get());
break;
}
}
}
}
Bloom filters能帮助费劲的任务减小没必要要的代价。下面的例子,bloom filter使用至少有1500声誉值的用户id训练。在查询hbase获取更多用户信息以前,咱们使用bloom filter 作初始环境的测试。依靠消除没必要要的查询,咱们加速执行时间。
问题:给出用户评论列表,过滤掉声誉不超过1500的用户的评论。
Mapper code。跟前面的例子同样,用到了反序列化。这个bloom filter使用有声誉的至少1500用户的id训练。仅仅是全部用户的1.5%,因此将会过滤出大量不须要的查询。除了bloom filter,hbase table的链接也会在setup里获取。
在map方法里,抽取用户的id,用bloom filter作检查,若是检查经过,hbase会用这个id去hbase table查询出用户的全部数据。这里,靠验证用户真实的声誉至少1500,来做废可能出现的误报错误。
public static classBloomFilteringMapper extends
Mapper<Object,Text, Text, NullWritable> {
privateBloomFilter filter =new BloomFilter();
privateHTable table =null;
protectedvoid setup(Context context)throws IOException,
InterruptedException{
// Get file from the Distributed Cache
URI[]files = DistributedCache.getCacheFiles(context
.getConfiguration());
System.out.println("Reading Bloom filter from: "
+files[0].getPath());
// Open local file for read.
DataInputStream strm= newDataInputStream(newFileInputStream(
files[0].getPath()));
// Read into our Bloom filter.
filter.readFields(strm);
strm.close();
// Get HBase table of user info
Configuration hconf= HBaseConfiguration.create();
table= newHTable(hconf,"user_table");
}
publicvoid map(Object key,Text value,Context context)
throwsIOException,InterruptedException {
Map<String,String> parsed = transformXmlToMap(value.toString());
// Get the value for the comment
String userid= parsed.get("UserId");
// If this user ID is in the set
if(filter.membershipTest(newKey(userid.getBytes()))) {
// Get the reputation from the HBase table
Result r= table.get(newGet(userid.getBytes()));
intreputation =Integer.parseInt(newString(r.getValue(
"attr".getBytes(),"Reputation".getBytes())));
// If the reputation is at least 1500,
// write the record to the file system
if(reputation>= 1500) {
context.write(value,NullWritable.get());
}
}
}
}
Query Buffer Optimization
前面查询hbase的例子方法相对幼稚。这里只是为了演示怎么应用这种模式,能够进一步优化。Hbase提供了分批查询,因此理想的状况下,能够预缓存必定大小的查询结果。这个常数取决于内存能充裕的存多少查询。而后把查询刷进hbase执行进一步处理并返回结果。若是昂贵的操做能缓存,这是推荐的作法。只须要记得在mapper或reducer的cleanup方法里刷新缓存。Context对象能用来写输出,就像map或reduce方法。
附: