5V characteristics
GFS --> HDFS
MapReduce --> MapReduce
BigTable --> HBase
Heartbeats are sent every 3 seconds.
The heartbeat response carries commands from the NameNode to that DataNode, such as deleting or replicating blocks.
If no heartbeat is received from a DataNode for more than 10 minutes, the node is considered unavailable.
After starting, a DataNode registers with the NameNode.
Once registered, it periodically (every hour) reports all of its block information to the NameNode.
When a DataNode reads a block, it recomputes the checksum and compares it with the checksum computed when the block was created.
A DataNode re-verifies a file's checksum three weeks after the file is created.
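These intervals are driven by hdfs-site.xml settings; a minimal sketch using the commonly documented Hadoop 2.x property names and defaults (values are assumptions to adjust per cluster; the dead-node timeout is usually derived as 2 × recheck-interval + 10 × heartbeat-interval ≈ 10.5 minutes):

<property>
  <name>dfs.heartbeat.interval</name>
  <value>3</value> <!-- DataNode heartbeat interval, in seconds -->
</property>
<property>
  <name>dfs.namenode.heartbeat.recheck-interval</name>
  <value>300000</value> <!-- in ms; dead-node timeout = 2 * recheck + 10 * heartbeat -->
</property>
<property>
  <name>dfs.datanode.scan.period.hours</name>
  <value>504</value> <!-- block scanner re-verifies checksums about every 3 weeks -->
</property>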
NameNode
DataNode
NodeManager
ResourceManager
Create the fsimage file and store the fsimage data
Create the edits file
Load the fsimage and edits files
Generate new fsimage and edits files
Wait for DataNodes to register and send Block Reports
Register with the NameNode and send Block Reports
When the NameNode starts it enters safe mode, during which the file system can be read but not written.
NameNode startup process
In safe mode the file system cannot be modified.
The purpose is to check the validity of the data on each DataNode while the system starts up.
Ways to enter and leave safe mode
$ bin/hdfs dfsadmin -safemode enter
$ bin/hdfs dfsadmin -safemode leave
<property>
  <name>dfs.namenode.safemode.threshold-pct</name>
  <value>0.999f</value>
</property>
Advantages:
Disadvantages:
package com.ct.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;

public class TestDemo {

    FileSystem fs = null;

    @Before // obtain the FileSystem object
    public void setUp() {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "7");
        try {
            fs = FileSystem.get(new URI("hdfs://centos01:8020"), conf, "chen");
        } catch (IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }

    @Test // create a directory
    public void testMkdir() throws IllegalArgumentException, IOException {
        boolean success = fs.mkdirs(new Path("/result"));
        System.out.println(success);
    }

    // delete a directory
    public void testDelete() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/result"), true);
    }

    @Test // upload a file
    public void testUpload() throws IllegalArgumentException, IOException {
        FSDataOutputStream out = fs.create(new Path("/input/testUpload.log"));
        FileInputStream input = new FileInputStream("F:/test.txt");
        IOUtils.copy(input, out, 1024);
    }

    @Test // download a file
    public void testDownload() throws IllegalArgumentException, IOException {
        FSDataInputStream input = fs.open(new Path("/input/testUpload.log"));
        FileOutputStream out = new FileOutputStream("F:/test-copy.txt");
        IOUtils.copy(input, out, 1024);
    }

    @Test // list files, their attributes and block locations
    public void testList() throws FileNotFoundException, IllegalArgumentException, IOException {
        RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true);
        while (ri.hasNext()) {
            LocatedFileStatus next = ri.next();
            String group = next.getGroup();
            long len = next.getLen();
            String owner = next.getOwner();
            FsPermission permission = next.getPermission();
            long blockSize = next.getBlockSize();
            short rep = next.getReplication();
            System.out.println(permission + "\t" + owner + "\t" + group);
            System.out.println(len + "\t" + blockSize + "\t" + rep);
            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation blktn : blockLocations) {
                System.out.println("length:" + blktn.getLength());
                System.out.println("offset:" + blktn.getOffset());
                System.out.println(Arrays.toString(blktn.getHosts()));
            }
        }
    }
}
ResourceManager
NodeManager
ApplicationMaster
Container
Map and Reduce: a computing framework and programming model built on the "divide and conquer" idea; distributed parallel computation.
Map: applies a specified operation to every element of a list of independent elements; highly parallelizable.
// step 1: Map Class
/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
// TODO update the type parameters
public static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
    }
}
Reduce: merges (aggregates) the elements of a list.
// step 2: Reduce Class
/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
// TODO
public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
    }
}
// step 3: Driver: assemble the job; implements Tool
public int run(String[] args) throws Exception {
    // 1: get configuration
    Configuration configuration = getConf();

    // 2: create Job
    Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    // run jar
    job.setJarByClass(this.getClass());

    // 3: set job
    // input -> map -> reduce -> output
    // 3.1: input
    Path inPath = new Path(args[0]);
    FileInputFormat.addInputPath(job, inPath);

    // 3.2: map
    job.setMapperClass(ModuleMapper.class);
    // TODO update the output types to match the Mapper
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // 3.3: reduce
    job.setReducerClass(ModuleReducer.class);
    // TODO
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // 3.4: output
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);

    // 4: submit job
    boolean isSuccess = job.waitForCompletion(true);
    return isSuccess ? 0 : 1;
}
package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDemo extends Configured implements Tool {

    /**
     * Definition of the map task
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *   KEYIN    byte offset of the line   LongWritable
     *   VALUEIN  one line of text          Text
     *   KEYOUT   a word                    Text
     *   VALUEOUT the count 1               IntWritable
     *
     * The map task splits a line of text into words.
     */
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            System.out.println("keyIn:" + key + "\t\t" + "valueIn:" + value);
            // 1. split the line into words
            String[] vals = value.toString().split(" ");
            // 2. emit each word with a count of 1
            for (String val : vals) {
                keyOut.set(val);
                valueOut.set(1);
                context.write(keyOut, valueOut);
                System.out.println("keyOut:" + keyOut + "\t\t" + "valueOut:" + valueOut);
            }
        }
    }

    /**
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *   KEYIN    a word                              Text
     *   VALUEIN  an element of the list of counts    IntWritable
     *   KEYOUT   the word                            Text
     *   VALUEOUT the total count                     IntWritable
     */
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        IntWritable valueOut = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            System.out.print("keyIn:" + key + "\t\t[");
            // 1. sum the counts
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
                System.out.print(value + ",\t");
            }
            System.out.println("]");
            // 2. emit the total
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map and reduce classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3. set the input and output paths
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new WordCountDemo(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Map-side join: suited to joining a large table with a small table; the small table is cached in memory and the join happens on the map side.
The small table is cached only once: override the setup method in the Mapper subclass and load the small-table file into memory there.
The map method of the Mapper subclass reads the large table.
package com.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {

    public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> {

        HashMap<String, String> cacheMap = new HashMap<String, String>();

        // First, load the small table into memory.
        // This method runs only once, when the task starts.
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String path = "F:\\input\\join\\dept.log";
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] vals = line.split("\t");
                cacheMap.put(vals[0], vals[1]); // deptno -> dname
            }
            br.close();
            fr.close();
        }

        // The map side joins the two tables on their common key.
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            String deptno = vals[2];             // department id of this employee record
            String dname = cacheMap.get(deptno); // look up the department name in the cache
            context.write(new Text(deptno), new Text(dname + "\t" + vals[0] + "\t" + vals[1]));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map class (no reducer is needed)
        job.setMapperClass(MJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 3. set the input and output paths
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. submit
        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new MapJoin(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Reduce-side join: suited to joining two large tables.
package com.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoin extends Configured implements Tool {

    /*
     * Sample input lines:
     *   1     技术部        (department table: deptno, dname)
     *   1002  rose  1      (employee table: empno, ename, deptno)
     */
    public static class RJMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            if (vals.length == 2) {
                // department record: key = deptno, value = dname
                keyOut.set(vals[0]);
                valueOut.set(vals[1]);
            } else {
                // employee record: key = deptno, value = empno + ename
                keyOut.set(vals[2]);
                valueOut.set(vals[0] + "\t" + vals[1]);
            }
            context.write(keyOut, valueOut);
        }
    }

    /*
     * keyIn:   1
     * valueIn: {[1007 lily], [1002 rose], [1001 jack], [技术部]}
     */
    // The reduce-side join relies on the MapReduce shuffle sending all rows
    // with the same key to the same reducer.
    public static class RJReducer extends Reducer<Text, Text, Text, Text> {

        ArrayList<String> employees = new ArrayList<String>();

        @Override
        protected void reduce(Text keyIn, Iterable<Text> valueIn,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String department = null;
            employees.clear(); // remember to clear the list between keys
            for (Text tmp : valueIn) {
                String[] vals = tmp.toString().split("\t");
                // use the number of fields to tell which table the row came from
                if (vals.length == 1) {
                    department = vals[0];
                } else if (vals.length == 2) {
                    employees.add(tmp.toString());
                }
            }
            for (String employee : employees) {
                context.write(keyIn, new Text(employee + "\t" + department));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map and reduce classes
        job.setMapperClass(RJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(RJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 3. set the input and output paths
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. submit
        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new ReduceJoin(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Install Maven
Change the repository specified in the settings.xml configuration file (edit apache-maven-3.0.5/conf/settings.xml)
<localRepository>D:/repository</localRepository>
Configure the Maven environment in Eclipse
Window -> Preferences -> Maven ->
-> Installations -> Add -> select your own Maven installation; -> User Settings -> choose <Maven home>/conf/settings.xml
Install Hadoop on Windows
Configure the Hadoop environment variables
Add the environment variable HADOOP_HOME=<Hadoop extraction directory>, then append %HADOOP_HOME%/bin; to the PATH environment variable
Test:
hadoop -h
Install the Eclipse plugin
Configure the plugin parameters in Eclipse to connect to HDFS
<!-- disable HDFS file permission checking -->
<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>
Eclipse -> Window -> Show View -> Other -> type "MapReduce" -> click "Map/Reduce Locations"
Right-click -> New Hadoop location
Map/Reduce Master
MapReduce (V2): host: [hostname], port: 8032 // default ResourceManager port
DFS Master
DFS Master: host: [hostname], port: 8020 // default NameNode RPC port
Copy the log4j.properties file to src/main/resources
yarn jar pv.jar /input/2015082818 /output
yarn jar pv.jar <fully qualified class name> /input/2015082818 /output
Different packages may contain classes with the same name, so the fully qualified class name must be specified.
The core of the MapReduce framework (the essence of its design)
Shuffle is the phase from the start of map() output to the start of reduce() input.
input -> map -> shuffle -> reduce -> output
map shuffle phase
reduce shuffle phase
Main shuffle operations
partitioner - map
sorter - map & reduce
combiner: local aggregation during the map phase; not every MapReduce program can use local aggregation
compress: compression of the map-phase output; can be enabled for any MapReduce program (see the sketch after this list)
group - reduce
All of these operations work on the <key, value> data output by map().
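As an illustration of the compress item above, a minimal sketch of enabling map-output compression on a job (the class name and the choice of SnappyCodec are assumptions; Snappy needs its native library on the cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class CompressDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // compress the map-phase output that is shuffled to the reducers
        conf.setBoolean("mapreduce.map.output.compress", true);
        // SnappyCodec is one choice; any installed CompressionCodec works
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);
        Job job = Job.getInstance(conf, "compress-demo");
        // ... set mapper/reducer/input/output exactly as in the WordCount example above
        System.out.println(job.getConfiguration().get("mapreduce.map.output.compress.codec"));
    }
}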
When the circular in-memory buffer reaches 80% of its capacity (the default), its contents are spilled to local disk, i.e. to the local disk of the NodeManager machine on which the MapTask is running; the buffer size and threshold are configurable, as sketched below.
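A sketch of the relevant settings, using the Hadoop 2.x property names and their usual defaults (values are assumptions to tune per job):

<property>
  <name>mapreduce.task.io.sort.mb</name>
  <value>100</value> <!-- size of the in-memory circular sort buffer, in MB -->
</property>
<property>
  <name>mapreduce.map.sort.spill.percent</name>
  <value>0.80</value> <!-- spill to disk once the buffer is 80% full -->
</property>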
Spill
The buffer contents are not written to local disk immediately; a few operations are applied first.
Partitioning: based on the number of Reduce Tasks in this MapReduce job, each map output record is assigned to the reduce task that will process it; by default a HashPartitioner on the key is used.
// HashPartitioner assigns each record to a reducer by taking the key's hash modulo
// the number of reduce tasks:
public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
Sorting: the data within each partition is sorted, by default by key.
The partitioned, sorted data is written to a file on local disk.
These steps repeat, producing multiple small spill files.
After spilling finishes
The data of each partition is merged together. (When a MapTask finishes processing its data, it notifies the AppMaster; the AppMaster then notifies all ReduceTasks, and each ReduceTask actively pulls, from the local disks of the completed MapTasks, the partition data it is responsible for.)
In the end the map output ends up in a single partitioned file, and the data within each partition is already sorted.
Grouping (group)
Values that share the same key are gathered into a list, forming (key, list(value)) pairs, which are handed to the reduce() function for processing.
Finally the (key, list(value)) pairs are passed to reduce().
FileInputFormat.setMaxInputSplitSize(job, size);  // set the maximum split size
FileInputFormat.setMinInputSplitSize(job, size);  // set the minimum split size
// FileInputFormat
public List<InputSplit> getSplits(JobContext job) { ... }

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
// minSize <= maxSize < blockSize  -> smaller splits, more parallelism
// minSize > blockSize             -> larger splits, less parallelism
job.setNumReduceTasks(2);  // the HashPartitioner decides which reduce task handles each map output key
package com.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Do not use Java Serializable; use Hadoop's Writable instead.
 */
public class Flow implements Writable {

    private long up;
    private long down;
    private long sum;

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    public long getSum() {
        return sum;
    }

    public void setSum(long sum) {
        this.sum = sum;
    }

    @Override
    public String toString() {
        return up + "\t" + down + "\t" + sum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        up = in.readLong();
        down = in.readLong();
        sum = in.readLong();
    }
}
public static class MyPartitioner extends Partitioner<Text, Flow> {

    @Override
    public int getPartition(Text key, Flow value, int numPartitions) {
        if (value.getSum() < 1024) {
            return 0;
        } else if (value.getSum() < 10 * 1024) {
            return 1;
        }
        return 2;
    }
}
Sorting is always by key; for multi-field (secondary) sorting a custom key class is required.
Sorting happens automatically during shuffle; no method has to be called by hand.
// A key that is to be sorted must implement WritableComparable
public class MyKey implements WritableComparable<MyKey> {

    @Override
    public int compareTo(MyKey o) {
        // sort by sum in descending order
        long result = o.getSum() - this.getSum();
        if (result > 0) {
            return 1;
        } else if (result < 0) {
            return -1;
        }
        // break ties by phone number
        return o.getPhone().compareTo(this.getPhone());
    }
}
The combiner is a small map-side reduce: it reduces the values produced by each map task before they are transferred, cutting down the amount of data shuffled.
The combiner is enabled with job.setCombinerClass(WCReducer.class);
Effect before and after the combiner:
Raw input:
hello world
hello hadoop
hello world
hello java

Reduce input without a combiner:
keyIn:hadoop  [1, ]
keyIn:hello   [1, 1, 1, 1, ]
keyIn:java    [1, ]
keyIn:world   [1, 1, ]

Reduce input with the combiner:
keyIn:hadoop  [1, ]
keyIn:hello   [2, 2, ]
keyIn:java    [1, ]
keyIn:world   [1, 1, ]
Grouping comparator: as an optimization, keys that share certain fields can be treated as one key, reducing the number of key/value groups, according to the requirements.
Implement RawComparator to merge keys that share those fields.
It is registered with job.setGroupingComparatorClass(Mygroup.class);
public static class Mygroup implements RawComparator<Person> {

    @Override
    public int compare(Person o1, Person o2) {
        return 0;
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // compare the serialized keys while ignoring the last 4 bytes,
        // so keys that differ only in that trailing field are grouped together
        return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
    }
}
org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable
With the default TextInputFormat, the map input key is the byte offset of the line (a LongWritable), so the Mapper must declare LongWritable, not IntWritable, as its input key type.
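A minimal sketch of the mismatch and the fix (class names here are illustrative, not from the original notes):

// Wrong: with TextInputFormat the framework supplies LongWritable keys,
// so declaring IntWritable here fails at runtime with the cast exception above.
public static class BadMapper extends Mapper<IntWritable, Text, Text, IntWritable> {
}

// Right: declare the input key as LongWritable (the byte offset of the line).
public static class GoodMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key = byte offset of the current line, value = the line's text
    }
}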