最近迷上了spark,写一个专门处理语料库生成词库的项目拿来练练手, github地址:https://github.com/LiuRoy/spark_splitter。代码实现参考wordmaker项目,有兴趣的能够看一下,此项目用到了很多很tricky的技巧提高性能,单纯只想看懂源代码能够参考wordmaker做者的一份简单版代码。python
这个项目统计语料库的结果和执行速度都还不错,但缺点也很明显,只能处理GBK编码的文档,并且不能分布式运行,恰好最近在接触spark,因此用python实现了里面的算法,使之能处理更大规模的语料库,而且同时支持GBK和UTF8两种编码格式。git
wordmaker提供了一个统计大规模语料库词汇的算法,和结巴分词的原理不一样,它不依赖已经统计好的词库或者隐马尔可夫模型,可是一样能获得不错的统计结果。原做者的文档提到是用多个线程独立计算各个文本块的词的信息,再按词的顺序分段合并,再计算各个段的字可能组成词的几率、左右熵,获得词语输出。下面就详细的讲解各个步骤:github
原始的C++代码挺长,可是用python改写以后不多,上文中的1 2 3步用spark实现很是简单,代码在split函数中。第3部过滤后的结果已经相对较小,能够直接取出放入内存中,再计算熵过滤,在split中执行target_phrase_rdd.filter(lambda x: _filter(x))
过滤的时候能够phrasedictmap作成spark中的广播变量,提高分布式计算的效率,由于只有一台机器,因此就没有这样作。split代码以下,原来很长的代码用python很容易就实现了。算法
1 def split(self): 2 """spark处理""" 3 raw_rdd = self.sc.textFile(self.corpus_path) 4 5 utf_rdd = raw_rdd.map(lambda line: str_decode(line)) 6 hanzi_rdd = utf_rdd.flatMap(lambda line: extract_hanzi(line)) 7 8 raw_phrase_rdd = hanzi_rdd.flatMap(lambda sentence: cut_sentence(sentence)) 9 10 phrase_rdd = raw_phrase_rdd.reduceByKey(lambda x, y: x + y) 11 phrase_dict_map = dict(phrase_rdd.collect()) 12 total_count = 0 13 for _, freq in phrase_dict_map.iteritems(): 14 total_count += freq 15 16 def _filter(pair): 17 phrase, frequency = pair 18 max_ff = 0 19 for i in xrange(1, len(phrase)): 20 left = phrase[:i] 21 right = phrase[i:] 22 left_f = phrase_dict_map.get(left, 0) 23 right_f = phrase_dict_map.get(right, 0) 24 max_ff = max(left_f * right_f, max_ff) 25 return total_count * frequency / max_ff > 100.0 26 27 target_phrase_rdd = phrase_rdd.filter(lambda x: len(x[0]) >= 2 and x[1] >= 3) 28 result_phrase_rdd = target_phrase_rdd.filter(lambda x: _filter(x)) 29 self.result_phrase_set = set(result_phrase_rdd.keys().collect()) 30 self.phrase_dict_map = {key: PhraseInfo(val) for key, val in phrase_dict_map.iteritems()}
进入spark_splitter/splitter目录,执行命令PYTHONPATH=. spark-submit spark.py
处理test/moyan.txt文本,是莫言全集,统计完成的结果在out.txt中,统计部分的结果以下,因为算法的缘由,带有 的 地 这种连词词语不能正确的切分。分布式
(我也不知道为何这个词这么多)函数