Let's start with a simple example. Suppose three people are playing Dou Dizhu (a Chinese card game) and want to check whether the deck is complete. The simplest way is to have one person count whether there are 54 cards (traditional single-machine computing). Alternatively, each of the three players counts their own pile (the Map phase), and the three counts are then added together (the Reduce phase).
So the idea behind MapReduce is "divide and conquer" plus "aggregate": when the data is too large for one machine to handle, multiple machines process it together as a distributed cluster.
Many articles translate Map and Reduce literally as "mapping" and "reduction". In practice, Map simply means each worker handles its own slice of the data (divide and conquer), and Reduce means aggregating those partial results. Why translate it so stiffly (is it to make big data sound deliberately mysterious)?
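To make the analogy concrete, here is a tiny, Hadoop-free Java sketch of the same idea (the class name and pile sizes are made up for illustration): each "player" counts its own pile, and the partial counts are then summed.

import java.util.Arrays;
import java.util.List;

public class CardCountSketch {
    public static void main(String[] args) {
        // Three piles dealt to three players (sizes are illustrative only)
        List<int[]> piles = Arrays.asList(new int[18], new int[18], new int[18]);

        // "Map": each player independently counts one pile
        int[] partialCounts = piles.stream().mapToInt(pile -> pile.length).toArray();

        // "Reduce": the partial counts are added together
        int total = Arrays.stream(partialCounts).sum();
        System.out.println("total cards = " + total); // 54 means the deck is complete
    }
}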
The pattern itself is still quite simple and consists of 8 steps (they correspond to the numbered comments in the driver code below: reading and parsing the input, the Mapper, the four shuffle sub-steps of partitioning, sorting, combining and grouping, the Reducer, and writing the output).
We will use the simplest word count as the example (often called the "Hello World" of big data). Let's get it running first and look at the behavior before digging into the theory.
Following the MapReduce idea there are two main steps to write, the Mapper and the Reducer; Hadoop implements everything else for us. We start with hands-on practice and study the principles later.
MapReduce has two run modes: 1. cluster mode (production); 2. local mode (experimentation and learning).
Prerequisites:
1. Download a Hadoop distribution, unpack it locally, and add it to your environment variables.
2. Download hadoop.dll and put it into Hadoop's bin directory (needed for running locally on Windows); a quick way to verify the setup is sketched after this list.
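A minimal sketch to check the local setup, assuming a Windows machine and that HADOOP_HOME points at the unpacked distribution (the class name is hypothetical; winutils.exe is also commonly required alongside hadoop.dll on Windows):

import java.io.File;

public class CheckLocalHadoop {
    public static void main(String[] args) {
        // HADOOP_HOME should point at the unpacked Hadoop distribution
        String home = System.getenv("HADOOP_HOME");
        System.out.println("HADOOP_HOME = " + home);
        if (home != null) {
            // hadoop.dll and winutils.exe are expected under %HADOOP_HOME%\bin on Windows
            System.out.println("hadoop.dll present: " + new File(home, "bin/hadoop.dll").exists());
            System.out.println("winutils.exe present: " + new File(home, "bin/winutils.exe").exists());
        }
    }
}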
Create a Maven project and add the dependencies:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.10.1</version>
</dependency>
Data file: D:\Source\data\data_in\xx.txt (this matches the input path configured in the driver below; note that the output directory D:\Source\data\demo_result1 must not already exist when the job runs)
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
Now let's write the code.
Step 1: create the Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input value is one line of text; split it into words on ","
        String[] words = value.toString().split(",");
        Text keyout = new Text();
        LongWritable valueout = new LongWritable(1);
        for (String word : words) {
            // Emit (word, 1) for every word in the line
            keyout.set(word);
            context.write(keyout, valueout);
        }
    }
}
Step 2: create the Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class BaseReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // All counts for the same word arrive together; sum them up
        long x = 0;
        for (LongWritable value : values) {
            x += value.get();
        }
        context.write(key, new LongWritable(x));
    }
}
Step 3: create the Job driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), MainJob.class.getName());
        // When running on a cluster, the job must be packaged into a jar
        job.setJarByClass(MainJob.class);

        // 1. Input format class and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\data_in"));

        // 2. Mapper class and its output key/value types
        job.setMapperClass(BaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3-6. Shuffle phase: partitioning, sorting, combining, grouping (defaults are used here)

        // 7. Reducer class and the final output key/value types
        job.setReducerClass(BaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Output format class and output path (the directory must not exist yet)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\demo_result1"));

        // Submit the MapReduce job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        MainJob mainJob = new MainJob();
        try {
            Configuration configuration = new Configuration();
            // Local mode: run the framework in-process instead of on a YARN cluster
            configuration.set("mapreduce.framework.name", "local");
            configuration.set("yarn.resourcemanager.hostname", "local");
            int run = ToolRunner.run(configuration, mainJob, args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
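To run it, execute MainJob.main directly from the IDE; the Configuration settings above keep the job in local mode. If the job succeeds, Hadoop creates the output directory containing a _SUCCESS marker and a part-r-00000 file with tab-separated word/count pairs. With the sample input above, the result should look roughly like this:

flume	1
hadoop	2
hello	2
hive	1
jerry	1
kitty	1
sqoop	1
tom	1
world	2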