Canal的数据传输有两块,一块是进行binlog订阅时,binlog转换为咱们所定义的Message,第二块是client与server进行TCP交互时,传输的TCP协议。mysql
这块是binlog的一个存储。主要的格式以下:git
Entry Header version [协议的版本号,default = 1] logfileName [binlog文件名] logfileOffset [binlog position] serverId [服务端serverId] serverenCode [变动数据的编码] executeTime [变动数据的执行时间] sourceType [变动数据的来源,default = MYSQL] schemaName [变动数据的schemaname] tableName [变动数据的tablename] eventLength [每一个event的长度] eventType [insert/update/delete类型,default = UPDATE] props [预留扩展] gtid [当前事务的gitd] entryType [事务头BEGIN/事务尾END/数据ROWDATA/HEARTBEAT/GTIDLOG] storeValue [byte数据,可展开,对应的类型为RowChange] RowChange tableId [tableId,由数据库产生] eventType [数据变动类型,default = UPDATE] isDdl [标识是不是ddl语句,好比create table/drop table] sql [ddl/query的sql语句] rowDatas [具体insert/update/delete的变动数据,可为多条,1个binlog event事件可对应多条变动,好比批处理] beforeColumns [字段信息,增量数据(修改前,删除前),Column类型的数组] afterColumns [字段信息,增量数据(修改后,新增后),Column类型的数组] props [预留扩展] props [预留扩展] ddlSchemaName [ddl/query的schemaName,会存在跨库ddl,须要保留执行ddl的当前schemaName] Column index [字段下标] sqlType [jdbc type] name [字段名称(忽略大小写),在mysql中是没有的] isKey [是否为主键] updated [是否发生过变动] isNull [值是否为null] props [预留扩展] value [字段值,timestamp,Datetime是一个时间格式的文本] length [对应数据对象原始长度] mysqlType [字段mysql类型]
这块主要定义了client和server交互的协议。sql
Packet magic_number [default = 17] version [default = 1] type [PacketType,类型] compression [压缩,default = NONE] body [具体内容]
主要的类型和对应的body,均可以在CanalProtocal.proto里面查看到。数据库
enum PacketType { HANDSHAKE = 1; CLIENTAUTHENTICATION = 2; ACK = 3; SUBSCRIPTION = 4; UNSUBSCRIPTION = 5; GET = 6; MESSAGES = 7; CLIENTACK = 8; // management part SHUTDOWN = 9; // integration DUMP = 10; HEARTBEAT = 11; CLIENTROLLBACK = 12; }
//心跳 message HeartBeat { optional int64 send_timestamp = 1; optional int64 start_timestamp = 2; } //握手 message Handshake { optional string communication_encoding = 1 [default = "utf8"]; optional bytes seeds = 2; repeated Compression supported_compressions = 3; } // client authentication message ClientAuth { optional string username = 1; optional bytes password = 2; // hashed password with seeds from Handshake message optional int32 net_read_timeout = 3 [default = 0]; // in seconds optional int32 net_write_timeout = 4 [default = 0]; // in seconds optional string destination = 5; optional string client_id = 6; optional string filter = 7; optional int64 start_timestamp = 8; } //服务端响应 message Ack { optional int32 error_code = 1 [default = 0]; optional string error_message = 2; // if something like compression is not supported, erorr_message will tell about it. } //客户端提交 message ClientAck { optional string destination = 1; optional string client_id = 2; optional int64 batch_id = 3; } // subscription message Sub { optional string destination = 1; optional string client_id = 2; optional string filter = 7; } // Unsubscription message Unsub { optional string destination = 1; optional string client_id = 2; optional string filter = 7; } // PullRequest message Get { optional string destination = 1; optional string client_id = 2; optional int32 fetch_size = 3; optional int64 timeout = 4 [default = -1]; // 默认-1时表明不控制 optional int32 unit = 5 [default = 2];// 数字类型,0:纳秒,1:毫秒,2:微秒,3:秒,4:分钟,5:小时,6:天 optional bool auto_ack = 6 [default = false]; // 是否自动ack } //消息 message Messages { optional int64 batch_id = 1; repeated bytes messages = 2; } // TBD when new packets are required message Dump{ optional string journal = 1; optional int64 position = 2; optional int64 timestamp = 3 [default = 0]; } // 客户端回滚 message ClientRollback{ optional string destination = 1; optional string client_id = 2; optional int64 batch_id = 3; }