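The previous article covered implementing a join with the map and reduce phases working together. In this one I cover implementing the join on the map side.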
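1. Requirement
Implement a join of two "tables", where one table is small and the other is very large. This scenario is very common in practice, for example joining an "order log" with "product information".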
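2. Analysis
-- How it works: this approach suits joins in which one of the tables is small. The small table can be distributed to every map node, so each map node can join the large-table records it reads locally and write out the final result. This greatly increases the parallelism of the join and speeds up processing.
-- Example: load the small table in the mapper class ahead of time, then perform the join.
-- Use the DistributedCache mechanism to distribute the small table's data to every node that runs a map task, so each map task can load the small table locally and perform the join on the spot.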
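The idea described above is essentially an in-memory hash join: build a lookup table from the small table once, then probe it for every record of the large table. Below is a minimal sketch in plain Java with no Hadoop dependency. The class name `HashJoinSketch`, its method names, and the sample rows are all made up for illustration and are not part of the job in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the article's MapReduce job.
final class HashJoinSketch {

    // Build phase: index the small table as key column -> value column.
    static Map<String, String> buildLookup(List<String> smallTable, int keyCol, int valueCol) {
        Map<String, String> lookup = new HashMap<>();
        for (String line : smallTable) {
            String[] fields = line.split("\t");
            lookup.put(fields[keyCol], fields[valueCol]);
        }
        return lookup;
    }

    // Probe phase: one lookup per large-table record, appending the matched value.
    // An unmatched key appends the text "null", because get() returns null.
    static List<String> join(List<String> bigTable, int keyCol, Map<String, String> lookup) {
        List<String> joined = new ArrayList<>();
        for (String line : bigTable) {
            String[] fields = line.split("\t");
            joined.add(line + "\t" + lookup.get(fields[keyCol]));
        }
        return joined;
    }

    public static void main(String[] args) {
        // Made-up sample rows: product id <TAB> product name.
        List<String> products = Arrays.asList("P0001\tphoneA", "P0002\tphoneB");
        // Made-up sample rows: order id <TAB> product id <TAB> amount.
        List<String> orders = Arrays.asList("1001\tP0001\t2", "1002\tP0002\t3");

        Map<String, String> lookup = buildLookup(products, 0, 1);
        for (String row : join(orders, 1, lookup)) {
            System.out.println(row);
        }
    }
}
```

In the MapReduce version, the build phase corresponds to `setup` and the probe phase to `map`. Since every map task holds its own copy of the lookup table, no shuffle or reduce phase is needed.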
3. Code implementation
package com.empire.hadoop.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // A HashMap that holds the product information table in memory
        Map<String, String> pdInfoMap = new HashMap<String, String>();
        Text k = new Text();

        /**
         * Reading the source of the parent class Mapper shows that setup() is called
         * once before the map task starts processing data, so it can be used for
         * initialization work.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The cached file is available in the task's working directory under its own file name.
            // try-with-resources closes the reader even if parsing fails.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product.txt")))) {
                String line;
                // Note: reading stops at the first empty line or at end of file
                while (StringUtils.isNotEmpty(line = br.readLine())) {
                    String[] fields = line.split("\t");
                    pdInfoMap.put(fields[0], fields[2]);
                }
            }
        }

        // Since the complete product table is already in memory, the join logic can run inside map()
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            String[] fields = orderLine.split("\t");
            String pdName = pdInfoMap.get(fields[1]);
            k.set(orderLine + "\t" + pdName);
            context.write(k, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapSideJoin.class);
        //job.setJar("D:/mapsidejoin.jar");

        job.setMapperClass(MapSideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Specify a file to be cached in the working directory of every node that runs a map task
        /* job.addArchiveToClassPath(archive); */ // cache a jar on the classpath of the task nodes
        /* job.addFileToClassPath(file); */       // cache a plain file on the classpath of the task nodes
        /* job.addCacheArchive(uri); */           // cache an archive file in the working directory of the task nodes
        /* job.addCacheFile(uri) */               // cache a plain file in the working directory of the task nodes

        // Cache the product table file in the working directory of the task nodes
        //job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));
        job.addCacheFile(new URI("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/product.txt"));

        // A map-side join needs no reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
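One detail of the `setup` method above is worth noting: because the loop condition is `StringUtils.isNotEmpty(line = br.readLine())`, loading stops at the first empty line, and a row with too few columns would throw an `ArrayIndexOutOfBoundsException`. The following is a minimal sketch of a more tolerant loader, assuming blank and short rows should simply be skipped. The class `ProductLoaderSketch`, its `load` method, and the sample rows are hypothetical and written in plain Java so they can be tried without a cluster.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the article's job.
final class ProductLoaderSketch {

    // Reads tab-separated rows until end of input, skipping blank lines and
    // rows that do not have enough columns for keyCol and valueCol.
    static Map<String, String> load(BufferedReader reader, int keyCol, int valueCol) {
        Map<String, String> table = new HashMap<>();
        int neededColumns = Math.max(keyCol, valueCol) + 1;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // blank line: skip it instead of stopping
                }
                String[] fields = line.split("\t");
                if (fields.length < neededColumns) {
                    continue; // malformed row: skip it
                }
                table.put(fields[keyCol], fields[valueCol]);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return table;
    }

    public static void main(String[] args) {
        // Made-up input: two valid rows, one blank line, one short row.
        String data = "P0001\tcatA\tphoneA\n\nP0002\tcatB\tphoneB\nbroken\n";
        Map<String, String> table = load(new BufferedReader(new StringReader(data)), 0, 2);
        System.out.println(table.size() + " products loaded");
    }
}
```

Inside a mapper, the same method could be given a reader over the cached `product.txt`. Whether silently skipping bad rows is acceptable depends on the data, and counting them would be a reasonable alternative.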
4. Running the program
# Upload the jar
Alt+p
lcd d:/
put mapsidejoin.jar
# Prepare the data files for Hadoop to process
cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs -mkdir -p /rjoin/mapjoinsideinput
hadoop fs -mkdir -p /rjoin/mapjoincache
hdfs dfs -put order.txt /rjoin/mapjoinsideinput
hdfs dfs -put product.txt /rjoin/mapjoincache
# Run the mapsidejoin program
hadoop jar mapsidejoin.jar com.empire.hadoop.mr.mapsidejoin.MapSideJoin /rjoin/mapjoinsideinput /rjoin/mapjoinsideoutput
5. Run output
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #87 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getCounters
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #87
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getCounters took 36ms
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=189612
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=218
        HDFS: Number of bytes written=108
        HDFS: Number of read operations=5
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=3057
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=3057
        Total vcore-milliseconds taken by all map tasks=3057
        Total megabyte-milliseconds taken by all map tasks=3130368
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Input split bytes=125
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=99
        CPU time spent (ms)=350
        Physical memory (bytes) snapshot=117669888
        Virtual memory (bytes) snapshot=845942784
        Total committed heap usage (bytes)=16121856
    File Input Format Counters
        Bytes Read=93
    File Output Format Counters
        Bytes Written=108
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:328)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #88 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getJobReport
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #88
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 0ms
[pool-4-thread-1] DEBUG org.apache.hadoop.ipc.Client - stopping client from cache: org.apache.hadoop.ipc.Client@303c7016
[Thread-3] DEBUG org.apache.hadoop.util.ShutdownHookManager - ShutdownHookManger complete shutdown.
6. Results
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /rjoin/mapjoinsideoutput/part-m-00000
1001 20150710 P0001 2 小米5
1002 20150710 P0001 3 小米5
1002 20150710 P0002 3 锤子T1
1003 20150710 P0003 3 锤子
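A final word: that is everything for this article. If you found it useful, please give it a like. If you are interested in my other server and big data articles, or in me, please follow my blog, and feel free to get in touch at any time.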