今天上午被 Flink 的一个算子困惑了下,具体问题是什么呢?html
我有这么个需求:有不一样种类型的告警数据流(包含恢复数据),而后我要将这些数据流作一个拆分,拆分后的话,每种告警里面的数据又想将告警数据和恢复数据拆分出来。java
结果,这个需求用 Flink 的 Split 运算符出现了问题。git
需求以下图所示:github
我是指望如上这样将数据流进行拆分的,最后将每种告警和恢复用不一样的消息模版作一个渲染,渲染后再经过各类其余的方式(钉钉群
邮件、短信)进行告警通知。docker
因而个人代码大概的结构以下代码所示:apache
//dataStream 是总的数据流 //split 是拆分后的数据流 SplitStream<AlertEvent> split = dataStream.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); switch (value.getType()) { case MIDDLEWARE: tags.add(MIDDLEWARE); break; case HEALTH_CHECK: tags.add(HEALTH_CHECK); break; case DOCKER: tags.add(DOCKER); break; //... //固然这里还能够不少种类型 } return tags; } }); //而后你想获取每种不一样的数据类型,你可使用 select DataStream<AlertEvent> middleware = split.select(MIDDLEWARE); //选出中间件的数据流 //而后你又要将中间件的数据流分流成告警和恢复 SplitStream<AlertEvent> middlewareSplit = middleware.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); if(value.isRecover()) { tags.add(RECOVER) } else { tags.add(ALERT) } return tags; } }); middlewareSplit.select(ALERT).print(); DataStream<AlertEvent> healthCheck = split.select(HEALTH_CHECK); //选出健康检查的数据流 //而后你又要将健康检查的数据流分流成告警和恢复 SplitStream<AlertEvent> healthCheckSplit = healthCheck.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); if(value.isRecover()) { tags.add(RECOVER) } else { tags.add(ALERT) } return tags; } }); healthCheckSplit.select(ALERT).print(); DataStream<AlertEvent> docekr = split.select(DOCKER); //选出容器的数据流 //而后你又要将容器的数据流分流成告警和恢复 SplitStream<AlertEvent> dockerSplit = docekr.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); if(value.isRecover()) { tags.add(RECOVER) } else { tags.add(ALERT) } return tags; } }); dockerSplit.select(ALERT).print();
结构我抽象后大概就长上面这样,而后我先本地测试的时候只把容器的数据那块代码打开了,其余种告警的分流代码注释掉了,一运行,发现居然容器告警的数据怎么还掺杂着健康检查的数据也一块儿打印出来了,一开始我觉得本身出了啥问题,就再起码运行了三遍 IDEA 才发现结果一直都是这样的。编程
因而,我只好在第二步分流前将 docekr 数据流打印出来,发现是没什么问题,打印出来的数据都是容器相关的,没有掺杂着其余种的数据啊。这会儿遍陷入了沉思,懵逼发呆了一会。微信
因而仍是开始面向 Google 编程:session
发现第一条就找到答案了,简直不要太快,点进去能够看到他也有这样的需求:ide
而后这个小伙伴还挣扎了下用不一样的方法(虽然结果更惨):
最后换了个姿式就行了(果真小伙子会的姿式挺多的):
但从这篇文章中,我找到了关联到的两个 Flink Issue,分别是:
一、https://issues.apache.org/jira/browse/FLINK-5031
二、https://issues.apache.org/jira/browse/FLINK-11084
而后呢,从第二个 Issue 的讨论中我发现了一些颇有趣的讨论:
对话颇有趣,可是我忽然想到以前个人知识星球里面一位很细心的小伙伴问的一个问题了:
能够发现代码上确实是标明了过时了,可是注释里面没写清楚推荐用啥,幸亏我看到了这个 Issue,否则脑子里面估计这个问题一直会存着呢。
那么这个问题解决方法是否是意味着就能够利用 Side Outputs 来解决呢?固然能够啦,官方都推荐了,还不能都话,那么不是打脸啪啪啪的响吗?不过这里仍是卖个关子将 Side Outputs 后面专门用一篇文章来说,感兴趣的能够先看看官网介绍:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
另外其实也能够经过 split + filter 组合来解决这个问题,反正关键就是不要连续的用 split 来分流。
用 split + filter 的方案代码大概以下:
DataStream<AlertEvent> docekr = split.select(DOCKER); //选出容器的数据流 //容器告警的数据流 docekr.filter(new FilterFunction<AlertEvent>() { @Override public boolean filter(AlertEvent value) throws Exception { return !value.isRecover(); } }) .print(); //容器恢复的数据流 docekr.filter(new FilterFunction<AlertEvent>() { @Override public boolean filter(AlertEvent value) throws Exception { return value.isRecover(); } }) .print();
上面这种就是屡次 filter 也能够知足需求,可是就是代码有点啰嗦。
Flink 中不支持连续的 Split/Select 分流操做,要实现连续分流也能够经过其余的方式(split + filter 或者 side output)来实现
本篇文章链接是:http://www.54tianzhisheng.cn/2019/06/12/flink-split/
微信公众号:zhisheng
另外我本身整理了些 Flink 的学习资料,目前已经所有放到微信公众号了。你能够加个人微信:zhisheng_tian,而后回复关键字:Flink 便可无条件获取到。
更多私密资料请加入知识星球!
https://github.com/zhisheng17/flink-learning/
之后这个项目的全部代码都将放在这个仓库里,包含了本身学习 flink 的一些 demo 和博客。
一、Flink 从0到1学习—— Apache Flink 介绍
二、Flink 从0到1学习—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
四、Flink 从0到1学习—— Data Source 介绍
五、Flink 从0到1学习—— 如何自定义 Data Source ?
七、Flink 从0到1学习—— 如何自定义 Data Sink ?
八、Flink 从0到1学习—— Flink Data transformation(转换)
九、Flink 从0到1学习—— 介绍Flink中的Stream Windows
十、Flink 从0到1学习—— Flink 中的几种 Time 详解
十一、Flink 从0到1学习—— Flink 写入数据到 ElasticSearch
十二、Flink 从0到1学习—— Flink 项目如何运行?
1三、Flink 从0到1学习—— Flink 写入数据到 Kafka
1四、Flink 从0到1学习—— Flink JobManager 高可用性配置
1五、Flink 从0到1学习—— Flink parallelism 和 Slot 介绍
1六、Flink 从0到1学习—— Flink 读取 Kafka 数据批量写入到 MySQL
1七、Flink 从0到1学习—— Flink 读取 Kafka 数据写入到 RabbitMQ
1八、Flink 从0到1学习》—— 你上传的 jar 包藏到哪里去了?
1九、Flink 从0到1学习 —— Flink 中如何管理配置?
四、Flink 源码解析 —— standalonesession 模式启动流程
五、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动
六、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动
七、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程
八、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程
九、Flink 源码解析 —— 如何获取 JobGraph?
十、Flink 源码解析 —— 如何获取 StreamGraph?
十一、Flink 源码解析 —— Flink JobManager 有什么做用?
十二、Flink 源码解析 —— Flink TaskManager 有什么做用?
1三、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程
1四、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程
1五、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制
1六、Flink 源码解析 —— 深度解析 Flink 序列化机制
1七、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?原文出处:zhisheng的博客,欢迎关注个人公众号:zhisheng