中间件——canal小记

接到个小需求,将mysql的部分数据增量同步到es,可是不只仅是使用canal而已,总体的流程是mysql>>canal>>flume>>kafka>>es,说难倒也不难,只是作起来碰到的坑实在太多,特别是中间套了那么多中间件,出了故障找起来真的特别麻烦。html

先来了解一下MySQL的主从备份:mysql

从上层来看,复制分红三步:
master将改变记录到二进制日志(binary log)中(这些记录叫作二进制日志事件,binary log events,能够经过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重作中继日志中的事件,将改变反映它本身的数据。spring

问题一:测试环境一切正常,可是正式环境中,这几个字段全为0,不知道为何

最后发现是沟通问题。。。sql

排查过程:数据库

  1. 起初,怀疑是es的问题,会不会是string转为long中出现了问题,PUT了个,无异常,这种状况排除。
  2. 再而后觉得是代码有问题,但是想了下,rowData.getAfterColumnsList().forEach(column -> data.put(column.getName(), column.getValue()))这句不可能有什么其余的问题啊,并且测试环境中一切都是好好的。
  3. canal安装出错,从新查看了一次canal.properties和instance.properties,并无发现配置错了啥,若是错了,那为何只有那几个字段出现异常,其余的都是好好的,郁闷。并且,用测试环境的canal配置生产中的数据库,而后本地调试,结果依旧同样。可能问题出在mysql。

最后发现,竟然是沟通问题。。。。测试环境中是从正式环境导入的,用的insert,但是在正式环境里,用的确实insert后update字段,以后发现竟然还用delete,,,,晕。。。。以前明确问过了只更新insert的,人与人之间的信任在哪里。。。。less

问题二:canal.properties中四种模式的差异

简单的说,canal维护一份增量订阅和消费关系是依靠解析位点和消费位点的,目前提供了一下四种配置,一开始我也是懵的。分布式

#canal.instance.global.spring.xml = classpath:spring/local-instance.xml
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

local-instance
我也不知道啥。。ide

memory-instance
全部的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析
特色:速度最快,依赖最少(不须要zookeeper)
场景:通常应用在quickstart,或者是出现问题后,进行数据分析的场景,不该该将其应用于生产环境。
我的建议是调试的时候使用该模式,即新增数据的时候,客户端能立刻捕获到改日志,可是因为位点一直都是canal启动的时候最新的,不适用与生产环境。post

file-instance
全部的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.
特色:支持单机持久化
场景:生产环境,无HA需求,简单可用.
采用该模式的时候,若是关闭了canal,会在destination中生成一个meta.dat,用来记录关键信息。若是想要启动canal以后立刻订阅最新的位点,须要把该文件删掉。
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":".\.."},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.6.71","port":3306}},"postion":{"included":false,"journalName":"binlog.008335","position":221691106,"serverId":88888,"timestamp":1524294834000}}}],"destination":"example"}测试

default-instance
全部的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享。
特色:支持HA
场景:生产环境,集群化部署.
该模式会记录集群中全部运行的节点,主要用与HA主备模式,节点中的数据以下,能够关闭某一个canal服务来查看running的变化信息。

问题三:若是要订阅的是mysql的从库改怎么作?

生产环境中的主库是不能随便重启的,因此订阅的话必须订阅mysql主从的从库,而从库中是默认下只将主库的操做写进中继日志,并写到本身的二进制日志的,因此须要让其成为canal的主库,必须让其将日志也写到本身的二进制日志里面。处理方法:修改/etc/my.cnf,增长一行log_slave_updates=1,重启数据库后就能够了。

问题四:部分字段没有更新

最终版本是以mysql的id为es的主键,用canal同步到flume,再由flume到kafka,而后再由一个中间件写到es里面去,结果发现,一天之中,会有那么一段时间得出的结果少一丢丢,甚至是骤降,如图。不得不从头开始排查状况,canal到flume,加了canal的重试,以及发送到flume的重试机制,没有报错,全部数据正常发送。flume到kafka不敢怀疑,毕竟公司一直在用,怎么可能有问题。kafka到es的中间件?组长写的,并且一直在用,不可能==最后确认的是flume到kafka,kafka的parition处理速度不一样,

check一下flume的文档,能够知道

Property Name Description
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.

大概意思是flume若是不自定义partitionIdHeader,那么消息将会被分布式kafka的partion处理,kafka自己的设置就是高吞吐量的消息系统,同一partion的消息是能够按照顺序发送的,可是多个partion就不肯定了,若是须要将消息按照顺序发送,那么就必需要指定一个parition,即在flume的配置文件中添加:a1.channels.channel1.partitionIdHeader=1,指定parition便可。所有修改完以后,在kibana查看一下曲线:

用sql在数据库确认了下,终于一致了,不容易。。。

相关文章
相关标签/搜索