数栈技术分享:详解FlinkX中的断点续传和实时采集

数栈是云原生—站式数据中台PaaS,咱们在github和gitee上有一个有趣的开源项目:FlinkX,FlinkX是一个基于Flink的批流统一的数据同步工具,既能够采集静态的数据,也能够采集实时变化的数据,是全域、异构、批流一体的数据同步引擎。你们喜欢的话请给咱们点个star!star!star!mysql

github开源项目:https://github.com/DTStack/flinkxgit

gitee开源项目:https://gitee.com/dtstack_dev_0/flinkxgithub

 

袋鼠云云原生一站式数据中台PaaS——数栈,覆盖了建设数据中心过程当中所须要的各类工具(包括数据开发平台、数据资产平台、数据科学平台、数据服务引擎等),完整覆盖离线计算、实时计算应用,帮助企业极大地缩短数据价值的萃取过程,提升提炼数据价值的能力。sql

目前,数栈-离线开发平台(BatchWorks) 中的数据离线同步任务、数栈-实时开发平台(StreamWorks)中的数据实时采集任务已经统一基于FlinkX来实现。数据的离线采集和实时采集基本的原理的是同样的,主要的不一样之处是源头的流是否有界,因此统一用Flink的Stream API 来实现这两种数据同步场景,实现数据同步的批流统一。数据库

1、功能介绍

一、断点续传缓存

断点续传是指数据同步任务在运行过程当中因各类缘由致使任务失败,不须要重头同步数据,只须要从上次失败的位置继续同步便可,相似于下载文件时因网络缘由失败,不须要从新下载文件,只须要继续下载就行,能够大大节省时间和计算资源。断点续传是数栈-离线开发平台(BatchWorks)里数据同步任务的一个功能,须要结合任务的出错重试机制才能完成。当任务运行失败,会在Engine里进行重试,重试的时候会接着上次失败时读取的位置继续读取数据,直到任务运行成功为止。服务器

二、实时采集网络

实时采集是数栈-实时开发平台(StreamWorks)里数据采集任务的一个功能,当数据源里的数据发生了增删改操做,同步任务监听到这些变化,将变化的数据实时同步到目标数据源。除了数据实时变化外,实时采集和离线数据同步的另外一个区别是:实时采集任务是不会中止的,任务会一直监听数据源是否有变化。这一点和Flink任务是一致的,因此实时采集任务是数栈流计算应用里的一个任务类型,配置过程和离线计算里的同步任务基本同样。oracle

2、Flink中的Checkpoint机制

断点续传和实时采集都依赖于Flink的Checkpoint机制,因此我们先来简单了解一下。分布式

Checkpoint是Flink实现容错机制最核心的功能,它可以根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据按期持久化存储下来,当Flink程序一旦意外崩溃时,从新运行程序时能够有选择地从这些Snapshot进行恢复,从而修正由于故障带来的程序数据状态中断。

 

Checkpoint触发时,会向多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会随着Stream中的数据记录一块儿流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。由于一个Operator可能存在多个输入的Stream,而每一个Stream中都会存在对应的Barrier,该Operator要等到全部的输入Stream中的Barrier都到达。

当全部Stream中的Barrier都已经到达该Operator,这时全部的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待全部Barrier到达的过程当中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,做为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去做为这次Checkpoint的结果数据。

3、断点续传

一、前提条件

同步任务要支持断点续传,对数据源有一些强制性的要求:

1)数据源(这里特指关系数据库)中必须包含一个升序的字段,好比主键或者日期类型的字段,同步过程当中会使用checkpoint机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据,若是这个字段的值不是升序的,那么任务恢复时过滤的数据就是错误的,最终致使数据的缺失或重复;

2)数据源必须支持数据过滤,若是不支持的话,任务就没法从断点处恢复运行,会致使数据重复;

3)目标数据源必须支持事务,好比关系数据库,文件类型的数据源也能够经过临时文件的方式支持。

二、任务运行的详细过程

咱们用一个具体的任务详细介绍一下整个过程,任务详情以下:

1)读取数据

读取数据时首先要构造数据分片,构造数据分片就是根据通道索引和checkpoint记录的位置构造查询sql,sql模板以下:

select * from data_test 
where id mod ${channel_num}=${channel_index}
and id > ${offset}

若是是第一次运行,或者上一次任务失败时尚未触发checkpoint,那么offset就不存在,根据offset和通道能够肯定具体的查询sql:

offset存在时

第一个通道:

select * from data_test
where id mod 2=0
and id > ${offset_0};

第二个通道:

select * from data_test
where id mod 2=1
and id > ${offset_1};

offset不存在时

第一个通道:

select * from data_test
where id mod 2=0;

第二个通道:

select * from data_test
where id mod 2=1;

数据分片构造好以后,每一个通道就根据本身的数据分片去读数据了。

2)写数据

写数据前会先作几个操做:

a、检测 /data_test 目录是否存在,若是目录不存在,则建立这个目录,若是目录存在,进行2操做;
b、判断是否是以覆盖模式写数据,若是是,则删除 /data_test目录,而后再建立目录,若是不是,则进行3操做;
c、检测 /data_test/.data 目录是否存在,若是存在就先删除,再建立,确保没有其它任务因异常失败遗留的脏数据文件;

数据写入hdfs是单条写入的,不支持批量写入。数据会先写入/data_test/.data/目录下,数据文件的命名格式为:

channelIndex.jobId.fileIndex

包含通道索引,jobId,文件索引三个部分。

3)checkpoint触发时

在FlinkX中“状态”表示的是标识字段id的值,咱们假设checkpoint触发时两个通道的读取和写入状况如图中所示:

checkpoint触发后,两个reader先生成Snapshot记录读取状态,通道0的状态为 id=12,通道1的状态为 id=11。Snapshot生成以后向数据流里面插入barrier,barrier随数据流向Writer。以Writer_0为例,Writer_0接收Reader_0和Reader_1发来的数据,假设先收到了Reader_0的barrier,这个时候Writer_0中止写出数据到HDFS,将接收到的数据先放到 InputBuffer里面,一直等待Reader_1的barrier到达以后再将Buffer里的数据所有写出,而后生成Writer的Snapshot,整个checkpoint结束后,记录的任务状态为:

Reader_0:id=12

Reader_1:id=11

Writer_0:id=没法肯定

Writer_1:id=没法肯定

任务状态会记录到配置的HDFS目录/flinkx/checkpoint/abc123下。由于每一个Writer会接收两个Reader的数据,以及各个通道的数据读写速率可能不同,因此致使writer接收到的数据顺序是不肯定的,可是这不影响数据的准确性,由于读取数据时只须要Reader记录的状态就能够构造查询sql,咱们只要确保这些数据真的写到HDFS就好了。在Writer生成Snapshot以前,会作一系列操做保证接收到的数据所有写入HDFS:

a、close写入HDFS文件的数据流,这时候会在/data_test/.data目录下生成两个两个文件:

/data_test/.data/0.abc123.0

/data_test/.data/1.abc123.0

b、将生成的两个数据文件移动到/data_test目录下;

c、更新文件名称模板更新为:channelIndex.abc123.1;

快照生成后任务继续读写数据,若是生成快照的过程当中有任何异常,任务会直接失败,这样此次快照就不会生成,任务恢复时会从上一个成功的快照恢复。

4)任务正常结束

任务正常结束时也会作和生成快照时一样的操做,close文件流,移动临时数据文件等。

5)任务异常终止

任务若是异常结束,假设任务结束时最后一个checkpoint记录的状态为:

Reader_0:id=12Reader_1:id=11

那么任务恢复的时候就会把各个通道记录的状态赋值给offset,再次读取数据时构造的sql为:

第一个通道:

select * from data_test
where id mod 2=0
and id > 12;

第二个通道:

select * from data_test
where id mod 2=1
and id > 11;

这样就能够从上一次失败的位置继续读取数据了。

三、支持断点续传的插件

理论上只要支持过滤数据的数据源,和支持事务的数据源均可以支持断点续传的功能,目前FlinkX支持的插件以下:

4、实时采集

目前FlinkX支持实时采集的插件有KafKa、binlog插件,binlog插件是专门针对mysql数据库作实时采集的,若是要支持其它的数据源,只须要把数据打到Kafka,而后再用FlinkX的Kafka插件消费数据便可,好比oracle,只须要使用oracle的ogg将数据打到Kafka。这里咱们专门讲解一下mysql的实时采集插件binlog。

一、binlog

binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是彻底不一样的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中。

binlog的做用主要有:

1)复制:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves并回放来达到master-slave数据一致的目的;

2)数据恢复:经过mysqlbinlog工具恢复数据;

3)增量备份。

二、MySQL 主备复制

有了记录数据变化的binlog日志还不够,咱们还须要借助MySQL的主备复制功能:主备复制是指 一台服务器充当主数据库服务器,另外一台或多台服务器充当从数据库服务器,主服务器中的数据自动复制到从服务器之中。

主备复制的过程:

1)MySQL master 将数据变动写入二进制日志( binary log, 其中记录叫作二进制日志事件binary log events,能够经过 show binlog events 进行查看);


2)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);

3)MySQL slave 重放 relay log 中事件,将数据变动反映它本身的数据。

三、写入Hive

binlog插件能够监听多张表的数据变动状况,解析出的数据中包含表名称信息,读取到的数据能够所有写入目标数据库的一张表,也能够根据数据中包含的表名信息写入不一样的表,目前只有Hive插件支持这个功能。Hive插件目前只有写入插件,功能基于HDFS的写入插件实现,也就是说从binlog读取,写入hive也支持失败恢复的功能。

写入Hive的过程:

1)从数据中解析出MySQL的表名,而后根据表名映射规则转换成对应的Hive表名;

2)检查Hive表是否存在,若是不存在就建立Hive表;

3)查询Hive表的相关信息,构造HdfsOutputFormat;

4)调用HdfsOutputFormat将数据写入HDFS。

相关文章
相关标签/搜索