Flink Connector 深度解析

做者介绍:董亭亭,快手大数据架构实时计算引擎团队负责人。目前负责 Flink 引擎在快手内的研发、应用以及周边子系统建设。2013 年毕业于大连理工大学,曾就任于奇虎 360、58 集团。主要研究领域包括:分布式计算、调度系统、分布式存储等系统。html

<!--more-->java

本文主要分享Flink connector相关内容,分为如下三个部分的内容:第一部分会首先介绍一下Flink Connector有哪些。第二部分会重点介绍在生产环境中常用的kafka connector的基本的原理以及使用方法。第三部分答疑环节,看你们有没有一些问题。mysql

Flink Streaming Connector

Flink是新一代流批统一的计算引擎,它须要从不一样的第三方存储引擎中把数据读过来,进行处理,而后再写出到另外的存储引擎中。Connector的做用就至关于一个链接器,链接 Flink 计算引擎跟外界存储系统。Flink里有如下几种方式,固然也不限于这几种方式能够跟外界进行数据交换:第一种 Flink里面预约义了一些source和sink。第二种 FLink内部也提供了一些Boundled connectors。第三种 可使用第三方apache Bahir项目中提供的链接器。第四种是经过异步IO方式。下面分别简单介绍一下这四种数据读写的方式。git

预约义的source和sink

Flink里预约义了一部分source和sink。在这里分了几类。github

  • 基于文件的source和sink。

若是要从文本文件中读取数据,能够直接使用正则表达式

  • env.readTextFile(path)

就能够以文本的形式读取该文件中的内容。固然也可使用redis

  • env.readFile(fileInputFormat, path)

根据指定的fileInputFormat格式读取文件中的内容。sql

若是数据在FLink内进行了一系列的计算,想把结果写出到文件里,也能够直接使用内部预约义的一些sink,好比将结果已文本或csv格式写出到文件中,可使用DataStream的writeAsText(path)和 writeAsCsv(path)。apache

  • 基于Socket的Source和Sink

提供Socket的host name及port,能够直接用StreamExecutionEnvironment预约的接口socketTextStream建立基于Socket的source,从该socket中以文本的形式读取数据。固然若是想把结果写出到另一个Socket,也能够直接调用DataStream writeToSocket。json

  • 基于内存 Collections、Iterators 的Source能够直接基于内存中的集合或者迭代器,调用StreamExecutionEnvironment fromCollection、fromElements构建相应的source。结果数据也能够直接print、printToError的方式写出到标准输出或标准错误。

详细也能够参考Flink源码中提供的一些相对应的Examples来查看异常预约义source和sink的使用方法,例如WordCount、SocketWindowWordCount。

Bundled Connectors

Flink里已经提供了一些绑定的Connector,例如kafka source和sink,Es sink等。读写kafka、es、rabbitMQ时能够直接使用相应connector的api便可。第二部分会详细介绍生产环境中最经常使用的kafka connector。

虽然该部分是Flink 项目源代码里的一部分,可是真正意义上不算做flink引擎相关逻辑,而且该部分没有打包在二进制的发布包里面。因此在提交Job时候须要注意,job代码jar包中必定要将相应的connetor相关类打包进去,不然在提交做业时就会失败,提示找不到相应的类,或初始化某些类异常。

Apache Bahir中的链接器

Apache Bahir 最初是从 Apache Spark 中独立出来项目提供,以提供不限于 Spark 相关的扩展/插件、链接器和其余可插入组件的实现。经过提供多样化的流链接器(streaming connectors)和 SQL 数据源扩展分析平台的覆盖面。若有须要写到flume、redis的需求的话,可使用该项目提供的connector。

Async I/O

流计算中常常须要与外部存储系统交互,好比须要关联mysql中的某个表。通常来讲,若是用同步I/O的方式,会形成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步I/O能够并发处理多个请求,提升吞吐,减小延迟。

Async的原理可参考官方文档:<https://ci.apache.org/project...

Flink Kafka Connector

本章重点介绍生产环境中最经常使用到的Flink kafka connector。使用flink的同窗,必定会很熟悉kafka,它是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也常常会跟kafka进行一些数据的交换,好比利用kafka consumer读取数据,而后进行一系列的处理以后,再将结果写出到kafka中。这里会主要分两个部分进行介绍,一是Flink kafka Consumer,一个是Flink kafka Producer。

首先看一个例子来串联下Flink kafka connector。代码逻辑里主要是从kafka里读数据,而后作简单的处理,再写回到kafka中。

分别用红色框 框出 如何构造一个Source sink Function. Flink提供了现成的构造FLinkKafkaConsumer、Producer的接口,能够直接使用。这里须要注意,由于kafka有多个版本,多个版本之间的接口协议会不一样。Flink针对不一样版本的kafka有相应的版本的Consumer和Producer。例如:针对0八、0九、十、11版本,Flink对应的consumer分别是FlinkKafkaConsumer0八、0九、0十、011,producer也是。

Flink kafka Consumer

反序列化数据

由于kafka中数据都是以二进制byte形式存储的。读到flink系统中以后,须要将二进制数据转化为具体的java、scala对象。具体须要实现一个schema类,定义如何序列化和反序列数据。反序列化时须要实现DeserializationSchema接口,并重写deserialize(byte[] message)函数,若是是反序列化kafka中kv的数据时,须要实现KeyedDeserializationSchema接口,并重写deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)函数。

另外Flink中也提供了一些经常使用的序列化反序列化的schema类。例如,SimpleStringSchema,按字符串方式进行序列化、反序列化。TypeInformationSerializationSchema,它可根据Flink的TypeInformation信息来推断出须要选择的schema。JsonDeserializationSchema 使用jackson反序列化json格式消息,并返回ObjectNode,可使用.get(“property”)方法来访问相应字段。

消费起始位置设置

如何设置做业从kafka消费数据最开始的起始位置,这一部分flink也提供了很是好的封装。在构造好的FlinkKafkaConsumer类后面调用以下相应函数,设置合适的其实位置。

setStartFromGroupOffsets,也是默认的策略,从group offset位置读取数据,group offset指的是kafka broker端记录的某个group的最后一次的消费位置。可是kafka broker端没有该group信息,会根据kafka的参数"auto.offset.reset"的设置来决定从哪一个位置开始消费。

setStartFromEarliest,从kafka最先的位置开始读取。

setStartFromLatest,从kafka最新的位置开始读取。

setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取。Kafka时戳,是指kafka为每条消息增长另外一个时戳。该时戳能够表示消息在proudcer端生成时的时间、或进入到kafka broker时的时间。

setStartFromSpecificOffsets,从指定分区的offset位置开始读取,如指定的offsets中不存某个分区,该分区从group offset位置开始读取。此时须要用户给定一个具体的分区、offset的集合。

一些具体的使用方法能够参考下图。须要注意的是,由于flink框架有容错机制,若是做业故障,若是做业开启checkpoint,会从上一次checkpoint状态开始恢复。或者在中止做业的时候主动作savepoint,启动做业时从savepoint开始恢复。这两种状况下恢复做业时,做业消费起始位置是从以前保存的状态中恢复,与上面提到跟kafka这些单独的配置无关。

topic和partition动态发现

实际的生产环境中可能有这样一些需求,好比场景一,有一个flink做业须要将五份数据聚合到一块儿,五份数据对应五个kafka topic,随着业务增加,新增一类数据,同时新增了一个kafka topic,如何在不重启做业的状况下做业自动感知新的topic。场景二,做业从一个固定的kafka topic读数据,开始该topic有10个partition,但随着业务的增加数据量变大,须要对kafka partition个数进行扩容,由10个扩容到20。该状况下如何在不重启做业状况下动态感知新扩容的partition?

针对上面的两种场景,首先须要在构建FlinkKafkaConsumer时的properties中设置flink.partition-discovery.interval-millis参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时FLinkKafkaConsumer内部会启动一个单独的线程按期去kafka获取最新的meta信息。针对场景一,还需在构建FlinkKafkaConsumer时,topic的描述能够传一个正则表达式描述的pattern。每次获取最新kafka meta时获取正则匹配的最新topic列表。针对场景二,设置前面的动态发现参数,在按期获取kafka最新meta信息时会匹配新的partition。为了保证数据的正确性,新发现的partition从最先的位置开始读取。

commit offset方式

Flink kafka consumer commit offset方式须要区分是否开启了checkpoint。

若是checkpoint关闭,commit offset要依赖于kafka客户端的auto commit。需设置enable.auto.commit, auto.commit.interval.ms 参数到consumer properties,就会按固定的时间间隔按期auto commit offset到kafka。

若是开启checkpoint,这个时候做业消费的offset是Flink在state中本身管理和容错。此时提交offset到kafka,通常都是做为外部进度的监控,想实时知道做业消费的位置和lag状况。此时须要setCommitOffsetsOnCheckpoints为true来设置当checkpoint成功时提交offset到kafka。此时commit offset的间隔就取决于checkpoint的间隔,因此此时从kafka一侧看到的lag可能并不是彻底实时,若是checkpoint间隔比较长lag曲线可能会是一个锯齿状。

Timestamp Extraction/Watermark生成

咱们知道当flink做业内使用EventTime属性时,须要指定从消息中提取时戳和生成水位的函数。FlinkKakfaConsumer构造的source后直接调用assignTimestampsAndWatermarks函数设置水位生成器的好处是此时是每一个partition一个watermark assigner,以下图。source生成的睡戳为多个partition时戳对齐后的最小时戳。此时在一个source读取多个partition,而且partition之间数据时戳有必定差距的状况下,由于在source端watermark在partition级别有对齐,不会致使数据读取较慢partition数据丢失。

Flink kafka Producer

Producer 分区

使用FlinkKafkaProducer往kafka中写数据时,若是不单独设置partition策略,会默认使用FlinkFixedPartitioner,该partitioner分区的方式是task所在的并发id对topic 总partition数取余:parallelInstanceId % partitions.length。此时若是sink为4,paritition为1,则4个task往同一个partition中写数据。但当sink task< partition 个数时会有部分partition没有数据写入,例如sink task为2,partition总数为4,则后面两个partition将没有数据写入。若是构建FlinkKafkaProducer时,partition设置为null,此时会使用kafka producer默认分区方式,非key写入的状况下,使用round-robin的方式进行分区,每一个task都会轮训的写下游的全部partition。该方式下游的partition数据会比较均衡,可是缺点是partition个数过多的状况下维持过多的网络连接,即每一个task都会维持跟全部partition所在broker的连接。

容错

Flink kafka 0九、010版本下,经过setLogFailuresOnly为false,setFlushOnCheckpoint为true,能达到at-least-once语义。setLogFailuresOnly,默认为false,是控制写kafka失败时,是否只打印失败的log不抛异常让做业中止。setFlushOnCheckpoint,默认为true,是控制是否在checkpoint时fluse数据到kafka,保证数据已经写到kafka。不然数据有可能还缓存在kafka 客户端的buffer中,并无真正写出到kafka,此时做业挂掉数据即丢失,不能作到至少一次的语义。

Flink kafka 011版本下,经过两阶段提交的sink结合kafka事务的功能,能够保证端到端精准一次。详细原理能够参考:https://www.ververica.com/blo...

Q&A

(1)在flink consumer的并行度的设置:是对应topic的partitions个数吗?要是有多个主题数据源,并行度是设置成整体的partitions数吗?
答:这个并非绝对的,跟topic的数据量也有关,若是数据量不大,也能够设置小于partitions个数的并发数。但不要设置并发数大于partitions总数,由于这种状况下某些并发由于分配不到partition致使没有数据处理。

(2)若是 partitioner 传 null 的时候是 round-robin 发到每个partition?若是有 key 的时候行为是 kafka 那种按照 key 分布到具体分区的行为吗?
答:若是在构造FlinkKafkaProducer时,若是没有设置单独的partitioner,则默认使用FlinkFixedPartitioner,此时不管是带key的数据,仍是不带key。若是主动设置partitioner为null时,不带key的数据会round-robin的方式写出,带key的数据会根据key,相同key数据分区的相同的partition,若是key为null,再轮询写。不带key的数据会轮询写各partition。

(3)若是checkpoint时间过长,offset未提交到kafka,此时节点宕机了,重启以后的重复消费如何保证呢?
首先开启checkpoint时offset是flink经过状态state管理和恢复的,并非从kafka的offset位置恢复。在checkpoint机制下,做业从最近一次checkpoint恢复,自己是会回放部分历史数据,致使部分数据重复消费,Flink引擎仅保证计算状态的精准一次,要想作到端到端精准一次须要依赖一些幂等的存储系统或者事务操做。

关注我

微信公众号:zhisheng

另外我本身整理了些 Flink 的学习资料,目前已经所有放到微信公众号(zhisheng)了,你能够回复关键字:Flink 便可无条件获取到。另外也能够加我微信 你能够加个人微信:yuanblog_tzs,探讨技术!

更多私密资料请加入知识星球!

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

之后这个项目的全部代码都将放在这个仓库里,包含了本身学习 flink 的一些 demo 和博客

博客

一、Flink 从0到1学习 —— Apache Flink 介绍

二、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

三、Flink 从0到1学习 —— Flink 配置文件详解

四、Flink 从0到1学习 —— Data Source 介绍

五、Flink 从0到1学习 —— 如何自定义 Data Source ?

六、Flink 从0到1学习 —— Data Sink 介绍

七、Flink 从0到1学习 —— 如何自定义 Data Sink ?

八、Flink 从0到1学习 —— Flink Data transformation(转换)

九、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows

十、Flink 从0到1学习 —— Flink 中的几种 Time 详解

十一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch

十二、Flink 从0到1学习 —— Flink 项目如何运行?

1三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka

1四、Flink 从0到1学习 —— Flink JobManager 高可用性配置

1五、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍

1六、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL

1七、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ

1八、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase

1九、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS

20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis

2一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra

2二、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume

2三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB

2四、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ

2五、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了

2六、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了

2七、阿里巴巴开源的 Blink 实时计算框架真香

2八、Flink 从0到1学习 —— Flink 中如何管理配置?

2九、Flink 从0到1学习—— Flink 不能够连续 Split(分流)?

30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文

3一、Flink 架构、原理与部署测试

3二、为何说流处理即将来?

3三、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库

3四、流计算框架 Flink 与 Storm 的性能对比

3五、Flink状态管理和容错机制介绍

3六、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

3七、360深度实践:Flink与Storm协议级对比

3八、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了

3九、Apache Flink 1.9 重大特性提早解读

40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

4一、Flink 灵魂两百问,这谁顶得住?

4二、Flink 从0到1学习 —— 如何使用 Side Output 来分流?

4三、你公司到底需不须要引入实时计算引擎?

4四、一文让你完全了解大数据实时计算引擎 Flink

源码解析

一、Flink 源码解析 —— 源码编译运行

二、Flink 源码解析 —— 项目结构一览

三、Flink 源码解析—— local 模式启动流程

四、Flink 源码解析 —— standalone session 模式启动流程

五、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动

六、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动

七、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程

八、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程

九、Flink 源码解析 —— 如何获取 JobGraph?

十、Flink 源码解析 —— 如何获取 StreamGraph?

十一、Flink 源码解析 —— Flink JobManager 有什么做用?

十二、Flink 源码解析 —— Flink TaskManager 有什么做用?

1三、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程

1四、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程

1五、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制

1六、Flink 源码解析 —— 深度解析 Flink 序列化机制

1七、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?

1八、Flink Metrics 源码解析 —— Flink-metrics-core

1九、Flink Metrics 源码解析 —— Flink-metrics-datadog

20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard

2一、Flink Metrics 源码解析 —— Flink-metrics-graphite

2二、Flink Metrics 源码解析 —— Flink-metrics-influxdb

2三、Flink Metrics 源码解析 —— Flink-metrics-jmx

2四、Flink Metrics 源码解析 —— Flink-metrics-slf4j

2五、Flink Metrics 源码解析 —— Flink-metrics-statsd

2六、Flink Metrics 源码解析 —— Flink-metrics-prometheus

2六、Flink Annotations 源码解析

2七、Flink 源码解析 —— 如何获取 ExecutionGraph ?

2八、大数据重磅炸弹——实时计算框架 Flink

2九、Flink Checkpoint-轻量级分布式快照

30、Flink Clients 源码解析
原文出处:zhisheng的博客,欢迎关注个人公众号:zhisheng

相关文章
相关标签/搜索