原文地址:itweknow.cn/detail?id=7… ,欢迎你们访问。java
上篇文章咱们简要介绍了一下Avro是啥,以及其几种数据类型。那么经过这篇文章咱们一块儿来实践一下Avro在MapReduce中的使用。git
一个maven项目 Hadoop集群,若是你尚未安装的话,请戳这里,查看以前的文章。github
本篇文章是一个简单的用例,使用的例子是一个txt文件中存储了大量的学生信息,这些学生有姓名、年龄、爱好和班级信息,咱们要作的事情就是经过MapReduce程序找到各个班级年龄最大的学生。apache
咱们须要hadoop以及avro相关的包。json
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.8.2</version>
</dependency>
复制代码
前面也说到了每一个学生有姓名、年龄、爱好、班级四个字段的信息,因此咱们定义了以下的Avro模式来描述一个学生。命名为Student.avsc,存放在resources目录下。bash
{
"type": "record",
"name": "StudentRecord",
"doc": "A student",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "hobby", "type": "string"},
{"name": "class", "type": "string"}
]
}
复制代码
public class StudentAgeMaxMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<GenericRecord>> {
private GenericRecord record = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
private StudentRecordParser parser = new StudentRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValid()) {
// 数据合法。
record.put("name", parser.getName());
record.put("age", parser.getAge());
record.put("hobby", parser.getHobby());
record.put("class", parser.getClazz());
context.write(new AvroKey<>(parser.getClazz()), new AvroValue<>(record));
}
}
}
复制代码
上面的代码中你能够看到咱们自定义了一个StudentRecordParser的类来解析一行记录,因为篇幅的缘由这里就不展现了,你能够在后面提供的源码中找到。其实不难看出,Map程序主要作的事情就是将咱们存放在txt中的记录解析成一个个的GenericRecord对戏,而后以班级名称为键,record为值传递给Reducer作进一步处理。app
public class StudentAgeMaxReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
@Override
protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
GenericRecord max = null;
for (AvroValue<GenericRecord> value : values) {
GenericRecord record = value.datum();
if (max == null || ((Integer)max.get("age") <
(Integer) record.get("age"))) {
max = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
max.put("name", record.get("name"));
max.put("age", record.get("age"));
max.put("hobby", record.get("hobby"));
max.put("class", record.get("class"));
}
}
context.write(new AvroKey<>(max), NullWritable.get());
}
}
复制代码
Reducer的逻辑其实也比较简单,就是经过循环比较的方式找到年龄最大的学生。maven
public class StudentAgeMaxDriver {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 注释1:为了解决在Hadoop集群中运行时咱们使用的Avro版本和集群中Avro版本不一致的问题。
configuration.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Job job = Job.getInstance(configuration);
job.setJarByClass(StudentAgeMaxDriver.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
AvroJob.setOutputKeySchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapperClass(StudentAgeMaxMapper.class);
job.setReducerClass(StudentAgeMaxReducer.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
复制代码
和以前的MapReduce实战中实例比较,咱们这里使用AvroJob来配置做业,AvroJob类主要用来给输入、map输出以及最后输出数据指定Avro模式。ide
在打包的时候咱们须要将依赖也打到jar包中,否则后面在集群中运行的时候会报找不到AvroJob类的错误。可经过在pom.xml中添加以下插件来解决打包的问题。oop
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
复制代码
准备输入文件,input.txt。
zhangsan 23 music class1
lisi 24 pingpong class2
wangwu 24 dance class1
liuyi 25 music class1
chener 25 dance class2
zhaoliu 22 dance class2
sunqi 22 pingpong class1
zhouba 23 music class2
wujiu 26 dance class1
zhengshi 21 dance class2
复制代码
将输入文件上传到HDFS上
hadoop fs -mkdir /input
hadoop fs -put input.txt /input
复制代码
将jar拷贝到集群中任意一台Hadoop机器上。
运行下面的命令执行jar包
export HADOOP_CLASSPATH=${你的jar包名}
export HADOOP_USER_CLASSPATH_FIRST=true
hadoop jar {你的jar包名} {主类路径} /input /output
复制代码
将运行结果拷贝到本地
hadoop fs -copyToLocal /output/part-r-00000.avro part-r-00000.avro
复制代码
运行结果查看
root@test:~# java -jar /root/extra-jar/avro-tools-1.8.2.jar tojson part-r-00000.avro
{"name":"wujiu","age":26,"hobby":"dance","class":"class1"}
{"name":"chener","age":25,"hobby":"dance","class":"class2"}
复制代码
想要项目源码吗?戳这里就有哦。