putmerge程序的大致流程是?java
1、根据用户定义的参数设置本地目录和HDFS的目录文件程序员
2、提取本地输入目录中每一个文件的信息数据库
3、建立一个输出流写入到HDF文件网络
4、遍历本地目录中的每一个文件,打开一个输入流来读取该文件,剩下就是一个标准的Java文件复制过程了app
具体程序以下:框架
public static void main(String[] args) throws IOException {ide
Configuration conf = new Configuration();函数
FileSystem hdfs = FileSystem.get(conf);oop
FileSystem local = FieSystem.getLocal(conf);spa
// 设定输入目录与输出文件
Path inputDir = new Path(args[0]);
Path hdfsFile = new Path(args[1]);
try {
// 获得本地文件列表
FileStatus[] inputFiles = local.listStatus(inputDir);
// 生成HDFS输出流
FSDataOutputStream out = hdfs.create(hdfsFile);
for (int i = 0; i < inputFiles.length; i++) {
System.out.println(inputFiles[i].getPath().getName());
// 打开本地输入流
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while ( (bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
in.close();
}
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
那么如今有数据了,还要对它进行处理、分析以及作其余的操做。
MapReduce程序经过操做键/值对来处理数据,通常形式为
map: (K1, V1) -> list(K2, V2)
reduce:(K2, list(V2)) -> list(K3, V3)
Hadoop数据类型有哪些?
MapReduce框架并不容许它们是任意的类。
虽然咱们常常把某些键与值称为整数、字符串等,但它们实际上并非Integer、String等哪些标准的Java类。为了让键/值对能够在集群上移动,MapReduce框架提供了一种序列化键/值对的方法。所以,只有那些支持这种序列化的类可以在这个框架中充当键或者值。
更具体的Hadoop类型说明
实现Writable接口的类能够是值
而实现WritableComparable<T>接口的类既能够是键也能够是值
注意WritableComparable<T>接口是Writable和java.lang.Comparable<T>接口的组合,对于键而言,咱们须要这个比较,由于它们将在Reduce阶段进行排序,而值仅会被简单地传递。
键/值对常用的数据类型列表,这些类均实现WritableComparable接口
类 |
描述 |
BooleanWritable |
标准布尔变量的封装 |
ByteWritable |
单字节数的封装 |
DoubleWritable |
双字节数的封装 |
FloatWritable |
浮点数的封装 |
IntWritable |
整数的封装 |
LongWritable |
Long的封装 |
Text |
使用UTF-8格式的文本封装 |
NullWritable |
无键值时的站位符 |
如何自定义数据类型?
只要它实现了Writable(或WritableComparable<T>)接口。
定义一个Edge类型用于表示一个网络的边界
public class Edge implements WritableComparable<Efge> {
private String departureNode;
private String arrivalNode;
public String getDepartureNode() {
return departureNode;
}
// 说明如何读入数据
@Override
public void readFields(DataInput in) throws IOException {
departureNode = in.readUTF();
arrivalNode = in.readUTF();
}
// 说明如何写出数据
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(departureNode);
out.writeUTF(arrivalNode);
}
// 定义数据排序
@Override
public int compareTo(Edge o) {
return (departureNode.compareTo(o.departureNode) != 0)
? departureNode.compareTo(o.departureNode)
: arrivalNode.compareTo(o.arrivalNode);
}
}
Mapper类是什么?
一个类要做为mapper,需继承MapReduceBase基类并实现Mapper接口。
mapper和reducer的基类均为MapReduceBase类
其中包含一些函数或方法:
1、void configure(JobConf job),该函数提取XML配置文件或者应用程序主类中的参数,在数据处理以前调用该函数。
2、void close(),做为map任务结束前的最后一个操做,该函数完成全部的结尾工做,如关闭数据库链接、打开文件等。
Mapper接口负责数据处理阶段,它采用Mapper<K1, V1, K2, V2>Java泛型,这里键类和值类分别实现WritableComparable和Writable接口。
Mapper类只有是一个方法-map,用于处理一个单独的键/值对。
void map (K1 key,
V1 value,
OutputCollector<K2, V2> output,
Reporter reporter
) throws IOException
上面这个map函数的参数都是什么意思?
该函数处理一个给定的键/值对 (K1, V1),生成一个键/值对 (K2, V2) 的列表 (该列表页可能为空)
OutputCollector接收这个映射过程的输出
Reporter可提供对mapper相关附加信息的记录
Hadoop提供了一些有用的mapper实现,这些实现是?
类 |
描述 |
IdentityMapper<K, V> |
实现Mapper<K, V, K, V>, 将输入直接映射到输出 |
InverseMapper<K, V> |
实现Mapper<K, V, V, K>, 反转键/值对 |
RegexMapper<K> |
实现Mapper<K, Text, Text, LongWritable>, 为每一个常规表达式的匹配项生成一个 (match, 1) 对 |
TokenCountMapper<K> |
实现Mapper<K, Text, Text, LongWritable>, 当输入的值为分词时, 生成一个(token, 1) 对 |
Reducer是什么?
一个类要做为reducer,需继承MapReduceBase基类并实现Reducer接口。
以便于容许配置和清理。
此外,它还必须实现Reducer接口使其具备以下的单一方法:
void reduce (K2 key,
Iterator<V2> values,
OutputCollector<K3, V3> output,
Reporter reporter
) throws IOException
当reducer任务接收来自各个mapper的输出时,它按照键/值对中的键对输入数据进行排序,并将相同键的值归并。而后调用reduce()函数,并经过迭代处理哪些与指定键相关联的值,生成一个 (可能为空的) 列表 (K3, V3)
OutputCollector接收reduce阶段的输出,并写入输出文件
Reporter可提供对reducer相关附加信息的记录,造成任务进度
一些很是有用的由Hadoop预约义的Reducer实现
类 |
描述 |
IdentityReducer<K, V> |
实现Reducer<K, V, K, V>, 将输入直接映射到输出 |
LongSumReducer<K> |
实现<K, LongWritable, K, LongWritable>, 计算与给定键相对应的全部值的和 |
注:虽然咱们将Hadoop程序称为MapReduce应用,可是在map和reduce两个阶段之间还有一个极其重要的步骤:将mapper的结果输出给不一样的reducer。这就是partitioner的工做。
初次使用MapReduce的程序员一般有一个误解?
仅须要一个reducer? 采用单一的reducer能够在处理以前对全部的数据进行排序。
No,采用单一的reducer忽略了并行计算的好处。
那么就应该使用多个reducer是么?但须要解决一个问题,如何肯定mapper应该把键/值对输出给谁。
默认的做法是对键进行散列来肯定reducer。Hadoop经过HashPartitioner类强制执行这个策略。但有时HashPartitioner会出错。
HashPartitioner会出什么错?
假如你使用Edge类来分析航班信息来决定从各个机场离港的乘客数目,这些数据多是:
(San Francisco, Los Angeles) Chuck Lam
(San Francisco, Dallas) James Warren
若是你使用HashPartitioner,这两行能够被送到不一样的reducer, 离港的乘客数目被处理两次而且两次都是错误的
如何为你的应用量身定制partitioner呢?
上面的状况,我但愿具备相同离港地的全部edge被送往相同的reducer,怎么作呢?只要对Edge类的departureNode成员进行散列就能够了:
public class EdgePartitioner implements Partitioner<Edge, Writable> {
@Override
public int getPartition (Edge key, Writable value, int numPartitions) {
return key.getDepartureNode().hashCode() % numPartitions;
}
@Override
public void configure(JobConf conf) { }
}
一个定制的partitioner只须要实现configure()和getPartition()两个函数,前者将Hadoop对做业的配置应用在patitioner上,然后者返回一个介于0和reduce任务数之间的整数,指向键/值对将要发送的reducer
Combiner:本地reduce
在许多MapReduce应用场景中,咱们不妨在分发mapper结果以前作一下 "本地Reduce"。再考虑一下WordCount的例子,若是做业处理的文件中单词 "the" 出现了574次,存储并洗牌一次 ("the", 574) 键/值对比许屡次 ("the", 1) 更为高效。这种处理步骤被称为合并。
预约义mapper和Reducer类的单词计数
public class WordCount {
public static void main (String[] args) {
JobClient client = new JobClient();
JobConf conf = new JobConf(WordCount.class);
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setMapperClass(TokenCountMapper.class); // Hadoop本身的TokenCountMapper
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReduver.class); // Hadoop本身的LongSumReducer
client.setConf(conf);
try {
JobClient.runJob(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用Hadoop预约义的类TokenCountMapper和LongSumReducer,编写MapReduce分厂的容易,Hadoop也支持生成更复杂的程序,这里只是强调Hadoop容许你经过最小的代码量快速生成实用的程序。