20180705关于mysql binlog的解析方式

来自:https://blog.csdn.net/u012985132/article/details/74964366/mysql

关系型数据库和Hadoop生态的沟通愈来愈密集,时效要求也愈来愈高。本篇就来调研下实时抓取MySQL更新数据到HDFS。正则表达式

本篇仅做为调研报告。spring

初步调研了canal(Ali)+kafka connect+kafka、maxwell(Zendesk)+kafka和mysql_streamer(Yelp)+kafka。这几个工具抓取MySQL的方式都是经过扫描binlog,模拟MySQL master和slave(Mysql Replication架构–解决了:数据多点备份,提升数据可用性;读写分流,提升集群的并发能力。(并不是是负载均衡);让一些非实时的数据操做,转移到slaves上进行。)之间的协议来实现实时更新的sql


先科普下Canal数据库

Canal简介

原理

Canal原理图

Canal原理图json


原理相对比较简单:bootstrap

 

  1. canal模拟mysql slave的交互协议,假装本身为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送(slave拉取,不是master主动push给slaves)binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

架构

Canal架构图

Canal架构图api

 

组件说明:缓存

  1. server表明一个canal运行实例,对应于一个jvm
  2. instance对应于一个数据队列(1个server对应1..n个instance)

而instance模块又由eventParser(数据源接入,模拟slave协议和master进行交互,协议解析)、eventSink(Parser和Store链接器,进行数据过滤,加工,分发的工做)、eventStore(数据存储)和metaManager(增量订阅&消费信息管理器)组成。服务器

  • EventParser在向mysql发送dump命令以前会先从Log Position中获取上次解析成功的位置(若是是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操做,直到存储成功),传送成功以后更新Log Position。流程图以下:
    EventParser流程图

    EventParser流程图

     

  • EventSink起到一个相似channel的功能,能够对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是链接EventParser和EventStore的桥梁。

  • EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。

  • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:

Message getWithoutAck(int batchSize),容许指定batchSize,一次能够获取多条,每次返回的对象为Message,包含的内容为:batch id[惟一标识]和entries[具体的数据对象]
void rollback(long batchId),顾命思议,回滚上次的get请求,从新获取数据。基于get获取的batchId进行提交,避免误操做
void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操做

增量订阅和消费之间的协议交互以下:
增量订阅和消费协议

增量订阅和消费协议

 

canal的get/ack/rollback协议和常规的jms协议有所不一样,容许get/ack异步处理,好比能够连续调用get屡次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

流式api设计的好处:

  • get/ack异步化,减小因ack带来的网络延迟和操做成本 (99%的状态都是处于正常状态,异常的rollback属于个别状况,不必为个别的case牺牲整个性能)
  • get获取数据后,业务消费存在瓶颈或者须要多进程/多线程消费时,能够不停的轮询get数据,不停的日后发送任务,提升并行化. (在实际业务中的一个case:业务数据消费须要跨中美网络,因此一次操做基本在200ms以上,为了减小延迟,因此须要实施并行化)

流式api设计示意图以下:
流式api

流式api

 

  • 每次get操做都会在meta中产生一个mark,mark标记会递增,保证运行过程当中mark的惟一性
  • 每次的get操做,都会在上一次的mark操做记录的cursor继续日后取,若是mark不存在,则在last ack cursor继续日后取
  • 进行ack时,须要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
  • 一旦出现异常状况,客户端可发起rollback状况,从新置位:删除全部的mark, 清理get请求位置,下次请求会从last ack cursor继续日后取

这个流式api是否是相似hdfs write在pipeline中传输packet的形式,先将packet放入dataQueue,而后向下游传输,此时将packet放入ackQueue等到下游返回的ack,这也是异步的。

HA机制

canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA相似。

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

  • canal server: 为了减小对mysql dump的请求,不一样server上的instance(不一样server上的相同instance)要求同一时间只能有一个处于running,其余的处于standby状态(standby是instance的状态)。
  • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操做,不然客户端接收没法保证有序。

server ha的架构图以下:
ha

ha


大体步骤:

 

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断(实现:建立EPHEMERAL节点,谁建立成功就容许谁启动)
  2. 建立zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有建立成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A建立的instance节点消失后,当即通知其余的canal server再次进行步骤1的操做,从新选出一个canal server启动instance。
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,而后和其创建连接,一旦连接不可用,会从新尝试connect。

Canal Client的方式和canal server方式相似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

Canal部署及使用

MySQL配置

canal同步数据须要扫描MySQL的binlog日志,而binlog默认是关闭的,须要开启,而且为了保证同步数据的一致性,使用的日志格式为row-based replication(RBR),在my.conf中开启binlog,

 
    
1
2
3
4
 
    
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW  #选择row模式
server_id=1  #配置mysql replaction须要定义,不能和canal的slaveId重复

更改my.conf以后,须要重启MySQL,重启的方式有不少找到合适本身的就行。

Canal配置

由上面的介绍得知Canal由ServerInstance组成,而Server中又能够包含不少个Instance,一个Instance对应一个数据库实例,则Canal将配置分为两类,一类是server的配置,名字为canal.properties,另外一类是instance的配置,名字为instance.properties,通常会在conf目录下新建一个instance同名的目录,将其放入此目录中。

先介绍canal.properties中的几个关键属性

参数名字 参数说明 默认值
canal.destinations 当前server上部署的instance列表
canal.conf.dir conf/目录所在的路径 ../conf
canal.instance.global.spring.xml 全局的spring配置方式的组件文件 classpath:spring/file-instance.xml 
(spring目录相对于canal.conf.dir)
canal.zkServers canal server连接zookeeper集群的连接信息
canal.zookeeper.flush.period canal持久化数据到zookeeper上的更新频率,单位毫秒 1000
canal.file.data.dir canal持久化数据到file上的目录 ../conf (默认和instance.properties为同一目录,方便运维和备份)
canal.file.flush.period canal持久化数据到file上的更新频率,单位毫秒 1000
canal.instance.memory.batch.mode canal内存store中数据缓存模式 
1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 
2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小
MEMSIZE
canal.instance.memory.buffer.size canal内存store中可缓存buffer记录数,须要为2的指数 16384
canal.instance.memory.buffer.memunit 内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小 1024

下面看下instance.properties,这里的属性较少:

参数名字 参数说明 默认值
canal.instance.mysql.slaveId mysql集群配置中的serverId概念,须要保证和当前mysql集群中id惟一 1234
canal.instance.master.address mysql主库连接地址 127.0.0.1:3306
canal.instance.master.journal.name mysql主库连接时起始的binlog文件
canal.instance.master.position mysql主库连接时起始的binlog偏移量
canal.instance.master.timestamp mysql主库连接时起始的binlog的时间戳
canal.instance.dbUsername mysql数据库账号 canal
canal.instance.dbPassword mysql数据库密码 canal
canal.instance.defaultDatabaseName mysql连接时默认schema
canal.instance.connectionCharset mysql 数据解析编码 UTF-8
canal.instance.filter.regex mysql 数据解析关注的表,Perl正则表达式. 
多个正则之间以逗号(,)分隔,转义符须要双斜杠
.*\\..*

除了上面两个配置文件,conf目录下还有一个目录须要强调下,那就是spring目录,里面存放的是instance.xml配置文件,目前默认支持的instance.xml有memory-instance.xml、file-instance.xml、default-instance.xml和group-instance.xml。这里主要维护的增量订阅和消费的关系信息(解析位点和消费位点)。

对应的两个位点组件,目前都有几种实现:

  • memory (memory-instance.xml中使用)
  • zookeeper
  • mixed
  • file (file-instance.xml中使用,集合了file+memory模式,先写内存,定时刷新数据到本地file上)
  • period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)

分别介绍下这几种配置的功能

  • memory-instance.xml:

全部的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析

特色:速度最快,依赖最少(不须要zookeeper)

场景:通常应用在quickstart,或者是出现问题后,进行数据分析的场景,不该该将其应用于生产环境

  • file-instance.xml:

全部的组件(parser , sink , store)都选择了基于file持久化模式(组件内容持久化的file存在哪里???),注意,不支持HA机制.

特色:支持单机持久化

场景:生产环境,无HA需求,简单可用.

  • default-instance.xml:

全部的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.(全部组件持久化的内容只有位置信息吧???)

特色:支持HA

场景:生产环境,集群化部署.

  • group-instance.xml:

主要针对须要进行多库合并时,能够将多个物理instance合并为一个逻辑instance,提供客户端访问。

场景:分库业务。 好比产品数据拆分了4个库,每一个库会有一个instance,若是不用group,业务上要消费数据时,须要启动4个客户端,分别连接4个instance实例。使用group后,能够在canal server上合并为一个逻辑instance,只须要启动1个客户端,连接这个逻辑instance便可.

canal example 部署

  • 在须要同步的MySQL数据库中建立一个用户,用来replica数据,这里新建的用户名和密码都为canal,命令以下:
 
    
1
2
3
4
 
    
CREATE USER canal IDENTIFIED BY  'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO  'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO  'canal'@'%' ;
FLUSH PRIVILEGES;
  • Mysql建立canal用户并为其赋所需权限以后,须要对Canal的配置文件(canal.properties和instance.properties)进行设置。

canal.properties和instance.properties里采用默认配置便可(这里只是运行个样例,生产中能够参考具体的参数属性进行设置),

  • Canal配置好以后,启动Canal client(client的做用是将Canal里的解析的binlog日志固化到存储介质中)。

client组件Canal自己是不提供的,须要根据api进行开发,这里将官方提供的client代码打包成jar进行消费Canal信息。

canal HA配置

canal的HA机制是依赖zk来实现的,须要更改canal.properties文件,修改内容以下:

 
    
1
2
3
4
 
    
# zk集群地址
canal.zkServers=10.20.144.51:2181
# 选择记录方式
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

更改两台canal机器上instance实例的配置instance.properties,修改内容以下:

 
    
1
2
 
    
canal.instance.mysql.slaveId = 1234  ##另一台机器改为1235,保证slaveId不重复便可
canal.instance.master.address = 10.20.144.15:3306

配置好以后启动canal进程,在两台服务器上执行sh bin/startup.sh

client进行消费时,能够直接指定zookeeper地址和instance name,也可让canal client会自动从zookeeper中的running节点,获取当前服务的工做节点,而后与其创建连接。

maxwell简介

maxwell实时抓取mysql数据的原理也是基于binlog,和canal相比,maxwell更像是canal server + 实时client。(数据抽取 + 数据转换)

maxwell集成了kafka producer,直接从binlog获取数据更新并写入kafka,而canal则须要本身开发实时client将canal读取的binlog内容写入kafka中。

maxwell特点:

  • 支持bootstrap启动,同步历史数据
  • 集成kafka,直接将数据落地到kafka
  • 已将binlog中的DML和DDL进行了模式匹配,将其解码为有schema的json(有利于后期将其重组为nosql支持的语言)
    {“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}

缺点:

  • 一个MySQL实例须要对应一个maxwell进程
  • bootstrap的方案使用的是select *

maxwell的配置文件只有一个config.properties,在home目录。其中除了须要配置mysql master的地址、kafka地址还须要配置一个用于存放maxwell相关信息的mysql地址,maxwell会把读取binlog关系的信息,如binlog name、position。

工具对比

以上是Canal的原理及部署,其他相似maxwell和mysql_streamer对mysql进行实时数据抓取的原理同样就再也不进行一一介绍,这里只对他们进行下对比:

特点 Canal Maxwell mysql_streamer
语言 Java Java Python
活跃度 活跃 活跃 不活跃
HA 支持 定制 支持
数据落地 定制 落地到kafka 落地到kafka
分区 支持 不支持 不支持
bootstrap 不支持 支持 支持
数据格式 格式自由 json(格式固定) json(格式固定)
文档 较详细 较详细 略粗
随机读 支持 支持 支持

以上只是将mysql里的实时变化数据的binlog以同种形式同步到kafka,但要实时更新到hadoop还须要使用一个实时数据库来存储数据,并自定制开发将kafka中数据解析为nosql数据库能够识别的DML进行实时更新Nosql数据库,使其与MySQL里的数据实时同步。

基础架构

架构图以下:
基础架构图

基础架构图

 

虚线框是可选的方案

方案对比

  1. 方案1使用阿里开源的Canal进行Mysql binlog数据的抽取,另需开发一个数据转换工具将从binlog中解析出的数据转换成自带schema的json数据并写入kafka中。而方案2使用maxwell可直接完成对mysql binlog数据的抽取和转换成自带schema的json数据写入到kafka中。
  2. 方案1中不支持表中已存在的历史数据进行同步,此功能须要开发(若是使用sqoop进行历史数据同步,不够灵活,会使结果表与原始表结构相同,有区别于数据交换平台所需的schema)。方案2提供同步历史数据的解决方案。
  3. 方案1支持HA部署,而方案2不支持HA

方案1和方案2的区别只在于kafka以前,当数据缓存到kafka以后,须要一个定制的数据路由组件来将自带schema的数据解析到目标存储中。
数据路由组件主要负责将kafka中的数据实时读出,写入到目标存储中。(如将全部日志数据保存到HDFS中,也能够将数据落地到全部支持jdbc的数据库,落地到HBase,Elasticsearch等。)

综上,
方案1须要开发的功能有:

  • bootstrap功能
  • 实时数据转换工具
  • 数据路由工具

方案2须要开发的功能有:

  • 数据路由工具
  • HA模块(初期可暂不支持HA,因此开发紧急度不高)

数据路由工具是两个方案都须要开发的,则我比较偏向于第二种方案,由于在初期试水阶段能够短时间出成果,能够较快的验证想法,并在尝试中可以较快的发现问题,好及时的调整方案。即便方案2中maxwell最终不能知足需求,而使用canal的话,咱们也可能将实时数据转换工具的数据输出模式与maxwell一致,这样初始投入人力开发的数据路由工具依然能够继续使用,而不须要从新开发。

把增量的Log做为一切系统的基础。后续的数据使用方,经过订阅kafka来消费log。

好比:
大数据的使用方能够将数据保存到Hive表或者Parquet文件给Hive或Spark查询;
提供搜索服务的使用方能够保存到Elasticsearch或HBase 中;
提供缓存服务的使用方能够将日志缓存到Redis或alluxio中;
数据同步的使用方能够将数据保存到本身的数据库中;
因为kafka的日志是能够重复消费的,而且缓存一段时间,各个使用方能够经过消费kafka的日志来达到既能保持与数据库的一致性,也能保证明时性;

{“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}

{“database”:”test”,”table”:”e”,”type”:”insert”,”ts”:1488857922,”xid”:8932,”commit”:true,”data”:{“id”:2,”m”:4.2,”torvalds”:null}}

相关文章
相关标签/搜索