canal源码分析系列——ErosaConnection分析

类结构

ErosaConnection java

| mysql

|-------------------------------------- sql

|                                                    | 数据库

MysqlConnection                            LocalBinLogConnection 服务器

ErosaConnection是一个链接的接口,定义了一些通用的方法。目前它有两个实现类,MysqlConnection是与MySQL服务器链接的实现类,LocalBinLogConnection是与本地的binlog文件进行链接的实现类。从类中能够看出,目前canal还不支持oracle的实现。 session

接口定义

package com.alibaba.otter.canal.parse.inbound;

import java.io.IOException;

/**
 * 通用的Erosa的连接接口, 用于通常化处理mysql/oracle的解析过程
 * 
 * @author: yuanzu Date: 12-9-20 Time: 下午2:47
 */
public interface ErosaConnection {

	/**
	 * 创建链接
	 * @throws IOException
	 */
    public void connect() throws IOException;

    /**
     * 从新创建链接,会断开已有链接
     * @throws IOException
     */
    public void reconnect() throws IOException;

    /**
     * 断开链接。
     * @throws IOException
     */
    public void disconnect() throws IOException;

    /**
     * 是否创建链接。
     * @return
     */
    public boolean isConnected();

    /**
     * 用于快速数据查找,和dump的区别在于,seek会只给出部分的数据
     * @param binlogfilename biglog文件名
     * @param binlogPosition binlog起始位置。
     * @param func 事件解析处理器。
     */
    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    /**
     * 获取binlog事件,若是没有数据会阻塞,等待数据的到达。
     * @param binlogfilename biglog文件名
     * @param binlogPosition binlog起始位置。
     * @param func 事件解析处理器。
     */
    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    /**
     * 获取binlog事件,若是没有数据会阻塞,等待数据的到达。
     * @param timestamp 起始时间,只同步该时间以后的产生的新事件。
     * @param func 事件解析处理器。
     */
    public void dump(long timestamp, SinkFunction func) throws IOException;

    /**
     * 产生一个新的链接。
     */
    ErosaConnection fork();
}

其中实现类MysqlConnection是咱们常常会使用到的一个类,先看看这个类是如何实现的。 oracle

MysqlConnection

属性和构造函数

private MysqlConnector      connector;
    private long                slaveId;
    private Charset             charset = Charset.forName("UTF-8");
    private BinlogFormat        binlogFormat;
    private BinlogImage         binlogImage;

    public MysqlConnection(){
    }

    public MysqlConnection(InetSocketAddress address, String username, String password){

        connector = new MysqlConnector(address, username, password);
    }

    public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber,
                           String defaultSchema){
        connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
    }

从代码能够看出,它大部分依赖一个MysqlConnector组件来实现与MySQL的链接。咱们稍后看看该代码的实现。 函数

构造函数须要的是MySql服务器的地址,用户名和密码,该用户必须具有了replication slave权限才能够。slaveId是当前解析器的slaveId,它不能与其它的slaveId冲突。 fetch

链接方法实现

public void connect() throws IOException {
        connector.connect();
    }

    public void reconnect() throws IOException {
        connector.reconnect();
    }

    public void disconnect() throws IOException {
        connector.disconnect();
    }

    public boolean isConnected() {
        return connector.isConnected();
    }


基本上都是调用了MysqlConnection的方法实现的,还须要进入该类查看实现。 编码

dump方法


public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        updateSettings();
        sendBinlogDump(binlogfilename, binlogPosition);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        while (fetcher.fetch()) {
            LogEvent event = null;
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }
        }
    }  
/**
     * the settings that will need to be checked or set:<br>
     * <ol>
     * <li>wait_timeout</li>
     * <li>net_write_timeout</li>
     * <li>net_read_timeout</li>
     * </ol>
     * 
     * @param channel
     * @throws IOException
     */
    private void updateSettings() throws IOException {
        try {
            update("set wait_timeout=9999999");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }
        try {
            update("set net_write_timeout=1800");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            update("set net_read_timeout=1800");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // 设置服务端返回结果时不作编码转化,直接按照数据库的二进制编码进行发送,由客户端本身根据需求进行编码转化
            update("set names 'binary'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // mysql5.6针对checksum支持须要设置session变量
            // 若是不设置会出现错误: Slave can not handle replication events with the
            // checksum that master is configured to log
            // 但也不能乱设置,须要和mysql server的checksum配置一致,否则RotateLogEvent会出现乱码
            update("set @master_binlog_checksum= '@@global.binlog_checksum'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // mariadb针对特殊的类型,须要设置session变量
            update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }
    }


dump方法的实现流程是这样的。

1. 更新MySQL配置信息。调用方法updateSettings();主要包括设置超时时间、设置数据库直接发送二进制数据,设置master_binlog_checksum和mariadb_slave_capability等变量值。

2.发送binlogdump命令。发送COM_BINLOG_DUMP命令,携带binlogFileName、binlogPosition和slaveServerId等关键信息。

3.构建一个binlog获取器组件DirectLogFetcher。使用它得到binlog数据。

4.循环从DirectLogFetcher获取内容,将获取到的数据转化为event。

5.调用SinkFunction处理获取到的event,若处理失败则会中断循环,不然继续。

接下来要看懂他们就须要了解MySQL的binlog协议及数据格式定义了。

相关文章
相关标签/搜索