这是我参与8月更文挑战的第7天,活动详情查看:8月更文挑战前端
在函数式语言里,map表示对一个列表(List)中的每一个元素作计算,reduce表示对一个列表中的每一个元素作迭代计算。java
它们具体的计算是经过传入的函数来实现的,map和reduce提供的是计算的框架。web
MapReduce
将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map
和Reduce
MapReduce
采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分红许多独立的分片(split),这些分片能够被多个Map任务并行处理MapReduce
设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,由于,移动数据须要大量的网络传输开销MapReduce
框架采用了Master/Slave
架构,包括一个Master
和若干个Slave
。Master
上运行JobTracker
(yarn上ResourceManager),Slave
上运行TaskTracker
(yarn上Nodemanager)MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Taskapache
结点说明:编程
用户编写的MapReduce
程序经过Client
提交到JobTracker
端,用户可经过Client
提供的一些接口查看做业运行状态。markdown
JobTracker
负责资源监控和做业调度;JobTracker
监控全部TaskTracker
与Job
的健康情况,一旦发现失败,就将相应的任务转移到其余节点;JobTracker
会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源。网络
TaskTracker
会周期性地经过“心跳”将本节点上资源的使用状况和任务的运行进度汇报给JobTracker
,同时接收JobTracker
发送过来的命令并执行相应的操做(如启动新任务、杀死任务等)。TaskTracker
使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task
获取到一个slot
后才有机会运行,而Hadoop
调度器的做用就是将各个TaskTracker
上的空闲slot
分配给Task
使用。slot 分为Map slot
和Reduce slot
两种,分别供Map Task
和Reduce Task
使用。架构
Task分为Map Task
和Reduce Task
两种,均由TaskTracker
启动。app
结构缺点:框架
架构思想 体系结构
ResourceManager • 处理客户端请求 • 启动/监控ApplicationMaster • 监控NodeManager • 资源分配与调度 NodeManager • 单个节点上的资源管理 • 处理来自ResourceManger的命令 • 处理来自ApplicationMaster的命令 ApplicationMaster • 为应用程序申请资源,并分配给内部任务 • 任务调度、监控与容错
步骤1:用户编写客户端应用程序,向YARN提交应用程序,提交的内容包括
ApplicationMaster
程序、启动ApplicationMaster
的命令、用户程序等 步骤2:YARN
中的ResourceManager
负责接收和处理来自客户端的请求,为应用程序分配一个容器,在该容器中启动一个ApplicationMaster
步骤3:ApplicationMaster
被建立后会首先向ResourceManager
注册 步骤4:ApplicationMaster
采用轮询的方式向ResourceManager
申请资源 步骤5:ResourceManager
以“容器”的形式向提出申请的ApplicationMaster
分配资源 步骤6:在容器中启动任务(运行环境、脚本) 步骤7:各个任务向ApplicationMaster
汇报本身的状态和进度 步骤8:应用程序运行完成后,ApplicationMaster
向ResourceManager
的应用程序管理器注销并关闭本身
➢ 不一样的Map任务之间不会进行通讯 ➢ 不一样的Reduce任务之间也不会发生任何信息交换 ➢ 用户不能显式地从一台机器向另外一台机器发送消息 ➢ 全部的数据交换都是经过MapReduce框架自身去实现的
例子
Hadoop 自定义的序列化接口。当要在进程间传递对象或持久化对象的时候,就须要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。Map 和 Reduce 的 key、value 数据格式均为 Writeable 类型,其中 key 还需实现WritableComparable 接口。Java 基本类型对应 writable 类型的封装以下:
Java primitive | Writable implementation |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | ShortWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
enum | EnumWritable |
Map | MapWritable |
(2)InputFormat 用于描述输入数据的格式。提供两个功能:
getSplits()
数据分片,按照某个策略将输入数据切分红若干个split
,以便肯定Map
任务个数以及对应的split
;createRecordReader()
,将某个split
解析成一个个key-value
对。FileInputFormat
是全部以文件做为数据源的InputFormat
实现基类,小文件不会进行分片,记录读取调用子类TextInputFormat
实现;
TextInputFormat
是默认处理类,处理普通文本文件,以文件中每一行做为一条记录,行起始偏移量为 key
,每一行文本为 value;CombineFileInputFormat
针对小文件设计,能够合并小文件;KeyValueTextInputFormat
适合处理一行两列并以tab
做为分隔符的数据;NLineInputFormat
控制每一个 split
中的行数。(3)OutputFormat
主要用于描述输出数据的格式。Hadoop 自带多种 OutputFormat 的实现。
TextOutputFormat
默认的输出格式,key 和 value 中间用 tab 分隔;SequenceFileOutputFormat
,将 key 和 value 以 SequenceFile 格式输出;SequenceFileAsOutputFormat
,将 key 和 value 以原始二进制格式输出;MapFileOutputFormat
,将 key 和 value 写入 MapFile 中;MultipleOutputFormat
,默认状况下 Reducer 会产生一个输出,用该格式能够实现一个Reducer 多个输出。(4)Mapper/Reducer
封装了应用程序的处理逻辑,主要由 map、reduce 方法实现。
(5)Partitioner
根据 map 输出的 key 进行分区,经过 getPartition()方法返回分区值,默认使用哈希函
数。分区的数目与一个做业的reduce任务的数目是同样的。HashPartitioner是默认的Partioner。
一、计数统计类应用 仿照 WordCount 例子,编写“TelPubXxx”类实现对拨打公共服务号码的电话信息的统计。给出的一个文本输入文件以下,第一列为电话号码、第二列为公共服务号码,中间以空格隔开。 13718855152 11216810117315 110 39451849 112 13718855153 110 13718855154 112 18610117315 114 18610117315 114 MapReduce 程序执行后输出结果以下,电话号码之间用“|”链接: 110 13718855153|16810117315 112 13718855154|39451849|13718855152 114 18610117315|18610117315
运行成功
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TelPubZqc {
public static class TelMap extends Mapper<Object, Text, Text, Text> {
private Text pub = new Text();
private Text tel = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//Map (Key Value)
String[] s=value.toString().split(" ");
tel.set(s[0]);
pub.set(s[1]);
context.write(pub,tel);
}
}
public static class TelReducer extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder s= new StringBuilder();
for (Text val : values) {
if(s.toString().equals("")){
s.append(val.toString());
}
else s.append("|").append(val.toString());
}
result.set(String.valueOf(s));
context.write(key, result);// 输出结果
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();// 加载hadoop配置
conf.set("fs.defaultFS", "hdfs://localhost:9000");
String[] otherArgs = new String[]{"input/input.txt","output/outputTel"};
if (otherArgs.length < 2) {
System.err.println("Usage: PubTel <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");// 设置环境参数
job.setJarByClass(TelPubZqc.class);// 设置程序主类
job.setMapperClass(TelMap.class);// 设置用户实现的Mapper类
job.setCombinerClass(TelReducer.class);
job.setReducerClass(TelReducer.class);// 设置用户实现的Reducer类
job.setOutputKeyClass(Text.class);// 设置输出key类型
job.setOutputValueClass(Text.class); // 设置输出value类型
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加输入文件路径
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 设置输出文件路径
System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交做业并等待结束
}
}
复制代码
二、两表联结 Join 应用 仿照单表关联例子,编写“RelationXxx”类实现多表关联。中文文本文件转成 UTF-8 编码格式,不然会乱码。 输入 score.txt:
studentid | classid | score |
---|---|---|
s003001 | fd3003 | 84 |
s003001 | fd3004 | 90 |
s003002 | fd2001 | 71 |
s002001 | fd1001 | 66 |
s001001 | fd1001 | 98 |
s001001 | fd1002 | 60 |
输入 major.txt: | ||
classid | classname | deptname |
-- | -- | -- |
fd1001 | 数据挖掘 | 数学系 |
fd2001 | 电子工程 | 电子系 |
fd2002 | 电子技术 | 电子系 |
fd3001 | 大数据 | 计算机系 |
fd3002 | 网络工程 | 计算机系 |
fd3003 | Java 应用 | 计算机系 |
fd3004 | web 前端 | 计算机系 |
输出结果: | ||
classid | classname | deptname |
-- | -- | -- |
fd1001 | 数据挖掘 | 数学系 |
fd1001 | 数据挖掘 | 数学系 |
fd2001 | 电子工程 | 电子系 |
fd3003 | Java 应用 | 计算机系 |
fd3004 | web 前端 | 计算机系 |
![]() |
将其中须要的东西传到hdfs中去。
没有报错。查看结果
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
public class RelationZqc {
public static int time = 0;
public static class RelationMap extends Mapper<Object, Text, Text, Text> {
private Text classID = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String filename=((FileSplit)context.getInputSplit()).getPath().getName();
String[] s = value.toString().split(" ");
if(filename.equals("score.txt")){
classID.set(s[1]);
String val="1," + s[0] + "," + s[2];
context.write(classID,new Text(val));
}
else if (filename.equals("major.txt")){
if(!s[0].equals("classid")){
classID.set(s[0]);
String val = "2," + s[1] + "," + s[2];
context.write(classID,new Text(val));
}
}
}
}
public static class RelationReduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String[][] studentTable=new String[10][2];
String[] data;
String classID = "nil";
if(time == 0){
context.write(new Text("classid"), new Text("classname deptname studentid score"));
time++;
}
int cnt = 0;
for (Text val : values) {
data = val.toString().split(",");
if(data[0].equals("1")){
studentTable[cnt][0] = data[1];
studentTable[cnt][1] = data[2];
cnt = cnt + 1;
}
else if(data.length == 3 && data[0].equals("2")){
classID = data[1] + " " + data[2];
}
}
for(int i = 0; i < cnt; i++){
if(classID.equals("nil")) continue;
String s=classID+" "+studentTable[i][0]+" "+studentTable[i][1];
result.set(s);
context.write(key, result);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();// 加载hadoop配置
conf.set("fs.defaultFS", "hdfs://localhost:9000");
String[] otherArgs = new String[]{"input/score.txt", "input/major.txt", "output/outputRelationZqc"};
// String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: Relation <in> <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "RelationZqc");// 设置环境参数
job.setJarByClass(RelationZqc.class);// 设置程序主类
job.setMapperClass(RelationMap.class);// 设置用户实现的Mapper类
job.setReducerClass(RelationReduce.class);// 设置用户实现的Reducer类
job.setOutputKeyClass(Text.class);// 设置输出key类型
job.setOutputValueClass(Text.class); // 设置输出value类型
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加输入文件路径
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 设置输出文件路径
System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交做业并等待结束
}
}
复制代码
三、简单排序类应用编写 MapReduce 程序“SortXxx” 类,要求输入文件 sort1.txt、sort2.txt、sort3.txt 内容,由程序随机生成若干条数据并存储到 HDFS 上,每条数据占一行,数据能够是日期也能够是数字;输出结果为两列数据,第一列是输入文件中的原始数据,第二列是该数据的排位。 运行成功
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class SortZqc {
public static class SortMap extends Mapper<Object,Text,IntWritable,IntWritable>{
private static IntWritable data = new IntWritable();
//实现map函数
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
String line=value.toString();
data.set(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
public static class SortReduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
IntWritable n = new IntWritable(1); //用n表明位次
public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
for(IntWritable val:values){
context.write(key,n);
n = new IntWritable(n.get()+1);
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();// 加载hadoop配置
conf.set("fs.defaultFS", "hdfs://localhost:9000");
String[] otherArgs = new String[]{"input/sort1.txt","input/sort2.txt","input/sort3.txt","output/outputSortZqc"};
if (otherArgs.length < 2) {
System.err.println("Usage: data sort <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "data sort");// 设置环境参数
job.setJarByClass(SortZqc.class);// 设置程序主类
job.setMapperClass(SortMap.class);// 设置用户实现的Mapper类
job.setCombinerClass(SortReduce.class);
job.setReducerClass(SortReduce.class);// 设置用户实现的Reducer类
job.setOutputKeyClass(IntWritable.class);// 设置输出key类型
job.setOutputValueClass(IntWritable.class); // 设置输出value类型
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加输入文件路径
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 设置输出文件路径
System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交做业并等待结束
}
}
复制代码
小生凡一,期待你的关注。