五、Determining which file the map-side data comes from

一、Requirement

Find out which file the data processed on the map side comes from.

二、How it works

The key point is that the map-side setup method receives a Context parameter, from which you can find out which file the split handled by this map task comes from.
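As a minimal excerpt of that pattern (the full Mapper is listed in the code section below), the InputSplit obtained from the Context is cast to a FileSplit and the file name is read from its path. Note that this cast assumes ordinary FileInputFormat-style splits (e.g. TextInputFormat); with CombineFileInputFormat a single split can cover several files and the cast would fail.

    // Excerpt: recover the source file name inside Mapper.setup()
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // this cast assumes one file per split (FileInputFormat-style input)
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        // typically stored in a field so map() can append it to each record
        String sourceFile = inputSplit.getPath().getName();
    }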

三、Code

  • 一、GetSourceMapper.class

    package com.bigdata.surfilter.mapgetsource;
    
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
      import java.io.IOException;
    
      /**
       * @Author liufu
       * @CreateTime 2016/7/26  16:44
       */
      public class GetSourceMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
          // by default one map task processes a single split (normally one block), so all of its data necessarily comes from the same file
          String sourceFile = null;
          Text outPutText = null;
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              FileSplit inputSplit = (FileSplit)context.getInputSplit();
              Path path = inputSplit.getPath();
              sourceFile = path.getName();
              outPutText = new Text();
          }
    
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              outPutText.set(value.toString() + " received from " + sourceFile);
              context.write(outPutText, NullWritable.get());   // use NullWritable.get() rather than null so the value always serializes
          }
    
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              outPutText = null;
          }
      }
  • 二、ApplicationRun.class

    package com.bigdata.surfilter;
    
      import com.bigdata.surfilter.mapgetsource.GetSourceMapper;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
      import java.io.IOException;
    
      /**
       * @Author liufu
       * @CreateTime 2016/7/25  15:55
       */
      public class ApplicationRun {
          public static void main(String[] args) throws IOException {
              Configuration conf = new Configuration();
    
              // when the job is launched on the cluster with the hadoop jar command, the two lines below are not needed, because the configuration files under etc/hadoop in the Hadoop installation directory already contain these settings
      //        conf.set("fs.default.name", "hdfs://192.168.0.186:9000");
              // masquerade as the root user
      //        System.setProperty("HADOOP_USER_NAME", "root");
    
              Job job = new Job(conf, "getsourcemapper");
    
              // locate the jar via the main class on the classpath
              job.setJarByClass(ApplicationRun.class);
    
              // the job's map-side (and reduce-side) classes
              job.setMapperClass(GetSourceMapper.class);
    
              // set the map (and reduce) output types so the framework can obtain the corresponding classes via reflection
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(NullWritable.class);
    
              // how the job reads its input and writes its output
              job.setInputFormatClass(TextInputFormat.class);
              job.setOutputFormatClass(TextOutputFormat.class);
    
              // where the job's input comes from; bind the input directories with setInputPaths, or alternatively addInputPaths
              FileInputFormat.setInputPaths(job, new Path("/mapgetsource/input1/"),new Path("/mapgetsource/input2/"));
    
              // where the output is written (a sketch for reading it back appears after this listing)
              FileOutputFormat.setOutputPath(job, new Path("/mapgetsource/output/"));
    
              // no reduce phase needed
              job.setNumReduceTasks(0);
    
              try {
                  boolean b = job.waitForCompletion(true);
                  System.exit(b ? 0 : 1);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (ClassNotFoundException e) {
                  e.printStackTrace();
              }
          }
      }
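After the job completes, a quick way to check the result is to read the map-only output back from HDFS: with TextOutputFormat and zero reduce tasks the records land in files named part-m-00000, part-m-00001, ... under the output directory. The sketch below is only an illustration (the class name PrintJobOutput is invented here; the output path simply mirrors the driver above); each printed line should end with "received from <file name>".

    package com.bigdata.surfilter.mapgetsource;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class PrintJobOutput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // on the cluster, fs.defaultFS is picked up from the etc/hadoop config files
            FileSystem fs = FileSystem.get(conf);
            // map-only jobs write one part-m-NNNNN file per map task
            Path out = new Path("/mapgetsource/output/part-m-00000");
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(out)))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }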