首先查看数据源结构:java
CREATE TABLE `article` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`allowed_add_tag` int(2) DEFAULT NULL,
`attitudes` varchar(255) DEFAULT NULL,
`attitudes_id` int(11) DEFAULT NULL,
`banana_count` int(11) DEFAULT NULL,
`big_cover_image` varchar(255) DEFAULT NULL,
`channel_id` int(11) DEFAULT NULL,
`channel_name` varchar(255) DEFAULT NULL,
`channel_path` varchar(255) DEFAULT NULL,
`comment_count` int(11) DEFAULT NULL,
`contribute_time` datetime DEFAULT NULL,
`cover_image` varchar(255) DEFAULT NULL,
`description` varchar(255) DEFAULT NULL,
`essense` int(2) DEFAULT NULL,
`favorite_count` int(11) DEFAULT NULL,
`latest_active_time` datetime DEFAULT NULL,
`latest_comment_time` datetime DEFAULT NULL,
`like_count` int(11) DEFAULT NULL,
`link` varchar(255) DEFAULT NULL,
`parent_channel_id` int(11) DEFAULT NULL,
`parent_channel_name` varchar(255) DEFAULT NULL,
`parent_realm_id` int(11) DEFAULT NULL,
`realm_id` int(11) DEFAULT NULL,
`realm_name` varchar(255) DEFAULT NULL,
`recommended` int(2) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
`tag_list` varchar(255) DEFAULT NULL,
`title` varchar(255) DEFAULT NULL,
`top_level` int(2) DEFAULT NULL,
`tudou_domain` int(2) DEFAULT NULL,
`type_id` int(11) DEFAULT NULL,
`user_avatar` varchar(255) DEFAULT NULL,
`user_id` int(11) DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`view_count` int(11) DEFAULT NULL,
`view_only` int(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=13103 DEFAULT CHARSET=utf8mb4;
复制代码
这里我将其中的数据导出为csv文件。正则表达式
在这个例子中,我要作的是根据帖子发布时间,统计全天中每隔30分钟的发帖个数。sql
InputFormat 接口 - 该接口指定输入文件的内容格式。shell
其中getSplits函数将全部输入数据分红numSplits个split,每一个split交给一个map task处理。bash
getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每一个record解析成key/value对。app
首先先编写工具类Times.java - period(str:String, format:String)方法,该方法的做用为:负载均衡
根据传入的字符串和时间格式获取一天中改时间的时间区间,如:dom
输入:"2018-10-18 22:05:11", "yyyy-MM-dd HH:mm:ss"分布式
输出: "201810182200-201810182230"ide
方法以下:
public static String period(String time, String format) {
Objects.requireNonNull(time);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
LocalDateTime dateTime = LocalDateTime.parse(time, formatter);
int m = dateTime.getMinute();
LocalDateTime start = dateTime.withMinute(m < 30 ? 0 : 30);
LocalDateTime end = null;
if (m < 30) {
end = dateTime.withMinute(30);
} else {
end = dateTime.plusHours(1);
end = end.withMinute(0);
}
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
return start.format(dateTimeFormatter) + "-" + end.format(dateTimeFormatter);
}
复制代码
测试输入:
period("2018-11-11 23:34", "yyyy-MM-dd HH:mm");
返回结果:
201811112330-201811120000
TimeMapper.java代码为:
public class TimeMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String time = Matchers.stringCutBymatchers(value.toString(), "[0-9]{4}[/][0-9]{1,2}[/][0-9]{1,2}[ ][0-9]{1,2}[:][0-9]{1,2}[:][0-9]{1,2}");
Objects.requireNonNull(time);
String result = Times.period(time, "yyyy/MM/dd HH:mm:ss");
context.write(new Text(result), new LongWritable(1));
}
}
复制代码
因为按行读取.csv文件而且一行中的时间格式为yyyy/MM/dd HH:mm:ss,所以直接用正则表达式截取时间。而后获取时间区段,而后将<时间区段, 1>传递给reduce().
Matchers.stringCutBymatchers():
public static String stringCutBymatchers(String str, String mstr) {
Pattern patternn = Pattern.compile(mstr);
Matcher matcher = patternn.matcher(str);
String result = null;
if (matcher.find()) {
result = matcher.group(0);
}
return result;
}
复制代码
reduce()阶段的操做就很简单了,只要统计时间区段出现的次数就行了
TimeReduce.java:
public class TimeReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long counts = 0L;
for (LongWritable val : values) {
counts += val.get();
}
context.write(key, new LongWritable(counts));
}
}
复制代码
main方法以下:
TimeApp.java:
public class TimeApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("请输入input目录和output目录");
System.exit(2);
}
Job job = Job.getInstance(conf, "acfun-time");
job.setJarByClass(CSVApp.class);
job.setMapperClass(TimeMapper.class);
job.setReducerClass(TimeReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
Path path = new Path(otherArgs[otherArgs.length - 1]);// 取第1个表示输出目录参数(第0个参数是输入目录)
FileSystem fileSystem = path.getFileSystem(conf);// 根据path找到这个文件
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);// true的意思是,就算output有东西,也一带删除
}
FileOutputFormat.setOutputPath(job, path);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
复制代码
最终文件目录以下:
这里我采用和hadoop-example同样的启动方法,设置一个总Main.java
public class Main {
public static void main(String[] args) {
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("citycount", CSVApp.class, "统计文章中出现的城市个数");
pgd.addClass("timecount", TimeApp.class, "统计文章时段发帖数目");
exitCode = pgd.run(args);
} catch (Throwable e) {
e.printStackTrace();
}
System.exit(exitCode);
}
}
复制代码
根据命令参数来选择须要执行的job。
打包并上传后执行。
执行
yarn jar com.dust-1.0-SNAPSHOT.jar timecount /acfun/input/dust_acfun_article.csv /acfun/output
复制代码
等待job执行完成:
执行完成以后经过
hdfs dfs -cat /acfun/output/part-r-00000
复制代码
查看结果
以后只要将该文件的数据提取出来画成图表就能直观地查看发帖时段了。
Mapreduce中用户能够进行操做的类:
用户须要实现该接口以指定输入文件的内容格式。该接口有两个方法
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
复制代码
其中getSplits函数将全部输入数据分红numSplits个split,每一个split交给一个map task处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每一个record解析成key/value对。
Hadoop自己提供了一些InputFormat:
InputFormat | 介绍 |
---|---|
TextInputFormat | 文中的每一行都是记录,即key - 行的偏移量;value - 行的内容. key:LongWritable - value:Text |
KeyValueTextInputFormat | 文本文件中的每一行都是一条记录。 第一个分隔符分隔每一行。分隔符以前的全部内容都是键,后面的全部内容都是值。分隔符由键值设置。 输入行属性中的分隔符,默认为tab[\t]字符.key:Text - value:Text |
SequenceFileInputFormat<K, V> | 用于读取序列文件的inputformat。 键和值是用户定义的。 sequence文件是一个hadoop特定的压缩二进制文件格式。它被优化用于在一个mapreduce做业的输出之间传递数据到一些其余mapreduce做业的输入. key:K - value:V |
NLineInputFormat | 每一个分割都保证有正好N行,mapred行输入格式linespermap属性,默认为1,设置N.key:LongWritable - value:Text |
2.Mapper接口 用户需继承Mapper接口实现本身的Mapper,Mapper中必须实现的函数是
void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter ) throws IOException 复制代码
其中,是经过Inputformat中的RecordReader对象解析处理 的,OutputCollector获取map()的输出结果,Reporter保存了当前task处理进度。
Hadoop自己提供了一些Mapper供用户使用:
Class | 介绍 |
---|---|
IdentityMapper<K, V> | 实现Mapper <K,V,K,V>并将输入直接映射到输出 |
InverseMapper<K, V> | 实现Mapper <K,V,V,K>并将输入直接映射到输出 |
RegexMapper | 实现Mapper <K,Text,Text,LongWritable>并为每一个正则表达式匹配生成(match,1)对 |
TokenCountMapper | 实现Mapper <K,Text,Text,LongWritable>并在输入值被标记化时生成(token,1)对 |
用户需继承该接口实现本身的Partitioner以指定map task产生的key/value对交给哪一个reduce task处理,好的Partitioner能让每一个reduce task处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是
getPartition( K2 key, V2 value, int numPartitions)
该函数返回对应的reduce task ID。
用户若是不提供Partitioner,Hadoop会使用默认的(其实是个hash函数)。
Combiner使得map task与reduce task之间的数据传输量大大减少,可明显提升性能。大多数状况下,Combiner与Reducer相同。
用户需继承Reducer接口实现本身的Reducer,Reducer中必须实现的函数是
void reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output, Reporter reporter ) throws IOException 复制代码
Hadoop自己提供了一些Reducer供用户使用:
Class | 介绍 |
---|---|
IdentityReduce<K, V> | 实现Reduce <K,V,K,V>并将输入直接映射到输出 |
LongSumReduce | 实现Reduce <K,LongWritable,K,LongWritable>并肯定与给定键对应的全部值的总和 |
用户经过OutputFormat指定输出文件的内容格式,不过它没有split。每一个reduce task将其数据写入本身的文件,文件名为part-nnnnn,其中nnnnn为reduce task的ID。
Hadoop自己提供了几个OutputFormat:
OutputFormat | 介绍 |
---|---|
TextOutputFormat | 将每条记录写为一行文本。 键和值写为字符串,并由tab(\t)分隔,可在mapred中更改。textoutputformat分隔符属性 |
SequenceFileOutputFormat | 以hadoop的专有序列文件格式写入键/值对。 与SequenceFileInputFormat一块儿使用 |
NullOutputFormat | 不作输出 |