在大数据处理场景中,多表Join是很是常见的一类运算。可是对于分布式系统来讲,这是个很大的麻烦,因为数据分布在各个节点上,在作join操做以前必须先要shuffle,这会致使巨大的网络传输IO,致使速度很慢。算法
下面,介绍一种map-side-join,该类join使用场景是一个大表和一个小表的链接操做,其中,“小表”是指文件足够小,能够加载到内存中。该算法能够将join算子执行在Map端,无需经历shuffle和reduce等阶段,所以效率很是高。网络
下面给出实例代码分布式
// Fact table val flights = sc.parallelize(List( ("SEA", "JFK", "DL", "418", "7:00"), ("SFO", "LAX", "AA", "1250", "7:05"), ("SFO", "JFK", "VX", "12", "7:05"), ("JFK", "LAX", "DL", "424", "7:10"), ("LAX", "SEA", "DL", "5737", "7:10"))) // Dimension table val airports = sc.parallelize(List( ("JFK", "John F. Kennedy International Airport", "New York", "NY"), ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"), ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"), ("SFO", "San Francisco International Airport", "San Francisco", "CA"))) // Dimension table val airlines = sc.parallelize(List( ("AA", "American Airlines"), ("DL", "Delta Airlines"), ("VX", "Virgin America")))
须要把三个表join成以下格式:ide
Seattle New York Delta Airlines 418 7:00
San Francisco Los Angeles American Airlines 1250 7:05
San Francisco New York Virgin America 12 7:05
New York Los Angeles Delta Airlines 424 7:10
Los Angeles Seattle Delta Airlines 5737 7:10大数据
其中fact表是很是巨大的,而两个dimension表比较小,咱们能够把小表加载到内存中spa
val airportsMap = sc.broadcast(airports.map{case(a, b, c, d) => (a, c)}.collectAsMap) val airlinesMap = sc.broadcast(airlines.collectAsMap)
下面是map-side-join:
scala
flights.map{case(a, b, c, d, e) => (airportsMap.value.get(a).get, airportsMap.value.get(b).get, airlinesMap.value.get(c).get, d, e)}.collect
运行结果的部分展现:
code
res: Array[(String, String, String, String, String)] = Array(
内存
(Seattle, New York, Delta Airlines, 418, 7:00),
(San Francisco, Los Angeles, American Airlines, 1250, 7:05),
(San Francisco, New York, Virgin America, 12, 7:05),
(New York, Los Angeles, Delta Airlines, 424, 7:10),
(Los Angeles, Seattle, Delta Airlines, 5737, 7:10))