哈喽~各位小伙伴们中秋快乐,很久没更新新的文章啦,今天分享如何使用mapreduce进行join操做。sql
在离线计算中,咱们经常不仅是会对单一一个文件进行操做,进行须要进行两个或多个文件关联出更多数据,相似与sql中的join操做。
今天就跟你们分享一下如何在MapReduce中实现join操做数据库
现有两张,一张是产品信息表,一张是订单表。订单表中只表存了产品ID,若是想要查出订单以及产品的相关信息就必须使用关联。
根据MapReduce特性,你们都知道在reduce端,相同key的key,value对会被放到同一个reduce方法中(不设置partition的话)。 利用这个特色咱们能够轻松实现join操做,请看下面示例。
ID | brand | model |
---|---|---|
p0001 | 苹果 | iphone11 pro max |
p0002 | 华为 | p30 |
p0003 | 小米 | mate10 |
id | name | address | produceID | num |
---|---|---|---|---|
00001 | kris | 深圳市福田区 | p0001 | 1 |
00002 | pony | 深圳市南山区 | p0001 | 2 |
00003 | jack | 深圳市坂田区 | p0001 | 3 |
假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,须要用mapreduce程序来实现一下SQL查询运算:缓存
select a.id,a.name,a.address,a.num from t_orders a join t_products on a.productID=b.ID
经过将关联的条件(prodcueID)做为map输出的key,将两表知足join条件的数据并携带数据所来源的文件信息,发往同一个 reduce task,在reduce中进行数据的串联
public class RJoinInfo implements Writable{ private String customerName=""; private String customerAddr=""; private String orderID=""; private int orderNum; private String productID=""; private String productBrand=""; private String productModel=""; // 0是产品,1是订单 private int flag; setter/getter
public class RJoinMapper extends Mapper<LongWritable,Text,Text,RJoinInfo> { private static Logger logger = LogManager.getLogger(RJoinMapper.class); private RJoinInfo rJoinInfo = new RJoinInfo(); private Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 输入方式支持不少中包括数据库等等。这里用的是文件,所以能够直接强转为文件切片 FileSplit fileSplit = (FileSplit) context.getInputSplit(); // 获取文件名称 String name = fileSplit.getPath().getName(); logger.info("splitPathName:"+name); String line = value.toString(); String[] split = line.split("\t"); String productID = ""; if(name.contains("product")){ productID = split[0]; String setProductBrand = split[1]; String productModel = split[2]; rJoinInfo.setProductID(productID); rJoinInfo.setProductBrand(setProductBrand); rJoinInfo.setProductModel(productModel); rJoinInfo.setFlag(0); }else if(name.contains("orders")){ String orderID = split[0]; String customerName = split[1]; String cutsomerAddr = split[2]; productID = split[3]; String orderNum = split[4]; rJoinInfo.setProductID(productID); rJoinInfo.setCustomerName(customerName); rJoinInfo.setCustomerAddr(cutsomerAddr); rJoinInfo.setOrderID(orderID); rJoinInfo.setOrderNum(Integer.parseInt(orderNum)); rJoinInfo.setFlag(1); } k.set(productID); context.write(k,rJoinInfo); } }
代码解释,这里根据split的文件名,判断是products仍是orders,
而后根据是product仍是orders获取不一样的数据,最用都以productID为Key发送给Reduce端并发
public class RJoinReducer extends Reducer<Text,RJoinInfo,RJoinInfo,NullWritable> { private static Logger logger = LogManager.getLogger(RJoinReducer.class); @Override protected void reduce(Text key, Iterable<RJoinInfo> values, Context context) throws IOException, InterruptedException { List<RJoinInfo> orders = new ArrayList<>(); String productID = key.toString(); logger.info("productID:"+productID); RJoinInfo rJoinInfo = new RJoinInfo(); for (RJoinInfo value : values) { int flag = value.getFlag(); if (flag == 0) { // 产品 try { BeanUtils.copyProperties(rJoinInfo,value); } catch (IllegalAccessException e) { logger.error(e.getMessage()); } catch (InvocationTargetException e) { logger.error(e.getMessage()); } }else { // 订单 RJoinInfo orderInfo = new RJoinInfo(); try { BeanUtils.copyProperties(orderInfo,value); } catch (IllegalAccessException e) { logger.error(e.getMessage()); } catch (InvocationTargetException e) { logger.error(e.getMessage()); } orders.add(orderInfo); } } for (RJoinInfo order : orders) { rJoinInfo.setOrderNum(order.getOrderNum()); rJoinInfo.setOrderID(order.getOrderID()); rJoinInfo.setCustomerName(order.getCustomerName()); rJoinInfo.setCustomerAddr(order.getCustomerAddr()); // 只输出key便可,value可使用nullwritable context.write(rJoinInfo,NullWritable.get()); } } }
代码解释:根据productID会分为不一样的组发到reduce端,reduce端拿到后一组数据后,其中有一个产品对象和多个订单对象。
遍历每个对象,根据flag区分产品和订单。保存产品对象,获取每一个订单对象到一个集合中。当咱们对每一个对象都分好
类后,遍历订单集合将订单和产品信息集合,而后输出。app
注意:咱们这里效率虽然不是最高的,主要是想说明join的思路。iphone
public class RJoinDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // conf.set("mapreduce.framework.name","yarn"); // conf.set("yarn.resourcemanager.hostname","server1"); // conf.set("fs.defaultFS","hdfs://server1:9000"); conf.set("mapreduce.framework.name","local"); conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); // 若是是本地运行,能够不用设置jar包的路径,由于不用拷贝jar到其余地方 job.setJarByClass(RJoinDriver.class); // job.setJar("/Users/kris/IdeaProjects/bigdatahdfs/target/rjoin.jar"); job.setMapperClass(RJoinMapper.class); job.setReducerClass(RJoinReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(RJoinInfo.class); job.setOutputKeyClass(RJoinInfo.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input")); FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output")); boolean waitForCompletion = job.waitForCompletion(true); System.out.println(waitForCompletion); } }
==上面实现的这种方式有个缺点,就是join操做是在reduce阶段完成的,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜==ide
这种方式适用于关联表中有小表的情形: 能够将小表分发到全部的map节点,这样,map节点就能够在本地对本身所读到的大表数据进行join操做并输出结果, 能够大大提升join操做的并发度,加快处理速度。
在Mapper端咱们一次性加载数据或者用Distributedbache将文件拷贝到每个运行的maptask的节点上加载 这里咱们使用第二种,在mapper类中定义好小表进行join
static class RjoinMapper extends Mapper<LongWritable,Text,RJoinInfo,NullWritable>{ private static Map<String, RJoinInfo> productMap = new HashMap<>(); // 在循环调用map方法以前会先调用setup方法。所以咱们能够在setup方法中,先对文件进行处理 @Override protected void setup(Context context) throws IOException, InterruptedException { //经过这几句代码能够获取到cache file的本地绝对路径,测试验证用 URI[] cacheFiles = context.getCacheFiles(); System.out.println(Arrays.toString(new URI[]{cacheFiles[0]})); // 直接指定名字,默认在工做文件夹的目录下查找 1⃣ try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("products.txt")))){ String line; while ((line = bufferedReader.readLine())!=null){ String[] split = line.split("\t"); String productID = split[0]; String setProductBrand = split[1]; String productModel = split[2]; RJoinInfo rJoinInfo = new RJoinInfo(); rJoinInfo.setProductID(productID); rJoinInfo.setProductBrand(setProductBrand); rJoinInfo.setProductModel(productModel); rJoinInfo.setFlag(0); productMap.put(productID, rJoinInfo); } } super.setup(context); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit)context.getInputSplit(); String name = fileSplit.getPath().getName(); if (name.contains("orders")) { String line = value.toString(); String[] split = line.split("\t"); String orderID = split[0]; String customerName = split[1]; String cutsomerAddr = split[2]; String productID = split[3]; String orderNum = split[4]; RJoinInfo rJoinInfo = productMap.get(productID); rJoinInfo.setProductID(productID); rJoinInfo.setCustomerName(customerName); rJoinInfo.setCustomerAddr(cutsomerAddr); rJoinInfo.setOrderID(orderID); rJoinInfo.setOrderNum(Integer.parseInt(orderNum)); rJoinInfo.setFlag(1); context.write(rJoinInfo, NullWritable.get()); } } }
代码解释:这里咱们又重写了一个setup()方法,这个方法会在执行map()方法前先执行,所以咱们能够在这个方法中事先加载好数据。
在上述代码中,咱们直接指定名字就拿到了product.txt文件,这个究竟这个文件是怎么复制在maptask的节点上的呢,还要看下面的driver测试
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("mapreduce.framework.name","local"); conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); job.setJarByClass(RJoinDemoInMapDriver.class); job.setMapperClass(RjoinMapper.class); job.setOutputKeyClass(RJoinInfo.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input")); FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output2")); // 指定须要缓存一个文件到全部的maptask运行节点工做目录 // job.addFileToClassPath(); 将普通文件缓存到task运行节点的classpath下 // job.addArchiveToClassPath();缓存jar包到task运行节点的classpath下 // job.addCacheArchive();缓存压缩包文件到task运行节点的工做目录 // job.addCacheFile();将普通文件 1⃣ job.addCacheFile(new URI("/Users/kris/Downloads/rjoin/products.txt")); // 设置reduce的数量为0 job.setNumReduceTasks(0); boolean waitForCompletion = job.waitForCompletion(true); System.out.println(waitForCompletion); }
代码解释:上述Driver中,咱们经过job.addCacheFile()指定了一个URI本地地址,运行时mapreduce就会将这个文件拷贝到maptask的运行工做目录中。spa
好啦~本期分享代码量偏多,主要是想分享如何使用mapreduce进行join操做的思路。下一篇我会再讲一下 计算共同好友的思路以及代码~code
公众号搜索:喜讯XiCent 获取更多福利资源~~~~
本文由博客一文多发平台 OpenWrite 发布!