map-reduce 数据分析java
1.Hadoop API 开发步骤linux
肯定目标——开发软件(使用Eclipse等工具)——测试结果算法
2.如何在eclipse安装map-reduce插件windows
(1)查找eclipse的安装目录网络
在linux下查找app
whereis eclipseeclipse
/usr/lib/eclipseide
在windows下就更简单了工具
把hadoop/contrib./eclipse-plugin 文件下的hadoop-0.20.2-eclipse-plugin.jar 放到eclipse/plugins文件夹下oop
重启eclipse
(2)打开window——preferences——Hadoop Map/Reduce配置选项,首先设置hadoop文件目录
Window -> Preferences 选择 “Hadoop Map/Reduce”,点击“Browse...”选择Hadoop文件夹的路径(在windows上)。
这个步骤与运行环境无关,只是在新建工程的时候能将hadoop根目录和lib目录下的全部jar包自动导入。
File -> New -> Project 选择“Map/Reduce Project”,而后输入项目名称,建立项目。插件会自动把hadoop根目录和lib目录下的全部jar包导入。
File -> New -> Mapper 建立Mapper,自动继承mapred包里面的MapReduceBase并实现Mapper接口。
注意:这个插件自动继承的是mapred包里旧版的类和接口,新版的Mapper得本身写。
Reducer同理
(3)openperspective-other-map/reduce 打开一个hadoop视图
(4)show view把hadoop视图显示出来
(5)会出现map/reduce locations选项卡
右键 ——New hadoop location配置一个新的配置
Location name :一个名字
Map/Reduce Master DFSMaster
Host: localhost
Port: 9001 Port 9000
配置好后,在右边会出现一个DFSLocations
---DisConnect 会出现HDFS文件系统的目录树能够上传文件
3.新建一个hadoop map-reduce任务
File——New——project——Map/Reduce Project
4.案例:
(1)数据筛选程序
任务要求:
-现有一批路由日志。须要提取MAC地址和时间,删去其余内容
Apr 23 11:49:54 hostapd: wlan0 STA 14:7d:c5:9e:fb:84
Apr 23 11:49:52 hostapd: wlan0 STA 74:e5:0b:9e:fb:84
Apr 23 11:49:50 hostapd: wlan0 STA cc:dy:c5:9e:fb:84
Apr 23 11:49:44 hostapd: wlan0 STA cc:7d:c5:ee:fb:84
Apr 23 11:49:43 hostapd: wlan0 STA 74:7d:c5:9e:gb:84
Apr 23 11:49:42 hostapd: wlan0 STA 14:7d:c5:9e:fb:84
算法思路:
Hadoop 网络模板程序
public class Test_1 extends Configured implements Tool{
enum Counter{
LINESKIP //出错的行
}
public static class Map extends Mapper<LongWritable,Text,NullWritable,Text>{
public void map(LongWritable key , Text value, Context context)throws IOException,InterruptedException{
String line=value.toString();//读取源数据
try{
//数据处理
String[] lineSplit=line.split(“ ”);
String mont=lineSplit[0];
String time=lineSplit[1];
String mac=lineSplit[6];
Text out=new Text(month+’ ’+time+’ ‘+mac);
context.write(NullWritable.get(),out);//输出key \t value
}catch(java.long.ArrayIndexOutOfBoundsException e){
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
@Override
public int run(String[] args) throws Exception{
Configuration conf=getConf();
Job job=new Job(conf,”Test_1”); //任务名
job.setJarByClass(Test_1.class); //指定class
FileInputFormat.addInputPath(job,new Path(args[0]));//输入路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出路径
job.setMapperClass(Map.class);//调用上面Map类做为Map任务代码
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class); 指定输出的KEY的格式
job.setOutputValueClass(Text.class);//指定输出的vlaue格式
job.watiForCompletion(true);
return job.isSuccessful ? 0:1;
}
public static void main (String[] args) throws Exception
{
//运行任务
int res=ToolRunner.run(new Confiuration(),new Test_1(),args);
System.exit(res);
}
}
运行程序
运行按钮——Run Configurations
Arguments:
program arguments 运行参数
hdfs://localhost:9000/user/james/input hdfs://localhost:9000/user/james/output
输入路径 输出路径
(2)倒排索引
任务要求
-现有一批电话通讯清单,记录了用户A拨打用户B的记录
-须要作一个倒排索引,记录拨打给用户B的全部用户A
13599999999 10086
13899999999 120
13944444444 13800138000
13722222222 13800138000
18800000000 120
13722222222 10086
18944444444 10086
任务输出必须以下所示,主叫以’|’分隔
10086 13599999999|13722222222|18944444444
120 18800000000|
13800138000 13944444444|13722222222|
算法思路
程序:
public class Test_2 extends Configured implements Tool{
enum Counter{
LINESKIP //出错的行
}
public static class Map extends Mapper<LongWritable,Text,Text,Text>{
public void map(LongWritable key , Text value,Context context)throws IOException,InterruptedException{
String line=value.toString();//读取源数据
try{
//数据处理
String[] lineSplit=line.split(“ ”);
String anum=lineSplit[0];
String bnum=lineSplit[1];
Context.write(new Text(bnum),new Text(anum));//输出
}catch(java.long.ArrayIndexOutOfBoundsException e){
Context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
public static class Reduce extents Reducer<Text,Text,Text,Text>{
public void reduce(Text key, Iterable<Text> values,Context context)throws IOException,InterruptedException{
String valueString;
String out=””;
for(Text value:values){
valueString=value.toString();
out+=valueString+”|”;
}
Context.write(key,new Text(out));
}
}
@Override
public int run(String[] args) throws Exception{
Configuration conf=getConf();
Job job=new Job(conf,”Test_2”); //任务名
job.setJarByClass(Test_2.class); //指定class
FileInputFormat.addInputPath(job,new Path(args[0]));//输入路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出路径
job.setMapperClass(Map.class);//调用上面Map类做为Map任务代码
job.setReduceClass(Reduce.class); //调用上面Reduce类做为Reduce任务代码
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class); 指定输出的KEY的格式
job.setOutputValueClass(Text.class);//指定输出的vlaue格式
job.watiForCompletion(true);
return job.isSuccessful ? 0:1;
}
public static void main (String[] args) throws Exception
{
//运行任务
int res=ToolRunner.run(new Confiuration(),new Test_2(),args);
System.exit(res);
}
}
5.将程序打包输出
在project右键——Export——Jar File——选择位置(选上.classpath .project)(ttest_2.jar)——next——Main class(选择main class)——Finish
运行test_2.jar 文件就像运行wordCount程序同样
hadoop jar /home/james/hadoop/source/test_2.jar /home/james/Test_2 /home/james/output