在《Apache Flink 漫谈系列 - SQL概览》中咱们介绍了JOIN算子的语义和基本的使用方式,介绍过程当中你们发现Apache Flink在语法语义上是遵循ANSI-SQL标准的,那么再深思一下传统数据库为啥须要有JOIN算子呢?在实现原理上面Apache Flink内部实现和传统数据库有什么区别呢?本篇将详尽的为你们介绍传统数据库为何须要JOIN算子,以及JOIN算子在Apache Flink中的底层实现原理和在实际使用中的优化!mysql
在《Apache Flink 漫谈系列 - SQL概览》中我对JOIN算子有过简单的介绍,这里咱们以具体实例的方式让你们对JOIN算子加深印象。JOIN的本质是分别从N(N>=1)张表中获取不一样的字段,进而获得最完整的记录行。好比咱们有一个查询需求:在学生表(学号,姓名,性别),课程表(课程号,课程名,学分)和成绩表(学号,课程号,分数)中查询全部学生的姓名,课程名和考试分数。以下:sql
JOIN的本质是数据拼接,那么若是咱们将全部数据列存储在一张大表中,是否是就不须要JOIN了呢?若是真的能将所需的数据都在一张表存储,我想就真的不须要JOIN的算子了,但现实业务中真的能作到将所需数据放到同一张大表里面吗?答案是否认的,核心缘由有2个:数据库
产生数据的源头是同一个系统,可是数据冗余的沉重代价,迫使咱们会遵循数据库范式,进行表的设计。简说NF以下: 数据结构
固然还有 4NF,5NF,不过在实际的数据库设计过程当中作到BCNF已经足够了!(并不是否认4NF,5NF存在的意义,只是我的尚未遇到必定要用4NF,5NF的场景,设计每每会按存储成本,查询性能等综合因素考量)并发
JOIN 在传统数据库中有以下分类:数据库设计
OUTER JOIN分布式
JOIN 在SQL89和SQL92中有不一样的语法,以INNER JOIN为例说明:性能
SELECT a.colA, b.colA FROM tab1 AS a , tab2 AS b WHERE a.id = b.id and a.other > b.other
SELECT a.colA, b.colA FROM tab1 AS a JOIN tab2 AS b ON a.id = b.id WHERE a.other > b.other
本篇中的后续示例将应用SQL92语法进行SQL的编写,语法以下:优化
tableExpression [ LEFT|RIGHT|FULL|INNER|SELF ] JOIN tableExpression [ ON joinCondition ] [WHERE filterCondition]
在《Apache Flink 漫谈系列 - SQL概览》中对JOIN语义有过简单介绍,这里会进行展开介绍。 咱们以开篇示例中的三张表:学生表(学号,姓名,性别),课程表(课程号,课程名,学分)和成绩表(学号,课程号,分数)来介绍各类JOIN的语义。spa
交叉链接会对两个表进行笛卡尔积,也就是LEFT表的每一行和RIGHT表的全部行进行联接,所以生成结果表的行数是两个表行数的乘积,如student和course表的CROSS JOIN结果以下:
mysql> SELECT * FROM student JOIN course; +------+-------+------+-----+-------+--------+ | no | name | sex | no | name | credit | +------+-------+------+-----+-------+--------+ | S001 | Sunny | M | C01 | Java | 2 | | S002 | Tom | F | C01 | Java | 2 | | S003 | Kevin | M | C01 | Java | 2 | | S001 | Sunny | M | C02 | Blink | 3 | | S002 | Tom | F | C02 | Blink | 3 | | S003 | Kevin | M | C02 | Blink | 3 | | S001 | Sunny | M | C03 | Spark | 3 | | S002 | Tom | F | C03 | Spark | 3 | | S003 | Kevin | M | C03 | Spark | 3 | +------+-------+------+-----+-------+--------+ 9 rows in set (0.00 sec)
如上结果咱们获得9行=student(3) x course(3)。交叉联接通常会消耗较大的资源,也被不少用户质疑交叉联接存在的意义?(任什么时候候咱们都有质疑的权利,同时也建议咱们养成本身质疑本身“质疑”的习惯,就像小时候不理解父母的“废话”同样)。
咱们以开篇的示例说明交叉联接的巧妙之一,开篇中咱们的查询需求是:在学生表(学号,姓名,性别),课程表(课程号,课程名,学分)和成绩表(学号,课程号,分数)中查询全部学生的姓名,课程名和考试分数。开篇中的SQL语句获得的结果以下:
mysql> SELECT -> student.name, course.name, score -> FROM student JOIN score ON student.no = score.s_no -> JOIN course ON score.c_no = course.no; +-------+-------+-------+ | name | name | score | +-------+-------+-------+ | Sunny | Java | 80 | | Sunny | Blink | 98 | | Sunny | Spark | 76 | | Kevin | Java | 78 | | Kevin | Blink | 88 | | Kevin | Spark | 68 | +-------+-------+-------+ 6 rows in set (0.00 sec)
如上INNER JOIN的结果咱们发现少了Tom同窗的成绩,缘由是Tom同窗没有参加考试,在score表中没有Tom的成绩,可是咱们可能但愿虽然Tom没有参加考试但仍然但愿Tom的成绩可以在查询结果中显示(成绩 0 分),面对这样的需求,咱们怎么处理呢?交叉联接能够帮助咱们:
mysql> SELECT -> stu.no, c.no, stu.name, c.name -> FROM student stu JOIN course c 笛卡尔积 -> ORDER BY stu.no; -- 排序只是方便你们查看:) +------+-----+-------+-------+ | no | no | name | name | +------+-----+-------+-------+ | S001 | C03 | Sunny | Spark | | S001 | C01 | Sunny | Java | | S001 | C02 | Sunny | Blink | | S002 | C03 | Tom | Spark | | S002 | C01 | Tom | Java | | S002 | C02 | Tom | Blink | | S003 | C02 | Kevin | Blink | | S003 | C03 | Kevin | Spark | | S003 | C01 | Kevin | Java | +------+-----+-------+-------+ 9 rows in set (0.00 sec)
mysql> SELECT -> stu.no, c.no, stu.name, c.name, -> CASE -> WHEN s.score IS NULL THEN 0 -> ELSE s.score -> END AS score -> FROM student stu JOIN course c -- 迪卡尔积 -> LEFT JOIN score s ON stu.no = s.s_no and c.no = s.c_no -- LEFT OUTER JOIN -> ORDER BY stu.no; -- 排序只是为了你们好看一点:) +------+-----+-------+-------+-------+ | no | no | name | name | score | +------+-----+-------+-------+-------+ | S001 | C03 | Sunny | Spark | 76 | | S001 | C01 | Sunny | Java | 80 | | S001 | C02 | Sunny | Blink | 98 | | S002 | C02 | Tom | Blink | 0 | -- TOM 虽然没有参加考试,可是仍然看到他的信息 | S002 | C03 | Tom | Spark | 0 | | S002 | C01 | Tom | Java | 0 | | S003 | C02 | Kevin | Blink | 88 | | S003 | C03 | Kevin | Spark | 68 | | S003 | C01 | Kevin | Java | 78 | +------+-----+-------+-------+-------+ 9 rows in set (0.00 sec)
通过CROSS JOIN帮咱们将Tom的信息也查询出来了!(TOM 虽然没有参加考试,可是仍然看到他的信息)
内联接在SQL92中 ON 表示联接添加,可选的WHERE子句表示过滤条件,如开篇的示例就是一个多表的内联接,咱们在看一个简单的示例: 查询成绩大于80分的学生学号,学生姓名和成绩:
mysql> SELECT -> stu.no, stu.name , s.score -> FROM student stu JOIN score s ON stu.no = s.s_no -> WHERE s.score > 80; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
上面按语义的逻辑是:
mysql> SELECT -> stu.no, stu.name , s.score -> FROM student stu JOIN score s ON stu.no = s.s_no ; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 80 | | S001 | Sunny | 98 | | S001 | Sunny | 76 | | S003 | Kevin | 78 | | S003 | Kevin | 88 | | S003 | Kevin | 68 | +------+-------+-------+ 6 rows in set (0.00 sec)
-> WHERE s.score > 80; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
上面的查询过程符合语义,可是若是在filter条件能过滤不少数据的时候,先进行数据的过滤,在进行内联接会获取更好的性能,好比咱们手工写一下:
mysql> SELECT -> no, name , score -> FROM student stu JOIN ( SELECT s_no, score FROM score s WHERE s.score >80) as sc ON no = s_no; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
上面写法语义和第一种写法语义一致,获得相同的查询结果,上面查询过程是:
mysql> SELECT s_no, score FROM score s WHERE s.score >80; +------+-------+ | s_no | score | +------+-------+ | S001 | 98 | | S003 | 88 | +------+-------+ 2 rows in set (0.00 sec)
-> ON no = s_no; +------+-------+-------+ | no | name | score | +------+-------+-------+ | S001 | Sunny | 98 | | S003 | Kevin | 88 | +------+-------+-------+ 2 rows in set (0.00 sec)
如上两种写法在语义上一致,但查询性能在数量很大的状况下会有很大差距。上面为了和你们演示相同的查询语义,能够有不一样的查询方式,不一样的执行计划。实际上数据库自己的优化器会自动进行查询优化,在内联接中ON的联接条件和WHERE的过滤条件具备相同的优先级,具体的执行顺序能够由数据库的优化器根据性能消耗决定。也就是说物理执行计划能够先执行过滤条件进行查询优化,若是细心的读者可能发现,在第二个写法中,子查询咱们不但有行的过滤,也进行了列的裁剪(去除了对查询结果没有用的c_no列),这两个变化实际上对应了数据库中两个优化规则:
如上优化规则以filter push down 为例,示意优化器对执行plan的优化变更:
左外联接语义是返回左表全部行,右表不存在补NULL,为了演示做用,咱们查询没有参加考试的全部学生的成绩单:
mysql> SELECT -> no, name , s.c_no, s.score -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no -> WHERE s.score is NULL; +------+------+------+-------+ | no | name | c_no | score | +------+------+------+-------+ | S002 | Tom | NULL | NULL | +------+------+------+-------+ 1 row in set (0.00 sec)
上面查询的执行逻辑上也是分红两步:
mysql> SELECT -> no, name , s.c_no, s.score -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no; +------+-------+------+-------+ | no | name | c_no | score | +------+-------+------+-------+ | S001 | Sunny | C01 | 80 | | S001 | Sunny | C02 | 98 | | S001 | Sunny | C03 | 76 | | S002 | Tom | NULL | NULL | -- 右表不存在的补NULL | S003 | Kevin | C01 | 78 | | S003 | Kevin | C02 | 88 | | S003 | Kevin | C03 | 68 | +------+-------+------+-------+ 7 rows in set (0.00 sec)
mysql> SELECT -> no, name , s.c_no, s.score -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no -> WHERE s.score is NULL; +------+------+------+-------+ | no | name | c_no | score | +------+------+------+-------+ | S002 | Tom | NULL | NULL | +------+------+------+-------+ 1 row in set (0.00 sec)
这两个过程和上面分析的INNER JOIN同样,可是这时候可否利用上面说的 filter push down的优化呢?根据LEFT OUTER JOIN的语义来说,答案是否认的。咱们手工操做看一下:
mysql> SELECT * FROM score s WHERE s.score is NULL; Empty set (0.00 sec)
mysql> SELECT -> no, name , s.c_no, s.score -> FROM student stu LEFT JOIN (SELECT * FROM score s WHERE s.score is NULL) AS s ON stu.no = s.s_no; +------+-------+------+-------+ | no | name | c_no | score | +------+-------+------+-------+ | S001 | Sunny | NULL | NULL | | S002 | Tom | NULL | NULL | | S003 | Kevin | NULL | NULL | +------+-------+------+-------+ 3 rows in set (0.00 sec)
咱们发现两种写法的结果不一致,第一种写法只返回Tom没有参加考试,是咱们预期的。第二种写法返回了Sunny,Tom和Kevin三名同窗都没有参加考试,这明显是非预期的查询结果。全部LEFT OUTER JOIN不能利用INNER JOIN的 filter push down优化。
右外连接语义是返回右表全部行,左边不存在补NULL,以下:
mysql> SELECT -> s.c_no, s.score, no, name -> FROM score s RIGHT JOIN student stu ON stu.no = s.s_no; +------+-------+------+-------+ | c_no | score | no | name | +------+-------+------+-------+ | C01 | 80 | S001 | Sunny | | C02 | 98 | S001 | Sunny | | C03 | 76 | S001 | Sunny | | NULL | NULL | S002 | Tom | -- 左边没有的进行补 NULL | C01 | 78 | S003 | Kevin | | C02 | 88 | S003 | Kevin | | C03 | 68 | S003 | Kevin | +------+-------+------+-------+ 7 rows in set (0.00 sec)
上面右外连接我只是将上面左外连接查询的左右表交换了一下:)。
全外连接语义返回左表和右表的并集,不存在一边补NULL,用于演示的MySQL数据库不支持FULL OUTER JOIN。这里不作演示了。
上面介绍的INNER JOIN、OUTER JOIN都是不一样表之间的联接查询,自联接是一张表以不一样的别名作为左右两个表,能够进行如上的INNER JOIN和OUTER JOIN。以下看一个INNER 自联接:
mysql> SELECT * FROM student l JOIN student r where l.no = r.no; +------+-------+------+------+-------+------+ | no | name | sex | no | name | sex | +------+-------+------+------+-------+------+ | S001 | Sunny | M | S001 | Sunny | M | | S002 | Tom | F | S002 | Tom | F | | S003 | Kevin | M | S003 | Kevin | M | +------+-------+------+------+-------+------+ 3 rows in set (0.00 sec)
这里说的不等值联接是SQL92语法里面的ON子句里面只有不等值联接,好比:
mysql> SELECT -> s.c_no, s.score, no, name -> FROM score s RIGHT JOIN student stu ON stu.no != s.c_no; +------+-------+------+-------+ | c_no | score | no | name | +------+-------+------+-------+ | C01 | 80 | S001 | Sunny | | C01 | 80 | S002 | Tom | | C01 | 80 | S003 | Kevin | | C02 | 98 | S001 | Sunny | | C02 | 98 | S002 | Tom | | C02 | 98 | S003 | Kevin | | C03 | 76 | S001 | Sunny | | C03 | 76 | S002 | Tom | | C03 | 76 | S003 | Kevin | | C01 | 78 | S001 | Sunny | | C01 | 78 | S002 | Tom | | C01 | 78 | S003 | Kevin | | C02 | 88 | S001 | Sunny | | C02 | 88 | S002 | Tom | | C02 | 88 | S003 | Kevin | | C03 | 68 | S001 | Sunny | | C03 | 68 | S002 | Tom | | C03 | 68 | S003 | Kevin | +------+-------+------+-------+ 18 rows in set (0.00 sec)
上面这示例,其实没有什么实际业务价值,在实际的使用场景中,不等值联接每每是结合等值联接,将不等值条件在WHERE子句指定,即, 带有WHERE子句的等值联接。
CROSS | INNER | OUTER | SELF | ON | WHERE | |
---|---|---|---|---|---|---|
Apache Flink | N | Y | Y | Y | 必选 | 可选 |
Apache Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF 能够转换为普通的INNER和OUTER)。在语义上面Apache Flink严格遵照标准SQL的语义,与上面演示的语义一致。下面我重点介绍Apache Flink中JOIN的实现原理。
传统数据库表的JOIN是两张静态表的数据联接,在流上面是 动态表(关于流与动态表的关系请查阅 《Apache Flink 漫谈系列 - 流表对偶(duality)性)》,双流JOIN的数据不断流入与传统数据库表的JOIN有以下3个核心区别:
分布式流计算全部数据会进行Shuffle,怎么才能保障左右两边流的要JOIN的数据会在相同的节点进行处理呢?在双流JOIN的场景,咱们会利用JOIN中ON的联接key进行partition,确保两个流相同的联接key会在同一个节点处理。
不管是INNER JOIN仍是OUTER JOIN 都须要对左右两边的流的数据进行保存,JOIN算子会开辟左右两个State进行数据存储,左右两边的数据到来时候,进行以下操做:
JOIN有不少复杂的场景,咱们先以最简单的场景进行实现原理的介绍,好比:最直接的两个进行INNER JOIN,好比查询产品库存和订单数量,库存变化事件流和订单事件流进行INNER JOIN,JION条件是产品ID,具体以下:
双流JOIN两边事件都会存储到State里面,如上,事件流按照标号前后流入到join节点,咱们假设右边流比较快,先流入了3个事件,3个事件会存储到state中,但由于左边尚未数据,全部右边前3个事件流入时候,没有join结果流出,当左边第一个事件序号为4的流入时候,先存储左边state,再与右边已经流入的3个事件进行join,join的结果如图 三行结果会流入到下游节点sink。当第5号事件流入时候,也会和左边第4号事件进行join,流出一条jion结果到下游节点。这里关于INNER JOIN的语义和你们强调两点:
LEFT OUTER JOIN 能够简写 LEFT JOIN,语义上和INNER JOIN的区别是不论右流是否有JOIN的事件,左流的事件都须要流入下游节点,但右流没有能够JION的事件时候,右边的事件补NULL。一样咱们以最简单的场景说明LEFT JOIN的实现,好比查询产品库存和订单数量,库存变化事件流和订单事件流进行LEFT JOIN,JION条件是产品ID,具体以下:
下图也是表达LEFT JOIN的语义,只是展示方式不一样:
上图主要关注点是当左边先流入1,2事件时候,右边没有能够join的事件时候会向下游发送左边事件并补NULL向下游发出,当右边第一个相同的Join key到来的时候会将左边先来的事件发出的带有NULL的事件撤回(对应上面command的-记录,+表明正向记录,-表明撤回记录)。这里强调三点:
RIGHT JOIN内部实现与LEFT JOIN相似, FULL JOIN和LEFT JOIN的区别是左右两边都会产生补NULL和撤回的操做。对于State的使用都是类似的,这里再也不重复说明了。
上面咱们介绍了双流JOIN会使用State记录左右两边流的事件,同时咱们示例数据的场景也是比较简单,好比流上没有更新事件(没有撤回事件),同时流上没有重复行事件。那么咱们尝试思考下面的事件流在双流JOIN时候是怎么处理的?
上图示例是连续产生了2笔销售数量同样的订单,同时在产生一笔销售数量为5的订单以后,又将该订单取消了(或者退货了),这样在事件流上面就会是上图的示意,这种状况Blink内部如何支撑呢?
根据JOIN的语义以INNER JOIN为例,右边有两条相同的订单流入,咱们就应该向下游输出两条JOIN结果,当有撤回的事件流入时候,咱们也须要将已经下发下游的JOIN事件撤回,以下:
上面的场景以及LEFT JOIN部分介绍的撤回状况,Apache Flink内部须要处理以下几个核心点:
在Apache Flink内部对不一样的场景有特殊的数据结构优化,本篇咱们只针对上面说的状况(通用设计)介绍一下双流JOIN的State的数据结构和用途:
Map<JoinKey, Map<rowData, count>>;
咱们在 《Apache Flink 漫谈系列 - 持续查询(Continuous Queries)》篇中以双流JOIN为例介绍了如何构造业务上的PK source,构造PK source本质上在保证业务语义的同时也是对双流JOIN的一种优化,好比多级LEFT JOIN会让流上的数据不断膨胀,形成JOIN节点性能较慢,JOIN以后的下游节点边堵(数据量大致使,非热点)。那么嫌少流入JOIN的数据,好比构造PK source就会大大减小JOIN数据的膨胀。这里再也不重复举例,你们能够查阅 《Apache Flink 漫谈系列 - 持续查询(Continuous Queries)》 的双流JOIN示例部分。
好比咱们有A LEFT JOIN B ON A.aCol = B.bCol LEFT JOIN C ON B.cCol = C.cCol 的业务,JOB的DAG以下:
假设在实际业务中有这样的特色,大部分时候当A事件流入的时候,B尚未能够JOIN的数据,可是B来的时候,A已经有能够JOIN的数据了,这特色就会致使,A LEFT JOIN B 会产生大量的 (A, NULL),其中包括B里面的 cCol 列也是NULL,这时候当与C进行LEFT JOIN的时候,首先Blink内部会利用cCol对AB的JOIN产生的事件流进行Shuffle, cCol是NULL进而是下游节点大量的NULL事件流入,形成热点。那么这问题如何解决呢?
咱们能够改变JOIN的前后顺序,来保证A LEFT JOIN B 不会产生NULL的热点问题,以下:
对于JOIN算子的实现咱们知道左右两边的事件都会存储到State中,在流入事件时候在从另外一边读取全部事件进行JOIN计算,这样的实现逻辑在数据量很大的场景会有必定的state操做瓶颈,咱们某些场景能够经过业务角度调整JOIN的顺序,来消除性能瓶颈,好比:A JOIN B ON A.acol = B.bcol JOIN C ON B.bcol = C.ccol. 这样的场景,若是 A与B进行JOIN产生数据量很大,可是B与C进行JOIN产生的数据量很小,那么咱们能够强制调整JOIN的联接顺序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 以下示意图:
本篇向你们介绍了数据库设计范式的要求和实际业务的查询须要是传统数据库JOIN算子存在的缘由,并以具体示例的方式向你们介绍JOIN在数据库的查询过程,以及潜在的查询优化,再以实际的例子介绍Apache Flink上面的双流JOIN的实现原理和State数据结构设计,最后向你们介绍两个双流JOIN的使用优化。
本系列文章不免有不少缺陷和不足,真诚但愿读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢你们!