在Hadoop上用Python实现WordCount

1、简单说明python

  本例中咱们用Python写一个简单的运行在Hadoop上的MapReduce程序,即WordCount(读取文本文件并统计单词的词频)。这里咱们将要输入的单词文本input.txt和Python脚本放到/home/data/python/WordCount目录下。app

cd /home/data/python/WordCountoop

vi input.txt测试

输入:spa

There is no denying thatblog

hello python排序

hello mapreduceip

mapreduce is goodhadoop

 

2、编写Map代码input

  这里咱们建立一个mapper.py脚本,从标准输入(stdin)读取数据,默认以空格分隔单词,而后按行输出单词机器词频到标准输出(stdout),整个Map处理过程不会统计每一个单词出现的总次数,而是直接输出“word 1”,以便做为Reduce的输入进行统计,确保该文件是可执行的(chmod +x /home/data/python//WordCount/mapper.py)。

cd /home/data/python//WordCount

vi mapper.py

#!/usr/bin/env python
# -*- coding:UTF-8 -*-

import sys

for line in sys.stdin:     #sys.stdin为读取数据,遍历读入数据的每一行

    line = line.strip()   #删除开头和结尾的空格

    words = line.split()  #以默认空格分隔行单词到words列表

    for word in words:

    #输出全部单词,格式为“单词,1”以便做为Reduce的输入

    print('%s\t%s' %(word,1))

#截图以下:

 

3、编写Reduce代码

  这里咱们建立一个reducer.py脚本,从标准输入(stdin)读取mapper.py的结果,而后统计每一个单词出现的总次数并输出到标准输出(stdout),

确保该文件是可执行的(chmod +x /home/data/python//WordCount/reducer.py)

cd /home/data/python//WordCount

vi reducer.py

#!/usr/bin/env python 

# -*- coding:UTF-8 -*-

import sys

current_word = None    #当前单词

current_count = 0     #当前单词频数

word = None

for line in sys.stdin: 

    line = line.strip()    #删除开头和结尾的空格

    #解析mapper.py输出做为程序的输入,以tab做为分隔符

    word,count = line.split('\t',1)

    try:

   count = int(count)   #转换count从字符型为整型

    except ValueError:

       continue

    #要求mapper.py的输出作排序操做,以便对链接的word作判断,hadoop会自动排序

    if current_word == word:    #若是当前的单词等于读入的单词

       current_count += count    #单词频数加1

    else:

       if current_word:    #若是当前的单词不为空则打印其单词和频数

 

           print('%s\t%s' %(current_word,current_count))

       current_count = count   #不然将读入的单词赋值给当前单词,且更新频数

       current_word = word

 

if current_word == word   #输出最后一个word统计

    print('%s\%s' %(current_word,current_count))

#截图以下:

 

4、本地测试代码

  咱们能够在Hadoop平台运行以前在本地测试,校验mapper.py与reducer.py运行的结果是否正确。注意:测试reducer.py时须要对mapper.py的输出作排序(sort)操做,不过,Hadoop环境会自动实现排序。

 #在本地运行mapper.py:

cd /home/data/python/WordCount/

#记得执行: chmod +x /home/data/python//WordCount/mapper.py

cat input.txt | ./mapper.py

 

#在本地运行reducer.py

#记得执行:chmod +x /home/data/python//WordCount/reducer.py

cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py

 

#这里注意:利用管道符“|”将输出数据做为mapper.py这个脚本的输入数据,并将mapper.py的数据输入到reducer.py中,其中参数sort -k 1,1是将reducer的输出内容按照第一列的第一个字母的ASCII码值进行升序排序。

 

 

 

5、在Hadoop平台上运行代码

在hadoop运行代码,前提是已经搭建好hadoop集群

一、建立目录并上传文件

  首先在HDFS上建立文本文件存储目录,这里我建立为:/WordCound

hdfs dfs -mkdir /WordCound

#将本地文件input.txt上传到hdfs的/WordCount上。

hadoop fs -put /home/data/python/WordCount/input.txt /WordCount

hadoop fs -ls /WordCount       #查看在hdfs中/data/WordCount目录下的内容

 

二、执行MapReduce程序

  为了简化咱们执行Hadoop MapReduce的命令,咱们能够将Hadoop的hadoop-streaming-3.0.0.jar加入到系统环境变量/etc/profile中,在/etc/profile文件中添加以下配置:

首先在配置里导入hadoop-streaming-3.0.0.jar

vi /etc/profile

HADOOP_STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.0.0.jar 

export HADOOP_STREAM 

source /etc/profile    #刷新配置

 

#执行如下命令:

hadoop jar $HADOOP_STREAM -file /home/data/python/WordCount/mapper.py -mapper ./mapper.py -file /home/data/python/WordCount/reducer.py -reducer ./reducer.py -input /WordCount -output /output/word1

获得:

而后,输入如下命令查看结果:

hadoop fs -ls /output/word1

hadoop fs -cat /output/word1/part-00000    #查看分析结果

 

能够发现,结果与以前测试的时候是一致的,那么恭喜你,大功告成!

相关文章
相关标签/搜索