本文主要分析的部分是instance启动时,parser的一个启动和工做过程。主要关注的是AbstractEventParser的start()方法中的parseThread。html
parseThread中包含的内容比较清晰,代码不是很长,咱们逐步分析下。java
erosaConnection = buildErosaConnection();
这里构造的,应该是一个mysql的连接,包括的内容都是从配置文件中过来的一些信息,包括mysql的地址,帐号密码等。mysql
startHeartBeat(erosaConnection);
这里的心跳,感受是个假的心跳,并无用到connection相关的内容。启动一个定时任务,默认3s发送一个心跳的binlog给sink阶段,表名parser还在工做。在sink阶段,会把心跳的binlog直接过滤,不会走到store过程。sql
这一步的代码也不复杂。数据库
preDump(erosaConnection);
咱们看看preDump都可以作什么?在MysqlEventParser中,咱们能够看到,主要作了几件事:缓存
这一步是比较核心的,也是保证binlog不丢失的核心代码。dom
EntryPosition position = findStartPosition(erosaConnection); final EntryPosition startPosition = position; if (startPosition == null) { throw new CanalParseException("can't find start position for " + destination); } if (!processTableMeta(startPosition)) { throw new CanalParseException("can't find init table meta for " + destination + " with position : " + startPosition); }
具体的findStartPosition是怎么实现的,请查阅下一篇文章。源码分析
若是没有找到最后的位置信息,那么直接抛出异常,不然还要进行一次判断,也就是processTableMeta,咱们看下这个方法作了什么。fetch
protected boolean processTableMeta(EntryPosition position) { if (isGTIDMode()) { if (binlogParser instanceof LogEventConvert) { // 记录gtid ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid())); } } if (tableMetaTSDB != null) { if (position.getTimestamp() == null || position.getTimestamp() <= 0) { throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0"); } return tableMetaTSDB.rollback(position); } return true; }
若是开启了GTID模式,那么直接设置GTID集合。若是tableMetaTSDB不为空,那么直接根据位置信息回滚到对应的表结构。这个tableMetaTSDB记录的是一个表结构的时序,使用的是Druid的一个功能,把全部DDL记录在数据库中,通常来讲,每24小时生成一份快照插入到数据库中,这样能解决DDL产生的表结构不一致的问题,也就是增长了一个表结构的回溯功能。ui
这边的rollback主要作的事情为:
在dump以前,代码中构造了一个sink类,也就是SinkFunction。里面定义了一个sink方法,主要的内容是对哪些数据进行过滤。
try { CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false); if (!running) { return false; } if (entry != null) { exception = null; // 有正常数据流过,清空exception transactionBuffer.add(entry); // 记录一下对应的positions this.lastPosition = buildLastPosition(entry); // 记录一下最后一次有数据的时间 lastEntryTime = System.currentTimeMillis(); } return running; } catch (TableIdNotFoundException e) { throw e; } catch (Throwable e) { if (e.getCause() instanceof TableIdNotFoundException) { throw (TableIdNotFoundException) e.getCause(); } // 记录一下,出错的位点信息 processSinkError(e, this.lastPosition, startPosition.getJournalName(), startPosition.getPosition()); throw new CanalParseException(e); // 继续抛出异常,让上层统一感知 }
首先判断parser是否在运行,若是不运行,那么就直接抛弃。运行时,判断entry是否为空,不为空的状况下,直接将entry加入到transactionBuffer中。这里咱们说下这个transactionBuffer,其实相似于Disruptor中的一个环形队列(默认长度为1024),维护了几个指针,包括put、get、ack三个指针,里面存储了须要进行传递到下一阶段的数据。
加到环形队列以后,记录一下当前的位置信息和时间。若是这个过程出错了,须要记录下出错的位置信息,这里的processSinkError其实就是打印了一下错误日志,而后抛出了一个CanalException,让上一层感知。
说了这么多,还没到真正开始dump的地方。下面开始吧。
if (isGTIDMode()) { erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler); } else { if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) { erosaConnection.dump(startPosition.getTimestamp(), sinkHandler); } else { erosaConnection.dump(startPosition.getJournalName(), startPosition.getPosition(), sinkHandler); } }
在新版本中,增长了GTID的模式,因此这里的dump须要判断怎么dump,发送什么命令给mysql来获取什么样的binlog。
若是开启了GTID模式(在instance.properties开启),那么须要发送COM_BINLOG_DUMP_GTID命令,而后开始接受binlog信息,进行binlog处理。
public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException { updateSettings(); sendBinlogDumpGTID(gtidSet); DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel()); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) { LogEvent event = null; event = decoder.decode(fetcher, context); if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) { break; } } }
调用LogDecoder.decode方法,对二进制进行解析,解析为咱们须要的LogEvent,若是解析失败,抛出异常。不然进行sink,若是sink返回的false,那么直接跳过,不然加入到transactionBuffer中。
这块有个逻辑判断,若是找到的最后的位置信息中包含了时间戳,若是没有binlog文件名,那么在MysqlConnection中直接报错,也就是必须既要有时间戳,又要有binlog文件名,才能进行dump操做。
这里的dump分了两步,第一步就是发送COM_REGISTER_SLAVE命令,假装本身是一个slave,而后发送COM_BINLOG_DUMP命令接收binlog。
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException { updateSettings(); sendRegisterSlave(); sendBinlogDump(binlogfilename, binlogPosition); DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel()); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) { LogEvent event = null; event = decoder.decode(fetcher, context); if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) { break; } if (event.getSemival() == 1) { sendSemiAck(context.getLogPosition().getFileName(), binlogPosition); } } }
这里有个mysql半同步的标识,semival。若是semival==1,说明须要进行ack,发送SEMI_SYNC_ACK给master(咱们这边more都不开启)。
若是整个过程当中发生了异常,有如下几种处理方式:
} catch (TableIdNotFoundException e) { exception = e; // 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,致使tablemap // Event时间没解析过 needTransactionPosition.compareAndSet(false, true); logger.error(String.format("dump address %s has an error, retrying. caused by ", runningInfo.getAddress().toString()), e); } catch (Throwable e) { processDumpError(e); exception = e; if (!running) { if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) { throw new CanalParseException(String.format("dump address %s has an error, retrying. ", runningInfo.getAddress().toString()), e); } } else { logger.error(String.format("dump address %s has an error, retrying. caused by ", runningInfo.getAddress().toString()), e); sendAlarm(destination, ExceptionUtils.getFullStackTrace(e)); } } finally { // 从新置为中断状态 Thread.interrupted(); // 关闭一下连接 afterDump(erosaConnection); try { if (erosaConnection != null) { erosaConnection.disconnect(); } } catch (IOException e1) { if (!running) { throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", runningInfo.getAddress().toString()), e1); } else { logger.error("disconnect address {} has an error, retrying., caused by ", runningInfo.getAddress().toString(), e1); } } } // 出异常了,退出sink消费,释放一下状态 eventSink.interrupt(); transactionBuffer.reset();// 重置一下缓冲队列,从新记录数据 binlogParser.reset();// 从新置位 if (running) { // sleep一段时间再进行重试 try { Thread.sleep(10000 + RandomUtils.nextInt(10000)); } catch (InterruptedException e) { } }