使用mapreduce计算环比的实例

  最近作了一个小的mapreduce程序,主要目的是计算环比值最高的前5名,原本打算使用spark计算,但是本人目前spark还只是简单看了下,所以就先改用mapreduce计算了,今天和你们分享下这个例子,也算是对本身写的程序的总结了。java

  首先解释下环比,例如咱们要算本周的环比,那么计算方式就是本周的数据和上周数字的差值除以上周数值就是环比了,若是是月的环比就是本月和上月数据的差值除以上月数字就是本月环比了。不过本mapreduce实例不会直接算出比值,只是简单求出不一样时间段数值的差值,最终环比结果由业务系统进行运算了。正则表达式

  下面看看本人构造的测试数据了,测试数据分红两个文件,文件一的内容以下所示:算法

guanggu,1;90
hongshan,1;80
xinzhou,1;70
wuchang,1;95
hankou,1;85
hanyang,1;75

  第二个文件的测试数据以下:shell

guanggu,2;66
hongshan,2;68
xinzhou,2;88
wuchang,2;59
hankou,2;56
hanyang,2;38

  这里每行第一列的字段就是key了,key和value使用逗号分割,1;90是value值,value值包含两个内容,1为时间段标记,90就是数值,你们能够看到同一个key会有两个不一样的时间段(使用1和2来标记)。apache

  Mapreduce的运算逻辑以下:首先第一步咱们要求出环比数值,第二步就是排序了,作这个算法我曾考虑许久就是想把求环比值和排序两个过程合并,可是最后发现很难作到,只好将整个运算过程拆分红两个不一样mapreduce,第一个mapreduce计算环比,第二个进行排序,两者是迭代关系。这里解释下分红两个mapreduce缘由吧,主要缘由就是最原始数据很难把两个不一样时间段的数据按照key合并在一块儿变成一行数据,所以mapreduce计算时候必须有一个过程就是执行相同key合并操做,所以不得不分红两个步骤完成计算。数组

  接下来就是具体代码了,首先是第一个mapreduce,用来计算环比值的mapreduce了,它的map实现代码以下:服务器

import java.io.IOException;

import org.apache.hadoop.io.Text;
// 使用输入为object,text,输出为Text,Text的数据结构,Object实际上是行号,在本计算里意义不大,Text就是每行的内容
public class MrByAreaHBMap extends org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, Text>{
    
    private static String firstSeparator = ",";//每行的key和value值使用逗号分割

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        /* 本map的逻辑很是简单,就是从行里拆分key和value,对于有些初学者可能疑惑,咱们到底如何让相同的key合并在一块儿了?这个就要看reduce计算了*/        
        Text areaKey = new Text();// reduce输入是Text类型
        Text areaVal = new Text();// reduce输入是Text类型
        String line = value.toString();
        if (line != null && !line.equals("")){
            String[] arr = line.split(firstSeparator);

            areaKey.set(arr[0]);
            areaVal.set(arr[1]);
            
            context.write(areaKey, areaVal);
        }

    }

}

  下面是reduce代码了,具体以下:数据结构

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MrByAreaHBReduce extends Reducer<Text, Text, Text, Text>{
    
    private static String firstSeparator = ";";
    private static String preFlag = "1";
    private static String nextFlag = "2";

    /*reduce的输入也是key,value的形式,不过这个输入是会将map里相同的key的值进行合并,合并形式就是一个数组形式,不过reduce方法里是经过迭代器进行数值处理*/
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int num1 = 0,num2 = 0,hbNum = 0;
        for(Text value : values){
            String inVal = value.toString();
            String[] arr = inVal.split(firstSeparator);
            // 下面的逻辑是经过不一样时间段标记获取不一样时间段数值
            if (arr[0].equals(preFlag)){
                num1 = Integer.valueOf(arr[1]);
            }
            if (arr[0].equals(nextFlag)){
                num2 = Integer.valueOf(arr[1]);
            }
        }
        hbNum = num1 - num2;// 这里计算环比
        Text valueText = new Text();
        valueText.set(hbNum + "");
        Text retKey = new Text();
        /* 对reduce的key进行了修改,将原来key和各个时间段的数值合并在一块儿,这样读取计算结果时候就能够读取到原始计算数据了,这是key,value计算模式能够简陋的无奈之举*/
        retKey.set(key.toString() + firstSeparator + num1 + firstSeparator + num2);
        context.write(valueText,retKey);
    }

}

  求环比的mapredue代码介绍结束了,下面是排序的算法,排序算法更加简单,在计算环比的mapreduce输出里我将环比值和原始key进行了互换,而后输出到结果文件里,这个结果文件就是第二个mapreduce的输入了,下面咱们就要对这个新key进行排序,mapredcue计算模型里从map到reduce有默认的排序机制,若是map输出的key是字符类型那么排序规则就是按照字典进行排序,若是key是数字,那么就会按照数字由小到大进行排序,下面就是排序mapreduce的具体算法,map代码以下:app

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MrByAreaSortMap extends
        Mapper<LongWritable, Text, IntWritable, Text> {
    /* 咱们须要的排序是按照key的数值排序,不过这个排序是map的输出才作的,所以代码里输出的key使用了IntWritable类型 
       其实排序的map逻辑很是简单就是保证map的输出key是数字类型便可
    */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        /*reduce的输出结果文件格式是按照空格分隔的,不过也搞不清有几个空格,或者是tab分割了,这里使用正则表达式s+就不怕多少空格和tab了*/
        String[] arr = line.split("\\s+");
        IntWritable outputKey = new IntWritable(Integer.valueOf(arr[0]));
        Text outputValue = new Text();
        outputValue.set(arr[1]);
        context.write(outputKey, outputValue);
    }
}

  reduce代码以下:eclipse

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* reduce代码很让人吃惊吧,就是把map结果原样输出便可 */
public class MrByAreaSortReduce extends
        Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for (Text text : values){
            context.write(key, text);
        }
    }

}

  代码里的注释对代码逻辑进行了详细的解释,这里就不累述了。

  下面就是调用两个mapreduce的main函数了,也就是咱们该如何执行mapreduce的方式,这个main函数仍是很是有特色的,特色一就是两个mapreduce有迭代关系,具体就是第一个mapredcue执行完毕后第二个mapredcue才能执行,或者说第一个mapredcue的输出就是第二个mapredcue的输入,特色二就是排序计算里咱们使用了map到reduce过程及shuffle过程里的默认排序机制,那么该机制运用可不是像mapreduce代码那么简单了,其实背后须要咱们更加深刻理解mapreduce的原理,这里咱们直接看代码了,代码以下:

mport java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MrByAreaJob {
    public static void main(String[] args) throws IOException {
        // 一个mapreduce就是一个job 一个job须要一个单独的Configuration,我开始让两个job公用Configuration,最后mr报错
        Configuration conf01 = new Configuration();
        ControlledJob conJobHB = new ControlledJob(conf01);
        
        // 下面代码不少文章里都会提到这里就很少说了
        Job jobHB = new Job(conf01,"hb");
        jobHB.setJarByClass(MrByAreaJob.class);
        jobHB.setMapperClass(MrByAreaHBMap.class);
        jobHB.setReducerClass(MrByAreaHBReduce.class);
        jobHB.setMapOutputKeyClass(Text.class);
        jobHB.setMapOutputValueClass(Text.class);
        jobHB.setOutputKeyClass(Text.class);
        jobHB.setOutputValueClass(Text.class);
        
        conJobHB.setJob(jobHB);
        
        FileInputFormat.addInputPath(jobHB, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobHB, new Path(args[1]));
        
        Configuration conf02 = new Configuration();
        Job jobSort = new Job(conf02,"sort");
        jobSort.setJarByClass(MrByAreaJob.class);
        jobSort.setMapperClass(MrByAreaSortMap.class);
        jobSort.setReducerClass(MrByAreaSortReduce.class);
        // Partitioner是shuffle的一个步骤,一个Partitioner对应一个reduce
        // 假如这个mapredue有多个reduce,咱们如何保证排序的全局一致性,所以这里须要进行处理
        jobSort.setPartitionerClass(PartitionByArea.class);
        // map对数值排序默认是由小到大,可是需求是由大到小,所以须要咱们改变这种排序
        jobSort.setSortComparatorClass(IntKeyComparator.class);
        jobSort.setMapOutputKeyClass(IntWritable.class);
        jobSort.setMapOutputValueClass(Text.class);
        
        jobSort.setOutputKeyClass(IntWritable.class);
        jobSort.setOutputValueClass(Text.class);
        
        ControlledJob conJobSort = new ControlledJob(conf02);
        conJobSort.setJob(jobSort);
        
        // 这里添加job的依赖关系
        conJobSort.addDependingJob(conJobHB);
        
        // 能够看到第一个mapreduce的输出就是第二个的输入
        FileInputFormat.addInputPath(jobSort, new Path(args[1]));
        FileOutputFormat.setOutputPath(jobSort, new Path(args[2]));
        
        // 主控制job
        JobControl mainJobControl = new JobControl("mainHBSort");
        
        mainJobControl.addJob(conJobHB);
        mainJobControl.addJob(conJobSort);
        
        Thread t = new Thread(mainJobControl);
        t.start();
        
        while(true){
            if (mainJobControl.allFinished()){
                System.out.println(mainJobControl.getSuccessfulJobList());
                mainJobControl.stop();
                break;
            }
        }
    }
}

  这里有两个类尚未介绍,一个是IntKeyComparator,这是为了保证排序的mapreduce结果是按数字由大到小排序,代码以下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class IntKeyComparator extends WritableComparator {

    protected IntKeyComparator() {
        super(IntWritable.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
    }
    
    

}

  另外一个类就是PartitionByArea,这个是保证排序不会由于reduce设置的个数而不能保证排序的全局一致性,代码具体以下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionByArea<IntWritable, Text> extends Partitioner<IntWritable, Text> {

    @Override
    public int getPartition(IntWritable key, Text value, int numReduceTasks) {
        int maxValue = 50;
        int keySection = 0;
        
        // numReduceTasks就是默认的reduce任务个数
        if (numReduceTasks > 1 && key.hashCode() < maxValue){
            int sectionValue = maxValue / (numReduceTasks - 1);
            int count = 0;
            while((key.hashCode() - sectionValue * count) > sectionValue){
                count++;
            }
            keySection = numReduceTasks - 1 - count;
        }
        
        return keySection;
    }

}

  这里特别要讲解的是PartitionByArea,这个原理我花了好一段时间才理解过来,partition是map输出为reduce对应作的分区,通常一个partition对应一个reduce,若是咱们将reduce任务槽设置为一个那么就不用更改Partition类,可是实际生产状况下reduce每每会配置多个,这个时候保证数据的总体排序就十分重要了,那么咱们如何保证其数据的总体有序了,这个时候咱们要找到输入数据的最大值,而后让最大值除以partition的数量的商值做为分割数据的边界,这样等分就能够保证数据的总体排序的有效性了。

  如今全部的代码都介绍完毕了,下面就是咱们该如何让这个代码运行了,我在写本代码时候使用的是ide是eclipse,不过我没有使用mapreduce插件,而是直接放在服务器上运行,下面我来描述下运行该mr的方式,具体以下:

  首先我在装有hadoop服务的服务器上使用root用户建立一个属于我本身的文件夹,这里文件夹的名字叫作xiajun,我经过ftp将源文件传递到xiajun目录下的javafile文件夹,执行以下命令:

mkdir /xiajun/javafile
javac –classpath /home/hadoop/hadoop/hadoop-core-0.20.2-cdh3u4.jar –d /xiajun/javaclass /xiajun/ javafile/*.java

  以上命令是编译源文件,将javafile文件夹的java代码编译到javaclass目录下。

  

Jar –cvf /xiajun/mymr.jar –C /xiajun/javaclass/ .

  这里将javaclass目录下class文件打成jar包,放在xiajun目录下。

  接下来咱们使用hadoop用户登陆:

su – hadoop

  之因此使用root用户编译,打jar包缘由是个人hadoop用户没有权限上传文件不得已而为之了。

  咱们首先将测试数据上传到HDFS上,接下来执行以下命令:

cd /hadoop/bin

  切换目录到bin目录下,而后执行:

hadoop jar mymr.jar cn.com.TestMain  输入目录  输出目录

  这里输入能够是具体文件也能够是目录,输出目录在HDFS上要不存在,若是存在hadoop会没法确认任务是否已经执行完毕,就会强制终止任务。

  两个mapreduce迭代执行日志很是让人失望,所以若是咱们发现任务没法正常执行,我如今都是一个个mapredcue执行查看错误日志。

  最后咱们看看应用服务应该如何调用这个mapreduce程序,这里我使用远程调用shell 的方式,代码以下:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;


public class TestMain {

    /**
     * @param args
     * @throws IOException 
     * @throws ClassNotFoundException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) {
        String hostname = "192.168.1.200";
        String username = "hadoop";
        String pwd = "hadoop";
        
        Connection conn = new Connection(hostname);
        Session sess = null;
        long begin = System.currentTimeMillis();
        try {
            conn.connect();
            boolean isAuthenticated = conn.authenticateWithPassword(username, pwd);
            sess = conn.openSession();
            sess.execCommand("cd hadoop/bin && hadoop jar /xiajun/mymr.jar com.test.mr.MrByAreaJob /xiajun/areaHBinput /xiajun/areaHBoutput58 /xiajun/areaHBoutput68");
            
            InputStream stdout = sess.getStdout();
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
            StringBuilder sb = new StringBuilder();
            
            while(true){
                String line = br.readLine();
                if (line == null) break;
                sb.append(line);
            }
            
            System.out.println(sb.toString());
            
            long end = System.currentTimeMillis();
            System.out.println("耗时:" + (begin - end)/1000 + "秒");
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            sess.close();
            conn.close();
        }
    }

}

  好了,本文就此结束了。

相关文章
相关标签/搜索