ipyparallel WordCount实现

          i pyparallel 之中,能够利用多个engine同时运行一个任务来加快处理的速度。在ipyparallel之中,集群被抽象为view,包括direct_view和balanced_view。其中,direct_view是全部的engine的抽象,固然也能够自行指定由哪些engine构成,而balanced_view是多个engine通过负载均衡以后,抽象出来的由“单一”engine构成的view。利用ipyparallel并行化的基本思路是将要处理的数据首先进行切分,而后分布到每个engine上,而后将最终的处理结果合并,获得最终的结果,其思路和mapreduce相似。
         下面是一个ipyparallel的并行化wordcount实现,主要思路是:首先读取文件中的句子。利用dview的scatter方法将全部的句子切分红n块发送到每个engine上,正好每个engine一个。而后在每个engine上对切分以后的句子统计词频,最后归并全部engine处理以后的结果。
#!/usr/bin/env python
# coding: utf-8

import time

from itertools import repeat
from ipyparallel import Client, Reference
from urllib import urlretrieve
#对text进行wordcount处理
def wordfreq(text):
    """Return a dictionary of words and word counts in a string."""
    freqs = {}
    for word in text.split():
        lword = word.lower()
        freqs[lword] = freqs.get(lword, 0) + 1
    return freqs
#输出词频前n个的单词以及其出现的次数
def print_wordfreq(freqs, n=10):
    """Print the n most common words and counts in the freqs dict."""

    words, counts = freqs.keys(), freqs.values()
    items = zip(counts, words)
    items.sort(reverse=True)
    for (count, word) in items[:n]:
        print(word, count)

#自行实现的并行版本的word_freq,对若干行句子进行处理,返回词,出现次数 键值对
def myword_freq(texts):
    freqs = {}
    for str in texts:
        for word in str.split():
            lword = word.lower()
            freqs[lword] = freqs.get(lword, 0) + 1
    return freqs
#自行实现的并行版本的wordfreq,首先将texts[]分散传送至每个engine,而后在每个engine上执行程序myword_freq,返回求出的词 词频键值对
def myPwordfreq(view,lines):
    #将文本平均分布在每个engine上
    view.scatter('texts',lines,flatten=True)
    ar=view.apply(myword_freq,Reference('texts'))
    freqs_list=ar.get()
    #归并最终的处理结果 reduce it!
    word_set=set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs=dict(zip(word_set,repeat(0)))
    for f in freqs_list:
        for word,count in f.items():
            freqs[word]+=count
    return freqs

if __name__ == '__main__':
    # Create a Client and View
    rc = Client()

    dview = rc[:]
    # Run the serial version
    print("Serial word frequency count:")
    text = open('lines.txt').read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate"%(toc-tic))
    # The parallel version
    print("\nParallel word frequency count:")
    lines=text.splitlines()
    tic=time.time()
    pfreqs=myPwordfreq(dview,lines)
    toc=time.time()
    print_wordfreq(pfreqs)
    print("Took %.3f s to calculate"%(toc-tic))
相关文章
相关标签/搜索