本文主要介绍spark join相关操做。java
讲述spark链接相关的三个方法join,left-outer-join,right-outer-join,在这以前,咱们用hiveSQL先跑出告终果以方便进行对比。sql
咱们以实例来进行说明。个人实现步骤记录以下。app
一、数据准备ide
二、HSQL描述函数
三、Spark描述post
一、数据准备spa
咱们准备两张Hive表,分别是orders(订单表)和drivers(司机表),经过driver_id字段进行关联。数据以下:code
ordersblog
orders表有两个字段,订单id:order_id和司机id:driver_id。司机id将做为链接键。ci
经过select能够看到三条数据。
hive (gulfstream_test)> select * from orders; OK orders.order_id orders.driver_id 1000 5000 1001 5001 1002 5002 Time taken: 0.387 seconds, Fetched: 3 row(s)
drivers
drivers表由两个字段,司机id:driver_id和车辆id:car_id。司机id将做为链接键。
经过select能够看到两条数据。
hive (gulfstream_test)> select * from drivers; OK drivers.driver_id drivers.car_id 5000 100 5003 103 Time taken: 0.036 seconds, Fetched: 2 row(s)
二、HSQL描述
JOIN
天然链接,输出链接键匹配的记录。
能够看到,经过driver_id匹配的数据只有一条。
hive (gulfstream_test)> select * from orders t1 join drivers t2 on (t1.driver_id = t2.driver_id) ; OK t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 Time taken: 36.079 seconds, Fetched: 1 row(s)
LEFT OUTER JOIN
左外连接,输出链接键匹配的记录,左侧的表不管匹配与否都输出。
能够看到,经过driver_id匹配的数据只有一条,不过全部orders表中的记录都被输出了,drivers中未能匹配的字段被置为空。
hive (gulfstream_test)> select * from orders t1 left outer join drivers t2 on (t1.driver_id = t2.driver_id) ; OK t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 1001 5001 NULL NULL 1002 5002 NULL NULL Time taken: 36.063 seconds, Fetched: 3 row(s)
RIGHT OUTER JOIN
右外链接,输出链接键匹配的记录,右侧的表不管匹配与否都输出。
能够看到,经过driver_id匹配的数据只有一条,不过全部drivers表中的记录都被输出了,orders中未能匹配的字段被置为空。
hive (gulfstream_test)> select * from orders t1 right outer join drivers t2 on (t1.driver_id = t2.driver_id) ; OK t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 NULL NULL 5003 103 Time taken: 30.089 seconds, Fetched: 2 row(s)
三、Spark描述
spark实现join的方式也是经过RDD的算子,spark一样提供了三个算子join,leftOuterJoin,rightOuterJoin。
在下面给出的例子中,咱们经过spark-hive读取了Hive中orders表和drivers表中的数据,这时候数据的表现形式是DataFrame,若是要使用Join操做:
1)首先须要先将DataFrame转化成了JavaRDD。
2)不过,JavaRDD实际上是没有join算子的,下面还须要经过mapToPair算子将JavaRDD转换成JavaPairRDD,这样就可使用Join了。
下面例子中给出了三种join操做的实现方式,在join以后,经过collect()函数把数据拉到Driver端本地,并经过标准输出打印。
须要指出的是
1)join算子(join,leftOuterJoin,rightOuterJoin)只能经过PairRDD使用;
2)join算子操做的Tuple2<Object1, Object2>类型中,Object1是链接键,我只试过Integer和String,Object2比较灵活,甚至能够是整个Row。
这里咱们使用driver_id做为链接键。 因此在输出Tuple2的时候,咱们将driver_id放在了前面。
Join.java
/* * spark-submit --queue=root.zhiliangbu_prod_datamonitor spark-join-1.0-SNAPSHOT-jar-with-dependencies.jar * */ public class Join implements Serializable { private transient JavaSparkContext javaSparkContext; private transient HiveContext hiveContext; /* * 初始化Load * 建立sparkContext, sqlContext, hiveContext * */ public Join() { initSparckContext(); initHiveContext(); } /* * 建立sparkContext * */ private void initSparckContext() { String warehouseLocation = System.getProperty("user.dir"); SparkConf sparkConf = new SparkConf() .setAppName("spark-join") .set("spark.sql.warehouse.dir", warehouseLocation) .setMaster("yarn-client"); javaSparkContext = new JavaSparkContext(sparkConf); } /* * 建立hiveContext * 用于读取Hive中的数据 * */ private void initHiveContext() { hiveContext = new HiveContext(javaSparkContext); } public void join() { /* * 生成rdd1 * */ String query1 = "select * from gulfstream_test.orders"; DataFrame rows1 = hiveContext.sql(query1).select("order_id", "driver_id"); JavaPairRDD<String, String> rdd1 = rows1.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() { @Override public Tuple2<String, String> call(Row row) throws Exception { String orderId = (String)row.get(0); String driverId = (String)row.get(1); return new Tuple2<String, String>(driverId, orderId); } }); /* * 生成rdd2 * */ String query2 = "select * from gulfstream_test.drivers"; DataFrame rows2 = hiveContext.sql(query2).select("driver_id", "car_id"); JavaPairRDD<String, String> rdd2 = rows2.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() { @Override public Tuple2<String, String> call(Row row) throws Exception { String driverId = (String)row.get(0); String carId = (String)row.get(1); return new Tuple2<String, String>(driverId, carId); } }); /* * join * */ System.out.println(" ****************** join *******************"); JavaPairRDD<String, Tuple2<String, String>> joinRdd = rdd1.join(rdd2); Iterator<Tuple2<String, Tuple2<String, String>>> it1 = joinRdd.collect().iterator(); while (it1.hasNext()) { Tuple2<String, Tuple2<String, String>> item = it1.next(); System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 ); } /* * leftOuterJoin * */ System.out.println(" ****************** leftOuterJoin *******************"); JavaPairRDD<String, Tuple2<String, Optional<String>>> leftOuterJoinRdd = rdd1.leftOuterJoin(rdd2); Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> it2 = leftOuterJoinRdd.collect().iterator(); while (it2.hasNext()) { Tuple2<String, Tuple2<String, Optional<String>>> item = it2.next(); System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 ); } /* * rightOuterJoin * */ System.out.println(" ****************** rightOuterJoin *******************"); JavaPairRDD<String, Tuple2<Optional<String>, String>> rightOuterJoinRdd = rdd1.rightOuterJoin(rdd2); Iterator<Tuple2<String, Tuple2<Optional<String>, String>>> it3 = rightOuterJoinRdd.collect().iterator(); while (it3.hasNext()) { Tuple2<String, Tuple2<Optional<String>, String>> item = it3.next(); System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 ); } } public static void main(String[] args) { Join sj = new Join(); sj.join(); } }
执行结果
其中Optional.absent()表示的就是null,能够看到和HSQL是一致的。
Application ID is application_1508228032068_2746260, trackingURL: http://10.93.21.21:4040 ****************** join ******************* driver_id:5000, order_id:1000, car_id:100 ****************** leftOuterJoin ******************* driver_id:5001, order_id:1001, car_id:Optional.absent() driver_id:5002, order_id:1002, car_id:Optional.absent() driver_id:5000, order_id:1000, car_id:Optional.of(100) ****************** rightOuterJoin ******************* driver_id:5003, order_id:Optional.absent(), car_id:103 driver_id:5000, order_id:Optional.of(1000), car_id:100
因为数据量不大,我没有从执行效率上进行考量。
根据经验,通常在数据量较大的状况下,HSQL的执行效率会高一些,若是数据量较小,Spark会快。