简介:本文着重从 shuffle、join 方式的选择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 任务方面作的优化措施。git
本文做者为京东算法服务部的张颖和段学浩,并由 Apache Hive PMC,阿里巴巴技术专家李锐帮忙校对。主要内容为:github
- 背景
- Flink SQL 的优化
- 总结
GitHub 地址
https://github.com/apache/flink
欢迎你们给 Flink 点赞送 star~算法
目前,京东搜索推荐的数据处理流程如上图所示。能够看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm。数据库
这就形成了如下现象:在一个业务引擎里,用户须要维护两套环境、两套代码,许多共性不能复用,数据的质量和一致性很可贵到保障。且由于流批底层数据模型不一致,致使须要作大量的拼凑逻辑;甚至为了数据一致性,须要作大量的同比、环比、二次加工等数据对比,效率极差,而且很是容易出错。apache
而支持批流一体的 Flink SQL 能够很大程度上解决这个痛点,所以咱们决定引入 Flink 来解决这种问题。segmentfault
在大多数做业,特别是 Flink 做业中,执行效率的优化一直是 Flink 任务优化的关键,在京东天天数据增量 PB 级状况下,做业的优化显得尤其重要。缓存
写过一些 SQL 做业的同窗确定都知道,对于 Flink SQL 做业,在一些状况下会形成同一个 UDF 被反复调用的状况,这对一些消耗资源的任务很是不友好;此外,影响执行效率大体能够从 shuffle、join、failover 策略等方面考虑;另外,Flink 任务调试的过程也很是复杂,对于一些线上机器隔离的公司来讲尤甚。安全
为此,咱们实现了内嵌式的 Derby 来做为 Hive 的元数据存储数据库 (allowEmbedded);在任务恢复方面,批式做业没有 checkpoint 机制来实现failover,可是 Flink 特有的 region 策略可使批式做业快速恢复;此外,本文还介绍了对象重用等相关优化措施。服务器
在 Flink SQL 任务里会出现如下这种状况:若是相同的 UDF 既出如今 LogicalProject 中,又出如今 Where 条件中,那么 UDF 会进行屡次调用 (见https://issues.apache.org/jira/browse/FLINK-20887))。可是若是该 UDF 很是耗 CPU 或者内存,这种多余的计算会很是影响性能,为此咱们但愿能把 UDF 的结果缓存起来下次直接使用。在设计的时候须要考虑:(很是重要:请必定保证 LogicalProject 和 where 条件的 subtask chain 到一块儿)网络
根据以上考虑,咱们用 guava cache 将 UDF 的结果缓存起来,以后调用的时候直接去cache 里面拿数据,最大可能下降任务的消耗。下面是一个简单的使用(同时设置了最大使用 size、超时时间,可是没有写锁):
public class RandomFunction extends ScalarFunction { private static Cache<String, Integer> cache = CacheBuilder.newBuilder() .maximumSize(2) .expireAfterWrite(3, TimeUnit.SECONDS) .build(); public int eval(String pvid) { profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet()); Integer result = cache.getIfPresent(pvid); if (null == result) { int tmp = (int)(Math.random() * 1000); cache.put("pvid", tmp); return tmp; } return result; } @Override public void close() throws Exception { super.close(); cache.cleanUp(); } }
你们可能会好奇为何会把单元测试也放到优化里面,你们都知道 Flink 任务调试过程很是复杂,对于一些线上机器隔离的公司来讲尤甚。京东的本地环境是没有办法访问任务服务器的,所以在初始阶段调试任务,咱们耗费了不少时间用来上传 jar 包、查看日志等行为。
为了下降任务的调试时间、增长代码开发人员的开发效率,实现了内嵌式的 Derby 来做为 Hive 的元数据存储数据库 (allowEmbedded),这算是一种优化开发时间的方法。具体思路以下:
首先建立 Hive Conf:
public static HiveConf createHiveConf() { ClassLoader classLoader = new HiveOperatorTest().getClass().getClassLoader(); HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML)); try { TEMPORARY_FOLDER.create(); String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db"; String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir); HiveConf hiveConf = new HiveConf(); hiveConf.setVar( HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath()); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri); hiveConf.set("datanucleus.connectionPoolingType", "None"); hiveConf.set("hive.metastore.schema.verification", "false"); hiveConf.set("datanucleus.schema.autoCreateTables", "true"); return hiveConf; } catch (IOException e) { throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e); } }
接下来建立 Hive Catalog:(利用反射的方式调用 embedded 的接口)
public static void createCatalog() throws Exception{ Class clazz = HiveCatalog.class; Constructor c1 = clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class}); c1.setAccessible(true); hiveCatalog = (HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true}); hiveCatalog.open(); }
建立 tableEnvironment:(同官网)
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); TableConfig tableConfig = tableEnv.getConfig(); Configuration configuration = new Configuration(); configuration.setInteger("table.exec.resource.default-parallelism", 1); tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); tableEnv.useCatalog(hiveCatalog.getName());
最后关闭 Hive Catalog:
public static void closeCatalog() { if (hiveCatalog != null) { hiveCatalog.close(); } }
此外,对于单元测试,构建合适的数据集也是一个很是大的功能,咱们实现了 CollectionTableFactory,容许本身构建合适的数据集,使用方法以下:
CollectionTableFactory.reset(); CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case"))); StringBuilder sbFilesSource = new StringBuilder(); sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + " `pvid` string) with ('connector.type'='COLLECTION','is-bounded' = 'true')"); tableEnv.executeSql(sbFilesSource.toString());
传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。
效率 | 空间 | 备注 | |
---|---|---|---|
Nested-loop Join | 差 | 占用大 | |
Sort-Merge Join | 有sort merge开销 | 占用小 | 有序数据集的一种优化措施 |
Hash Join | 高 | 占用大 | 适合大小表 |
Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 的时间效率以及空间效率都是最低的,可使用:table.exec.disabled-operators:NestedLoopJoin 来禁用。
如下两张图片是禁用前和禁用后的效果 (若是你的禁用没有生效,先看一下是否是 Equi-Join):
Hash Join 一样分为两个阶段:首先将一个数据集转换为 Hash Table,而后遍历另一个数据集元素并与 Hash Table 内的元素进行匹配。
Hash Join 效率较高可是对空间要求较大,一般是做为 Join 其中一个表为适合放入内存的小表的状况下的优化方案 (并非不容许溢写磁盘)。
注意:Sort-Merge Join 和 Hash Join 只适用于 Equi-Join ( Join 条件均使用等于做为比较算子)。
Flink 在 join 之上又作了一些细分,具体包括:
特色 | 使用 | |
---|---|---|
Repartition-Repartition strategy | 对数据集分别进行分区和shuffle,若是数据集大的时候效率极差 | 两个数据集相差不大 |
Broadcast-Forward strategy | 将小表的数据所有发送到大表数据的机器上 | 两个数据集有较大的差距 |
众所周知,batch 的 shuffle 很是耗时间。
能够经过:table.optimizer.join.broadcast-threshold 来设置采用 broadcast 的 table 大小,若是设置为 “-1”,表示禁用 broadcast。
下图为禁用先后的效果:
在 Flink SQL 任务里,下降 shuffle 能够有效的提升 SQL 任务的吞吐量,在实际的业务场景中常常遇到这样的状况:上游产出的数据已经知足了数据分布要求 (如连续多个 join 算子,其中 key 是相同的),此时 Flink 的 forward shuffle 是冗余的 shuffle,咱们但愿将这些算子 chain 到一块儿。Flink 1.12 引入了 mutiple input 的特性,能够消除大部分不必的 forward shuffle,把 source 的算子 chain 到一块儿。
table.optimizer.multiple-input-enabled:true
下图为开了 multiple input 和没有开的拓扑图 ( operator chain 功能已经打开):
上下游 operator 之间会通过序列化 / 反序列化 / 复制阶段来进行数据传输,这种行为很是影响 Flink SQL 程序的性能,能够经过启用对象重用来提升性能。可是这在 DataStream 里面很是危险,由于可能会发生如下状况:在下一个算子中修改对象意外影响了上面算子的对象。
可是 Flink 的 Table / SQL API 中是很是安全的,能够经过以下方式来启用:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse();
或者是经过设置:pipeline-object-reuse:true
为何启用了对象重用会有这么大的性能提高?在 Blink planner 中,同一任务的两个算子之间的数据交换最终将调用 BinaryString#copy,查看实现代码,能够发现 BinaryString#copy 须要复制底层 MemorySegment 的字节,经过启用对象重用来避免复制,能够有效提高效率。
下图为没有开启对象重用时相应的火焰图:
batch 任务模式下 checkpoint 以及其相关的特性所有都不可用,所以针对实时任务的基于 checkpoint 的 failover 策略是不能应用在批任务上面的,可是 batch 任务容许 Task 之间经过 Blocking Shuffle 进行通讯,当一个 Task 由于任务未知的缘由失败以后,因为 Blocking Shuffle 中存储了这个 Task 所须要的所有数据,因此只须要重启这个 Task 以及经过 Pipeline Shuffle 与其相连的所有下游任务便可:
jobmanager.execution.failover-strategy:region (已经 finish 的 operator 可直接恢复)
table.exec.shuffle-mode:ALL\_EDGES\_BLOCKING (shuffle 策略)。
Flink 里的 shuffle 分为 pipeline shuffle 和 blocking shuffle。
blocking shuffle 就是传统的 batch shuffle,会将数据落盘,这种 shuffle 的容错好,可是会产生大量的磁盘、网络 io (若是为了省心的话,建议用 blocking suffle)。blocking shuffle 又分为 hash shuffle 和 sort shuffle,
相应的控制参数:
table.exec.shuffle-mode,该参数有多个参数,默认是 ALL\_EDGES\_BLOCKING,表示全部的边都会用 blocking shuffle,不过你们能够试一下 POINTWISE\_EDGES\_PIPELINED,表示 forward 和 rescale edges 会自动开始 pipeline 模式。
taskmanager.network.sort-shuffle.min-parallelism ,将这个参数设置为小于你的并行度,就能够开启 sort-merge shuffle;这个参数的设置须要考虑一些其余的状况,具体的能够按照官网设置。
本文着重从 shuffle、join 方式的选择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 任务方面作的优化措施。另外,感谢京东实时计算研发部付海涛等所有同事的支持与帮助。
本文内容由阿里云实名注册用户自发贡献,版权归原做者全部,阿里云开发者社区不拥有其著做权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。若是您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将马上删除涉嫌侵权内容。