来自:https://blog.csdn.net/u012985132/article/details/74964366/mysql
关系型数据库和Hadoop生态的沟通愈来愈密集,时效要求也愈来愈高。本篇就来调研下实时抓取MySQL更新数据到HDFS。正则表达式
本篇仅做为调研报告。spring
初步调研了canal(Ali)+kafka connect+kafka、maxwell(Zendesk)+kafka和mysql_streamer(Yelp)+kafka。这几个工具抓取MySQL的方式都是经过扫描binlog,模拟MySQL master和slave(Mysql Replication架构–解决了:数据多点备份,提升数据可用性;读写分流,提升集群的并发能力。(并不是是负载均衡);让一些非实时的数据操做,转移到slaves上进行。)之间的协议来实现实时更新的。sql
先科普下Canal数据库
Canal原理图json
原理相对比较简单:bootstrap
Canal架构图api
组件说明:缓存
而instance模块又由eventParser(数据源接入,模拟slave协议和master进行交互,协议解析)、eventSink(Parser和Store链接器,进行数据过滤,加工,分发的工做)、eventStore(数据存储)和metaManager(增量订阅&消费信息管理器)组成。服务器
EventParser在向mysql发送dump命令以前会先从Log Position中获取上次解析成功的位置(若是是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操做,直到存储成功),传送成功以后更新Log Position。流程图以下:
EventSink起到一个相似channel的功能,能够对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是链接EventParser和EventStore的桥梁。
EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
Message getWithoutAck(int batchSize),容许指定batchSize,一次能够获取多条,每次返回的对象为Message,包含的内容为:batch id[惟一标识]和entries[具体的数据对象]
void rollback(long batchId),顾命思议,回滚上次的get请求,从新获取数据。基于get获取的batchId进行提交,避免误操做
void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操做
canal的get/ack/rollback协议和常规的jms协议有所不一样,容许get/ack异步处理,好比能够连续调用get屡次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
流式api设计的好处:
这个流式api是否是相似hdfs write在pipeline中传输packet的形式,先将packet放入dataQueue,而后向下游传输,此时将packet放入ackQueue等到下游返回的ack,这也是异步的。
canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA相似。
canal的ha分为两部分,canal server和canal client分别有对应的ha实现
大体步骤:
Canal Client的方式和canal server方式相似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.
canal同步数据须要扫描MySQL的binlog日志,而binlog默认是关闭的,须要开启,而且为了保证同步数据的一致性,使用的日志格式为row-based replication(RBR),在my.conf
中开启binlog,
1
2
3
4
|
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW
#选择row模式
server_id=1
#配置mysql replaction须要定义,不能和canal的slaveId重复
|
更改my.conf以后,须要重启MySQL,重启的方式有不少找到合适本身的就行。
由上面的介绍得知Canal由Server
和Instance
组成,而Server中又能够包含不少个Instance,一个Instance对应一个数据库实例,则Canal将配置分为两类,一类是server的配置,名字为canal.properties
,另外一类是instance的配置,名字为instance.properties
,通常会在conf目录下新建一个instance同名的目录,将其放入此目录中。
先介绍canal.properties中的几个关键属性
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.destinations | 当前server上部署的instance列表 | 无 |
canal.conf.dir | conf/目录所在的路径 | ../conf |
canal.instance.global.spring.xml | 全局的spring配置方式的组件文件 | classpath:spring/file-instance.xml (spring目录相对于canal.conf.dir) |
canal.zkServers | canal server连接zookeeper集群的连接信息 | 无 |
canal.zookeeper.flush.period | canal持久化数据到zookeeper上的更新频率,单位毫秒 | 1000 |
canal.file.data.dir | canal持久化数据到file上的目录 | ../conf (默认和instance.properties为同一目录,方便运维和备份) |
canal.file.flush.period | canal持久化数据到file上的更新频率,单位毫秒 | 1000 |
canal.instance.memory.batch.mode | canal内存store中数据缓存模式 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小 |
MEMSIZE |
canal.instance.memory.buffer.size | canal内存store中可缓存buffer记录数,须要为2的指数 | 16384 |
canal.instance.memory.buffer.memunit | 内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小 | 1024 |
下面看下instance.properties,这里的属性较少:
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.instance.mysql.slaveId | mysql集群配置中的serverId概念,须要保证和当前mysql集群中id惟一 | 1234 |
canal.instance.master.address | mysql主库连接地址 | 127.0.0.1:3306 |
canal.instance.master.journal.name | mysql主库连接时起始的binlog文件 | 无 |
canal.instance.master.position | mysql主库连接时起始的binlog偏移量 | 无 |
canal.instance.master.timestamp | mysql主库连接时起始的binlog的时间戳 | 无 |
canal.instance.dbUsername | mysql数据库账号 | canal |
canal.instance.dbPassword | mysql数据库密码 | canal |
canal.instance.defaultDatabaseName | mysql连接时默认schema | 无 |
canal.instance.connectionCharset | mysql 数据解析编码 | UTF-8 |
canal.instance.filter.regex | mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符须要双斜杠 |
.*\\..* |
除了上面两个配置文件,conf目录下还有一个目录须要强调下,那就是spring目录,里面存放的是instance.xml配置文件,目前默认支持的instance.xml有memory-instance.xml、file-instance.xml、default-instance.xml和group-instance.xml。这里主要维护的增量订阅和消费的关系信息(解析位点和消费位点)。
对应的两个位点组件,目前都有几种实现:
分别介绍下这几种配置的功能
全部的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析
特色:速度最快,依赖最少(不须要zookeeper)
场景:通常应用在quickstart,或者是出现问题后,进行数据分析的场景,不该该将其应用于生产环境
全部的组件(parser , sink , store)都选择了基于file持久化模式(组件内容持久化的file存在哪里???),注意,不支持HA机制.
特色:支持单机持久化
场景:生产环境,无HA需求,简单可用.
全部的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.(全部组件持久化的内容只有位置信息吧???)
特色:支持HA
场景:生产环境,集群化部署.
主要针对须要进行多库合并时,能够将多个物理instance合并为一个逻辑instance,提供客户端访问。
场景:分库业务。 好比产品数据拆分了4个库,每一个库会有一个instance,若是不用group,业务上要消费数据时,须要启动4个客户端,分别连接4个instance实例。使用group后,能够在canal server上合并为一个逻辑instance,只须要启动1个客户端,连接这个逻辑instance便可.
canal
,命令以下:
1
2
3
4
|
CREATE USER canal IDENTIFIED BY
'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO
'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO
'canal'@'%' ;
FLUSH PRIVILEGES;
|
canal
用户并为其赋所需权限以后,须要对Canal的配置文件(canal.properties和instance.properties)进行设置。canal.properties和instance.properties里采用默认配置便可(这里只是运行个样例,生产中能够参考具体的参数属性进行设置),
client组件Canal自己是不提供的,须要根据api进行开发,这里将官方提供的client代码打包成jar进行消费Canal信息。
canal的HA机制是依赖zk来实现的,须要更改canal.properties文件,修改内容以下:
1
2
3
4
|
# zk集群地址
canal.zkServers=10.20.144.51:2181
# 选择记录方式
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
|
更改两台canal机器上instance实例的配置instance.properties,修改内容以下:
1
2
|
canal.instance.mysql.slaveId = 1234
##另一台机器改为1235,保证slaveId不重复便可
canal.instance.master.address = 10.20.144.15:3306
|
配置好以后启动canal进程,在两台服务器上执行sh bin/startup.sh
client进行消费时,能够直接指定zookeeper地址和instance name,也可让canal client会自动从zookeeper中的running节点,获取当前服务的工做节点,而后与其创建连接。
maxwell实时抓取mysql数据的原理也是基于binlog,和canal相比,maxwell更像是canal server + 实时client
。(数据抽取 + 数据转换)
maxwell集成了kafka producer,直接从binlog获取数据更新并写入kafka,而canal则须要本身开发实时client将canal读取的binlog内容写入kafka中。
maxwell特点:
缺点:
select *
maxwell的配置文件只有一个config.properties,在home目录。其中除了须要配置mysql master的地址、kafka地址还须要配置一个用于存放maxwell相关信息的mysql地址,maxwell会把读取binlog关系的信息,如binlog name、position。
以上是Canal的原理及部署,其他相似maxwell和mysql_streamer对mysql进行实时数据抓取的原理同样就再也不进行一一介绍,这里只对他们进行下对比:
特点 | Canal | Maxwell | mysql_streamer |
---|---|---|---|
语言 | Java | Java | Python |
活跃度 | 活跃 | 活跃 | 不活跃 |
HA | 支持 | 定制 | 支持 |
数据落地 | 定制 | 落地到kafka | 落地到kafka |
分区 | 支持 | 不支持 | 不支持 |
bootstrap | 不支持 | 支持 | 支持 |
数据格式 | 格式自由 | json(格式固定) | json(格式固定) |
文档 | 较详细 | 较详细 | 略粗 |
随机读 | 支持 | 支持 | 支持 |
以上只是将mysql里的实时变化数据的binlog以同种形式同步到kafka,但要实时更新到hadoop还须要使用一个实时数据库来存储数据,并自定制开发将kafka中数据解析为nosql数据库能够识别的DML进行实时更新Nosql数据库,使其与MySQL里的数据实时同步。
虚线框是可选的方案
方案1和方案2的区别只在于kafka以前,当数据缓存到kafka以后,须要一个定制的数据路由组件来将自带schema的数据解析到目标存储中。
数据路由组件主要负责将kafka中的数据实时读出,写入到目标存储中。(如将全部日志数据保存到HDFS中,也能够将数据落地到全部支持jdbc的数据库,落地到HBase,Elasticsearch等。)
综上,
方案1须要开发的功能有:
方案2须要开发的功能有:
数据路由工具是两个方案都须要开发的,则我比较偏向于第二种方案,由于在初期试水阶段能够短时间出成果,能够较快的验证想法,并在尝试中可以较快的发现问题,好及时的调整方案。即便方案2中maxwell最终不能知足需求,而使用canal的话,咱们也可能将实时数据转换工具的数据输出模式与maxwell一致,这样初始投入人力开发的数据路由工具依然能够继续使用,而不须要从新开发。
把增量的Log做为一切系统的基础。后续的数据使用方,经过订阅kafka来消费log。
好比:
大数据的使用方能够将数据保存到Hive表或者Parquet文件给Hive或Spark查询;
提供搜索服务的使用方能够保存到Elasticsearch或HBase 中;
提供缓存服务的使用方能够将日志缓存到Redis或alluxio中;
数据同步的使用方能够将数据保存到本身的数据库中;
因为kafka的日志是能够重复消费的,而且缓存一段时间,各个使用方能够经过消费kafka的日志来达到既能保持与数据库的一致性,也能保证明时性;
{“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}
{“database”:”test”,”table”:”e”,”type”:”insert”,”ts”:1488857922,”xid”:8932,”commit”:true,”data”:{“id”:2,”m”:4.2,”torvalds”:null}}