本文主要研究一下canal的EventTransactionBuffer
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java
/**
 * Buffers parsed binlog entries and groups them into transactions before handing each
 * complete batch to a {@link TransactionFlushCallback}. Backed by a fixed-size ring
 * buffer whose capacity must be a power of two so that sequence numbers can be mapped
 * to slots with a bit mask.
 *
 * NOTE(review): helpers such as isDml/checkFreeSlotAt/getIndex are elided from this
 * excerpt ("//......"); comments below describe only the visible call sites.
 */
public class EventTransactionBuffer extends AbstractCanalLifeCycle {

    private static final long INIT_SQEUENCE = -1;

    private int bufferSize = 1024;          // ring-buffer capacity; must be a power of 2
    private int indexMask;                  // bufferSize - 1, maps a sequence to a slot index
    private CanalEntry.Entry[] entries;

    // Position of the last write performed by put().
    private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE);
    // Position of the last data flush after the flush condition was met.
    private AtomicLong flushSequence = new AtomicLong(INIT_SQEUENCE);

    private TransactionFlushCallback flushCallback;

    public EventTransactionBuffer(){
    }

    public EventTransactionBuffer(TransactionFlushCallback flushCallback){
        this.flushCallback = flushCallback;
    }

    /**
     * Validates configuration and allocates the ring buffer.
     *
     * @throws IllegalArgumentException if bufferSize is not a power of 2
     */
    public void start() throws CanalStoreException {
        super.start();
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        Assert.notNull(flushCallback, "flush callback is null!");
        indexMask = bufferSize - 1;
        entries = new CanalEntry.Entry[bufferSize];
    }

    /** Resets both sequences and releases the buffer. */
    public void stop() throws CanalStoreException {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);

        entries = null;
        super.stop();
    }

    public void add(List<CanalEntry.Entry> entrys) throws InterruptedException {
        for (CanalEntry.Entry entry : entrys) {
            add(entry);
        }
    }

    /**
     * Buffers one entry, flushing at transaction boundaries so that downstream
     * consumers receive whole transactions where possible.
     */
    public void add(CanalEntry.Entry entry) throws InterruptedException {
        switch (entry.getEntryType()) {
            case TRANSACTIONBEGIN:
                flush();// flush whatever was buffered before this transaction
                put(entry);
                break;
            case TRANSACTIONEND:
                put(entry);
                flush();
                break;
            case ROWDATA:
                put(entry);
                // Non-DML row events are emitted immediately, bypassing buffer batching
                EventType eventType = entry.getHeader().getEventType();
                if (eventType != null && !isDml(eventType)) {
                    flush();
                }
                break;
            case HEARTBEAT:
                // Heartbeat from the master: the binlog has been read to the end, parser is idle
                put(entry);
                flush();
                break;
            default:
                break;
        }
    }

    /** Rewinds both sequences without releasing the buffer. */
    public void reset() {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);
    }

    private void put(CanalEntry.Entry data) throws InterruptedException {
        // First check whether a free slot is available
        if (checkFreeSlotAt(putSequence.get() + 1)) {
            long current = putSequence.get();
            long next = current + 1;

            // Write the data first, then advance the cursor: once putSequence is
            // published it becomes visible to concurrent get requests, which would
            // otherwise read a stale Entry out of the ring buffer.
            entries[getIndex(next)] = data;
            putSequence.set(next);
        } else {
            flush();// buffer is full — flush it first
            put(data);// then retry the new entry
        }
    }

    private void flush() throws InterruptedException {
        long start = this.flushSequence.get() + 1;
        long end = this.putSequence.get();

        if (start <= end) {
            List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();
            for (long next = start; next <= end; next++) {
                transaction.add(this.entries[getIndex(next)]);
            }

            flushCallback.flush(transaction);
            flushSequence.set(end);// advance the flush position only after a successful flush
        }
    }

    //......
}
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java
public static interface TransactionFlushCallback { public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException; }
canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto
syntax = "proto3";
package com.alibaba.otter.canal.protocol;

option java_package = "com.alibaba.otter.canal.protocol";
option java_outer_classname = "CanalEntry";
option optimize_for = SPEED;

/****************************************************************
 * message model
 * When adding a value to an enum, keep the existing ordinals unchanged.
 ****************************************************************/
message Entry {
    /** protocol header **/
    Header header = 1;

    /** entry type after the binlog event has been split **/
    //[default = ROWDATA]
    oneof entryType_present{
        EntryType entryType = 2;
    }

    /** serialized payload bytes **/
    bytes storeValue = 3;
}

/** message Header **/
message Header {
    /** protocol version **/
    //[default = 1]
    oneof version_present {
        int32 version = 1;
    }

    /** binlog/redolog file name **/
    string logfileName = 2;

    /** offset within the binlog/redolog file **/
    int64 logfileOffset = 3;

    /** server id of the source **/
    int64 serverId = 4;

    /** character encoding of the changed data **/
    string serverenCode = 5;

    /** execution time of the change **/
    int64 executeTime = 6;

    /** source database type of the change **/
    //[default = MYSQL]
    oneof sourceType_present {
        Type sourceType = 7;
    }

    /** schema name of the changed data **/
    string schemaName = 8;

    /** table name of the changed data **/
    string tableName = 9;

    /** length of each event **/
    int64 eventLength = 10;

    /** type of the data change **/
    //[default = UPDATE]
    oneof eventType_present {
        EventType eventType = 11;
    }

    /** reserved for extension **/
    repeated Pair props = 12;

    /** GTID of the current transaction **/
    string gtid = 13;
}

/** structure of a single column **/
message Column {
    /** column index **/
    int32 index = 1;

    /** java.sql type of the column **/
    int32 sqlType = 2;

    /** column name (case-insensitive); not present in mysql itself **/
    string name = 3;

    /** whether the column is part of the primary key **/
    bool isKey = 4;

    /** when EventType=UPDATE, marks whether this column's value was modified **/
    bool updated = 5;

    /** whether the value is NULL **/
    //[default = false]
    oneof isNull_present {
        bool isNull = 6;
    }

    /** reserved for extension **/
    repeated Pair props = 7;

    /** column value; timestamp/datetime are rendered as formatted text **/
    string value = 8;

    /** original length of the underlying value **/
    int32 length = 9;

    /** mysql type of the column **/
    string mysqlType = 10;
}

message RowData {
    /** column values before the change (for update/delete) **/
    repeated Column beforeColumns = 1;

    /** column values after the change (for update/insert) **/
    repeated Column afterColumns = 2;

    /** reserved for extension **/
    repeated Pair props = 3;
}

/** message row — structure of one row-change event **/
message RowChange {
    /** tableId, generated by the database **/
    int64 tableId = 1;

    /** type of the data change **/
    //[default = UPDATE]
    oneof eventType_present {
        EventType eventType = 2;
    }

    /** whether this is a DDL statement **/
    // [default = false]
    oneof isDdl_present {
        bool isDdl = 10;
    }

    /** the sql of the ddl/query **/
    string sql = 11;

    /** a single database change may span multiple rows **/
    repeated RowData rowDatas = 12;

    /** reserved for extension **/
    repeated Pair props = 13;

    /** schemaName the ddl was executed under; kept because cross-schema ddl exists **/
    string ddlSchemaName = 14;
}

/** information carried at transaction begin **/
message TransactionBegin{
    /** deprecated: use executeTime in the header **/
    int64 executeTime = 1;

    /** deprecated: Begin no longer carries a transaction id **/
    string transactionId = 2;

    /** reserved for extension **/
    repeated Pair props = 3;

    /** id of the executing thread **/
    int64 threadId = 4;
}

/** information carried at transaction end **/
message TransactionEnd{
    /** deprecated: use executeTime in the header **/
    int64 executeTime = 1;

    /** transaction id **/
    string transactionId = 2;

    /** reserved for extension **/
    repeated Pair props = 3;
}

/** reserved extension pair **/
message Pair{
    string key = 1;
    string value = 2;
}

/** entry types after splitting, mainly marking transaction begin, row data and transaction end **/
enum EntryType{
    ENTRYTYPECOMPATIBLEPROTO2 = 0;
    TRANSACTIONBEGIN = 1;
    ROWDATA = 2;
    TRANSACTIONEND = 3;
    /** heartbeat; internal use only, not visible externally, can be ignored **/
    HEARTBEAT = 4;
    GTIDLOG = 5;
}

/** event types **/
enum EventType {
    EVENTTYPECOMPATIBLEPROTO2 = 0;
    INSERT = 1;
    UPDATE = 2;
    DELETE = 3;
    CREATE = 4;
    ALTER = 5;
    ERASE = 6;
    QUERY = 7;
    TRUNCATE = 8;
    RENAME = 9;
    /** CREATE INDEX **/
    CINDEX = 10;
    DINDEX = 11;
    GTID = 12;
    /** XA **/
    XACOMMIT = 13;
    XAROLLBACK = 14;
    /** MASTER HEARTBEAT **/
    MHEARTBEAT = 15;
}

/** database type **/
enum Type {
    TYPECOMPATIBLEPROTO2 = 0;
    ORACLE = 1;
    MYSQL = 2;
    PGSQL = 3;
}
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
/**
 * Skeleton event parser. The constructor wires an EventTransactionBuffer whose flush
 * callback sinks each buffered transaction downstream and then persists its log position.
 * (Only the constructor and the sink helper are shown in this excerpt; the rest is elided.)
 */
public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {

    //......

    public AbstractEventParser(){
        // Initialize the transaction buffer with the flush-to-sink callback
        transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

            public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
                boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
                // If the parser has been stopped, silently drop the result
                if (!running) {
                    return;
                }

                if (!successed) {
                    throw new CanalParseException("consume failed!");
                }

                LogPosition position = buildLastTransactionPosition(transaction);
                if (position != null) { // position may be null
                    logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                }
            }
        });
    }

    /**
     * Sinks the entries downstream, recording the processing interval when profiling
     * is enabled, and bumps the consumed-event counter (resetting it on overflow).
     *
     * @return whether the sink accepted the batch
     */
    protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException, InterruptedException {
        long startTs = -1;
        boolean enabled = getProfilingEnabled();
        if (enabled) {
            startTs = System.currentTimeMillis();
        }

        boolean result = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

        if (enabled) {
            this.processingInterval = System.currentTimeMillis() - startTs;
        }

        // Guard against overflow of the long counter
        if (consumedEventCount.incrementAndGet() < 0) {
            consumedEventCount.set(0);
        }

        return result;
    }

    //......
}
EventTransactionBuffer继承了AbstractCanalLifeCycle,其start方法创建bufferSize大小的CanalEntry.Entry数组;其stop方法设置putSequence及flushSequence为INIT_SQEUENCE,并设置entries为null;其add方法根据entry.getEntryType()的不同类型做不同的处理,基本是执行put及flush方法;其reset方法设置putSequence及flushSequence为INIT_SQEUENCE;put方法给entries赋值的同时更新putSequence,如果buffer满了则先执行flush再重新put;flush方法则执行flushCallback.flush(transaction),并更新flushSequence