ErosaConnection java
| mysql
|-------------------------------------- sql
| | 数据库
MysqlConnection LocalBinLogConnection 服务器
ErosaConnection是一个链接的接口,定义了一些通用的方法。目前它有两个实现类,MysqlConnection是与MySQL服务器链接的实现类,LocalBinLogConnection是与本地的binlog文件进行链接的实现类。从类中能够看出,目前canal还不支持oracle的实现。 session
package com.alibaba.otter.canal.parse.inbound; import java.io.IOException; /** * 通用的Erosa的连接接口, 用于通常化处理mysql/oracle的解析过程 * * @author: yuanzu Date: 12-9-20 Time: 下午2:47 */ public interface ErosaConnection { /** * 创建链接 * @throws IOException */ public void connect() throws IOException; /** * 从新创建链接,会断开已有链接 * @throws IOException */ public void reconnect() throws IOException; /** * 断开链接。 * @throws IOException */ public void disconnect() throws IOException; /** * 是否创建链接。 * @return */ public boolean isConnected(); /** * 用于快速数据查找,和dump的区别在于,seek会只给出部分的数据 * @param binlogfilename biglog文件名 * @param binlogPosition binlog起始位置。 * @param func 事件解析处理器。 */ public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException; /** * 获取binlog事件,若是没有数据会阻塞,等待数据的到达。 * @param binlogfilename biglog文件名 * @param binlogPosition binlog起始位置。 * @param func 事件解析处理器。 */ public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException; /** * 获取binlog事件,若是没有数据会阻塞,等待数据的到达。 * @param timestamp 起始时间,只同步该时间以后的产生的新事件。 * @param func 事件解析处理器。 */ public void dump(long timestamp, SinkFunction func) throws IOException; /** * 产生一个新的链接。 */ ErosaConnection fork(); }
其中实现类MysqlConnection是咱们常常会使用到的一个类,先看看这个类是如何实现的。 oracle
private MysqlConnector connector; private long slaveId; private Charset charset = Charset.forName("UTF-8"); private BinlogFormat binlogFormat; private BinlogImage binlogImage; public MysqlConnection(){ } public MysqlConnection(InetSocketAddress address, String username, String password){ connector = new MysqlConnector(address, username, password); } public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber, String defaultSchema){ connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema); }
从代码能够看出,它大部分依赖一个MysqlConnector组件来实现与MySQL的链接。咱们稍后看看该代码的实现。 函数
构造函数须要的是MySql服务器的地址,用户名和密码,该用户必须具有了replication slave权限才能够。slaveId是当前解析器的slaveId,它不能与其它的slaveId冲突。 fetch
public void connect() throws IOException { connector.connect(); } public void reconnect() throws IOException { connector.reconnect(); } public void disconnect() throws IOException { connector.disconnect(); } public boolean isConnected() { return connector.isConnected(); }
基本上都是调用了MysqlConnection的方法实现的,还须要进入该类查看实现。 编码
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException { updateSettings(); sendBinlogDump(binlogfilename, binlogPosition); DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel()); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) { LogEvent event = null; event = decoder.decode(fetcher, context); if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) { break; } } }
/** * the settings that will need to be checked or set:<br> * <ol> * <li>wait_timeout</li> * <li>net_write_timeout</li> * <li>net_read_timeout</li> * </ol> * * @param channel * @throws IOException */ private void updateSettings() throws IOException { try { update("set wait_timeout=9999999"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { update("set net_write_timeout=1800"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { update("set net_read_timeout=1800"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { // 设置服务端返回结果时不作编码转化,直接按照数据库的二进制编码进行发送,由客户端本身根据需求进行编码转化 update("set names 'binary'"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { // mysql5.6针对checksum支持须要设置session变量 // 若是不设置会出现错误: Slave can not handle replication events with the // checksum that master is configured to log // 但也不能乱设置,须要和mysql server的checksum配置一致,否则RotateLogEvent会出现乱码 update("set @master_binlog_checksum= '@@global.binlog_checksum'"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { // mariadb针对特殊的类型,须要设置session变量 update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } }
dump方法的实现流程是这样的。
1. 更新MySQL配置信息。调用方法updateSettings();主要包括设置超时时间、设置数据库直接发送二进制数据,设置master_binlog_checksum和mariadb_slave_capability等变量值。
2.发送binlogdump命令。发送COM_BINLOG_DUMP命令,携带binlogFileName、binlogPosition和slaveServerId等关键信息。
3.构建一个binlog获取器组件DirectLogFetcher。使用它得到binlog数据。
4.循环从DirectLogFetcher获取内容,将获取到的数据转化为event。
5.调用SinkFunction处理获取到的event,若处理失败则会中断循环,不然继续。
接下来要看懂他们就须要了解MySQL的binlog协议及数据格式定义了。