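We have to write the client's workflow logic ourselves. For now, the only reference is the code in canal's example module, which provides two samples: a standalone one and a cluster one. If we develop further later, what we actually build is our own client and its logic. Here we mainly look at how the cluster client is implemented, how it consumes data, and how it exchanges data with the server.
Let's look at the concrete code: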
    protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                waiting = false;
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        //     Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // commit the acknowledgement
                    // connector.rollback(batchId); // processing failed, roll the data back
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }
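The overall process is as follows:
Let's go through it step by step.
CanalConnector has two main implementations: SimpleCanalConnector and ClusterCanalConnector. We focus on ClusterCanalConnector, because that is the mode we will use.
In practice, we get the connector from a factory class, CanalConnectors. It contains a method that creates a ClusterCanalConnector: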
    public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
                                                     String password) {
        ClusterCanalConnector canalConnector = new ClusterCanalConnector(username, password, destination,
            new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
        canalConnector.setSoTimeout(30 * 1000);
        return canalConnector;
    }
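The parameters are the ZooKeeper address, the canal destination name, and a username and password (doConnect later sends these to the server in the ClientAuth packet). ClusterNodeAccessStrategy is the strategy the client uses to pick which server node to connect to. Its constructor contains a few things worth noting: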
    public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
        this.zkClient = zkClient;
        childListener = new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                initClusters(currentChilds);
            }
        };
        dataListener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                runningAddress = null;
            }
            public void handleDataChange(String dataPath, Object data) throws Exception {
                initRunning(data);
            }
        };
        String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
        this.zkClient.subscribeChildChanges(clusterPath, childListener);
        initClusters(this.zkClient.getChildren(clusterPath));
        String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
        this.zkClient.subscribeDataChanges(runningPath, dataListener);
        initRunning(this.zkClient.readData(runningPath, true));
    }
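Two listeners are registered here, and both track the servers. One maintains the list of all server nodes. The other tracks the currently active (running) server. Both read from the corresponding ZooKeeper nodes.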
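The ZooKeeper layout is easier to follow when written out. Below is a small Java sketch of the paths involved. The client running path is the one used later by the client's running monitor. The server-side `.../cluster` and `.../running` suffixes are my assumption about what ZookeeperPathUtils returns, so check them against your canal version. The `CanalZkPaths` helper itself is hypothetical.

```java
// Hypothetical helper; the server "cluster" and "running" suffixes are assumptions.
final class CanalZkPaths {
    static final String DESTINATION_ROOT = "/otter/canal/destinations";

    private CanalZkPaths() {
    }

    // Parent node with one child per server; the client's childListener watches it.
    static String clusterRoot(String destination) {
        return DESTINATION_ROOT + "/" + destination + "/cluster";
    }

    // Holds the currently active server; the client's dataListener watches it.
    static String serverRunning(String destination) {
        return DESTINATION_ROOT + "/" + destination + "/running";
    }

    // Subscription node of one client (created when the client subscribes).
    static String clientRoot(String destination, short clientId) {
        return DESTINATION_ROOT + "/" + destination + "/" + clientId;
    }

    // Ephemeral node marking which instance of this client is currently running.
    static String clientRunning(String destination, short clientId) {
        return clientRoot(destination, clientId) + "/running";
    }
}
```

For a destination named `example` and clientId `1001`, `clientRunning` returns `/otter/canal/destinations/example/1001/running`.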
Once we have the CanalConnector, the actual connection is made. In ClusterCanalConnector we can see that it also uses SimpleCanalConnector underneath and only adds the node-selection strategy on top:
    public void connect() throws CanalClientException {
        if (connected) {
            return;
        }
        if (runningMonitor != null) {
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        } else {
            waitClientRunning();
            if (!running) {
                return;
            }
            doConnect();
            if (filter != null) { // a filter is set, so this is an automatic failover: subscribe again with the previous filter
                subscribe(filter);
            }
            if (rollbackOnConnect) {
                rollback();
            }
        }
        connected = true;
    }
For a cluster-mode client, runningMonitor is not null here because it has already been initialized. Let's look at what runningMonitor.start() does:
    public void start() {
        super.start();
        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        zkClient.subscribeDataChanges(path, dataListener);
        initRunning();
    }
The watched path is /otter/canal/destinations/{destination}/{clientId}/running. Whenever the node's data changes or the node is deleted, the logic in dataListener runs:
    dataListener = new IZkDataListener() {
        public void handleDataChange(String dataPath, Object data) throws Exception {
            MDC.put("destination", destination);
            ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
            if (!isMine(runningData.getAddress())) {
                mutex.set(false);
            }
            if (!runningData.isActive() && isMine(runningData.getAddress())) { // an active release happened, and this machine was active before
                release = true;
                releaseRunning(); // fully release the mainstem
            }
            activeData = (ClientRunningData) runningData;
        }

        public void handleDataDeleted(String dataPath) throws Exception {
            MDC.put("destination", destination);
            mutex.set(false);
            // trigger an exit; the cause may be a manual release or a session expired timeout after a network blip
            processActiveExit();
            if (!release && activeData != null && isMine(activeData.getAddress())) {
                // the last active node was this machine, so try to grab the active role immediately
                initRunning();
            } else {
                // otherwise wait delayTime, to avoid frequent switching caused by network blips or ZooKeeper errors
                delayExector.schedule(new Runnable() {
                    public void run() {
                        initRunning();
                    }
                }, delayTime, TimeUnit.SECONDS);
            }
        }
    };
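The comments are fairly self-explanatory. On a data change, two checks run. If the running node no longer belongs to this machine, the mutex is cleared. If it belongs to this machine but is marked inactive, this was an active release, so the running node is released completely. In both cases the new data is stored as activeData. On deletion, an exit is triggered right away. If the previous active node was this machine and this was not a deliberate release, the client immediately tries to grab the active role again. Otherwise it waits delayTime (5 seconds by default) before retrying. Next, let's look at initRunning.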
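The idea behind initRunning is a race to create an ephemeral node. Whoever creates the running node first becomes active, and everyone else waits on standby until the node disappears. A minimal sketch of that race follows. It uses a hypothetical in-memory `FakeZk` in place of a real ZooKeeper client (it is not the ZkClient API). `RunningElector` and its JSON string are also illustrative; the JSON only loosely follows the ClientRunningData fields.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for ZooKeeper (NOT a real ZK client): creating a node that
// already exists fails, which is the property the running-node election relies on.
final class FakeZk {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    // Returns true only for the caller that actually created the node.
    boolean createEphemeral(String path, String data) {
        return nodes.putIfAbsent(path, data) == null;
    }

    Optional<String> read(String path) {
        return Optional.ofNullable(nodes.get(path));
    }

    // Stands in for session expiry or an explicit release deleting the node.
    void delete(String path) {
        nodes.remove(path);
    }
}

// Hypothetical, simplified take on initRunning(): race to create the running node.
final class RunningElector {
    private final FakeZk zk;
    private final String path;
    private final String json;

    RunningElector(FakeZk zk, String path, String clientId, String address) {
        this.zk = zk;
        this.path = path;
        // Field names loosely follow ClientRunningData (clientId, address, active).
        this.json = "{\"clientId\":" + clientId + ",\"address\":\"" + address + "\",\"active\":true}";
    }

    // True if this instance holds the running node after the attempt.
    boolean initRunning() {
        if (zk.createEphemeral(path, json)) {
            return true; // won the race; the real client would now connect to the server
        }
        // Lost the race, unless the existing node is already ours (like the isMine check).
        return zk.read(path).map(json::equals).orElse(false);
    }
}
```

Deleting the node plays the role of handleDataDeleted. The standby instance can then win the next initRunning call, which is the failover the listener above implements (with the extra delayTime back-off).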
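initRunning mainly creates the ephemeral running node. Its path is /otter/canal/destinations/{destination}/{clientId}/running (the path watched above), and its content is the JSON-serialized ClientRunningData. The connection code: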
    public InetSocketAddress processActiveEnter() {
        InetSocketAddress address = doConnect();
        mutex.set(true);
        if (filter != null) { // a filter is set, so this is an automatic failover: subscribe again with the previous filter
            subscribe(filter);
        }
        if (rollbackOnConnect) {
            rollback();
        }
        return address;
    }
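There are several pieces of logic here (connect, subscribe, rollback), so let's go through them one at a time.
In doConnect, the client connects directly to the server over a socket, that is, to the socket port the server exposes.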
    private InetSocketAddress doConnect() throws CanalClientException {
        try {
            channel = SocketChannel.open();
            channel.socket().setSoTimeout(soTimeout);
            SocketAddress address = getAddress();
            if (address == null) {
                address = getNextAddress();
            }
            channel.connect(address);
            readableChannel = Channels.newChannel(channel.socket().getInputStream());
            writableChannel = Channels.newChannel(channel.socket().getOutputStream());
            Packet p = Packet.parseFrom(readNextPacket());
            if (p.getVersion() != 1) {
                throw new CanalClientException("unsupported version at this client.");
            }
            if (p.getType() != PacketType.HANDSHAKE) {
                throw new CanalClientException("expect handshake but found other type.");
            }
            // 1. handshake sent by the server
            Handshake handshake = Handshake.parseFrom(p.getBody());
            supportedCompressions.addAll(handshake.getSupportedCompressionsList());
            // 2. client authentication
            ClientAuth ca = ClientAuth.newBuilder()
                .setUsername(username != null ? username : "")
                .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
                .setNetReadTimeout(soTimeout)
                .setNetWriteTimeout(soTimeout)
                .build();
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.CLIENTAUTHENTICATION)
                .setBody(ca.toByteString())
                .build()
                .toByteArray());
            // 3. wait for the server's ACK
            Packet ack = Packet.parseFrom(readNextPacket());
            if (ack.getType() != PacketType.ACK) {
                throw new CanalClientException("unexpected packet type when ack is expected");
            }
            Ack ackBody = Ack.parseFrom(ack.getBody());
            if (ackBody.getErrorCode() > 0) {
                throw new CanalClientException("something goes wrong when doing authentication: "
                                               + ackBody.getErrorMessage());
            }
            connected = true;
            return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
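This opens the socket connection with java.nio's SocketChannel. The channel is in blocking mode, because data is read through the socket's input stream. The client first reads the handshake packet sent by the server, then sends an authentication packet. It treats the connection as established once an ACK packet comes back. On the server side, HandshakeInitializationHandler handles the handshake and ClientAuthenticationHandler handles authentication.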
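The client and server exchange protobuf Packets over a plain byte stream, so each packet has to be framed. The sketch below assumes that writeWithHeader/readNextPacket frame each packet as a 4-byte big-endian length followed by the packet bytes. That is my reading of the client code; the excerpt above does not show it. `Framing` is a hypothetical name.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch of length-prefixed framing, assuming a 4-byte big-endian length header.
final class Framing {
    private Framing() {
    }

    static void writeWithHeader(OutputStream out, byte[] body) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(body.length); // DataOutputStream writes ints big-endian
        data.write(body);
        data.flush();
    }

    static byte[] readNextPacket(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt(); // blocks until the 4 header bytes arrive
        byte[] body = new byte[length];
        data.readFully(body);        // a single read() may return fewer bytes than asked for
        return body;
    }
}
```

The `readFully` call matters here. A socket read can return a partial packet, and passing a truncated body to `Packet.parseFrom` would fail.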
When the server receives the authentication message, it does the following:
    public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        switch (packet.getVersion()) {
            case SUPPORTED_VERSION:
            default:
                final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
                // subscription info is present
                if (StringUtils.isNotEmpty(clientAuth.getDestination())
                    && StringUtils.isNotEmpty(clientAuth.getClientId())) {
                    ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
                        Short.valueOf(clientAuth.getClientId()),
                        clientAuth.getFilter());
                    try {
                        MDC.put("destination", clientIdentity.getDestination());
                        embeddedServer.subscribe(clientIdentity);
                        ctx.setAttachment(clientIdentity); // store the session state
                        // try to start; ignored if already started
                        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                            if (!runningMonitor.isStart()) {
                                runningMonitor.start();
                            }
                        }
                    } finally {
                        MDC.remove("destination");
                    }
                }
                NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // omitted
                    }
                });
                break;
        }
    }
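The main logic is in subscribe. If metaManager has not been started, it is started first, and on startup it loads data from ZooKeeper, including the client's consumed positions. Then comes the subscription itself, which creates a ZooKeeper node at /otter/canal/destinations/{destination}/{clientId}. The filter is also written to ZooKeeper. Finally the server looks up this client's position. If ZooKeeper already held one, it is read from memory; otherwise the first entry in eventStore is used.
Next, the client sends the subscription message to the server over the socket. It sends this message only when filter is not null. The server handles it as follows: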
    case SUBSCRIPTION:
        Sub sub = Sub.parseFrom(packet.getBody());
        if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
            clientIdentity = new ClientIdentity(sub.getDestination(),
                Short.valueOf(sub.getClientId()),
                sub.getFilter());
            MDC.put("destination", clientIdentity.getDestination());
            // try to start; ignored if already started
            if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                if (!runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }
            embeddedServer.subscribe(clientIdentity);
            ctx.setAttachment(clientIdentity); // store the session state
            NettyUtils.ack(ctx.getChannel(), null);
        } else {
            NettyUtils.error(401,
                MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
                ctx.getChannel(),
                null);
        }
        break;
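This works like the authentication step during connect, except that it carries the filter parameter. If the server side of this destination (its ServerRunningMonitor and listeners) is not running yet, it is started here as well.
Rollback here means rolling back the position that the server records for this client.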
    public void rollback() throws CanalClientException {
        waitClientRunning();
        rollback(0); // 0 means "not set", i.e. roll back everything
    }
This sends the rollback command, and the server handles it as follows:
    case CLIENTROLLBACK:
        ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
        MDC.put("destination", rollback.getDestination());
        if (StringUtils.isNotEmpty(rollback.getDestination())
            && StringUtils.isNotEmpty(rollback.getClientId())) {
            clientIdentity = new ClientIdentity(rollback.getDestination(),
                Short.valueOf(rollback.getClientId()));
            if (rollback.getBatchId() == 0L) {
                embeddedServer.rollback(clientIdentity); // roll back all batches
            } else {
                embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // roll back a single batch
            }
        } else {
            NettyUtils.error(401,
                MessageFormatter.format("destination or clientId is null", rollback.toString()).getMessage(),
                ctx.getChannel(),
                null);
        }
        break;
The batchId passed here is 0, which means all batches are rolled back. Here is the rollback itself:
    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        // the first connection may trigger an automatic rollback, so an unsubscribed client is ignored
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }
        synchronized (canalInstance) {
            // clear the batch info
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            // roll back the state held in eventStore
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
        }
    }
What actually gets rolled back is the pointer in eventStore: the get pointer is reset to the last acked pointer.
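The pointer movement is easier to see in a model. Below is a small event store with separate put/get/ack sequences, loosely following the idea behind canal's in-memory store. The `PointerStore` name and fields are illustrative. Binlog positions are replaced by plain sequence numbers, and there is no locking or blocking.

```java
// Illustrative model only: sequences instead of binlog positions, no locking.
final class PointerStore {
    private final long[] ring; // ring buffer holding event ids
    private long putSeq = -1;  // last sequence written
    private long getSeq = -1;  // last sequence handed out by get()
    private long ackSeq = -1;  // last sequence acknowledged by the client

    PointerStore(int capacity) {
        ring = new long[capacity];
    }

    // Refuses to overwrite slots that have not been acked yet.
    boolean put(long event) {
        if (putSeq - ackSeq >= ring.length) {
            return false;
        }
        putSeq++;
        ring[(int) (putSeq % ring.length)] = event;
        return true;
    }

    // Hands out up to batchSize events after getSeq and advances getSeq.
    long[] get(int batchSize) {
        long end = Math.min(putSeq, getSeq + batchSize);
        long[] out = new long[(int) (end - getSeq)];
        for (int i = 0; i < out.length; i++) {
            out[i] = ring[(int) ((getSeq + 1 + i) % ring.length)];
        }
        getSeq = end;
        return out;
    }

    // Acks up to seq (never past what was handed out), freeing those slots for put().
    void ack(long seq) {
        ackSeq = Math.max(ackSeq, Math.min(seq, getSeq));
    }

    // Rollback in this model: the next get() restarts right after the ack pointer.
    void rollback() {
        getSeq = ackSeq;
    }
}
```

In this model, anything that was fetched but not yet acked is simply handed out again after `rollback()`. Acked data is never redelivered, because only the get pointer moves.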
Once the client has connected to the server, it subscribes to binlog data:
    public void subscribe() throws CanalClientException {
        subscribe(""); // an empty string is enough
    }

    public void subscribe(String filter) throws CanalClientException {
        int times = 0;
        while (times < retryTimes) {
            try {
                currentConnector.subscribe(filter);
                this.filter = filter;
                return;
            } catch (Throwable t) {
                if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
                    logger.info("block waiting interrupted by other thread.");
                    return;
                } else {
                    logger.warn(String.format(
                        "something goes wrong when subscribing from server: %s",
                        currentConnector != null ? currentConnector.getAddress() : "null"), t);
                    times++;
                    restart();
                    logger.info("restart the connector for next round retry.");
                }
            }
        }
        throw new CanalClientException("failed to subscribe after " + times + " times retry.");
    }
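We won't repeat the subscription details here, since they were covered in the connect process above. There is also a retry mechanism. Unless the failure was caused by an interrupt (checked when retryTimes is -1), the client restarts the connector and retries, up to retryTimes attempts.
Once the connection and subscription are in place, the client can start fetching binlog data. The main method is getWithoutAck. With it the client must ack the data itself, so a batch is acked only after it has really been consumed and the business logic has run. If an exception occurs, the client also retries and restarts a limited number of times.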
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        try {
            ... // omitted
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.GET)
                .setBody(Get.newBuilder()
                    .setAutoAck(false)
                    .setDestination(clientIdentity.getDestination())
                    .setClientId(String.valueOf(clientIdentity.getClientId()))
                    .setFetchSize(size)
                    .setTimeout(time)
                    .setUnit(unit.ordinal())
                    .build()
                    .toByteString())
                .build()
                .toByteArray());
            return receiveMessages();
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
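As we can see, the client simply sends a GET command to the server. It carries batchSize and the timeout, with auto-ack turned off. The server handles it like this:
    embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
which ends up in this method: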
    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
        throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            // position of the last batch fetched in the stream
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            Events<Event> events = null;
            if (positionRanges != null) { // there is streaming data
                events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
            } else { // first fetch after an ack
                Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
                if (start == null) { // very first fetch with no ack record yet: start from the first entry in the store
                    start = canalInstance.getEventStore().getFirstPosition();
                }
                events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            }
            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                    clientIdentity.getClientId(), batchSize);
                return new Message(-1, new ArrayList<Entry>()); // return an empty message so no batchId is wasted
            } else {
                // record the batch in the streaming info
                Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
                List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
                    public Entry apply(Event input) {
                        return input.getEntry();
                    }
                });
                if (logger.isInfoEnabled()) {
                    logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                        clientIdentity.getClientId(), batchSize, entrys.size(), batchId, events.getPositionRange());
                }
                return new Message(batchId, entrys);
            }
        }
    }
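The core logic is here:
The result is wrapped in Messages and finally converted into a Message, which holds the batch id and the list of binlog entries.
After receiving the message, the client checks batchId. If batchId is -1 or the entry list is empty, no data was fetched. Otherwise the client runs its business logic on the message.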
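The empty-result branch explains the -1 check. The server allocates a batchId only when it actually returns entries, so -1 simply means "nothing fetched". Below is a minimal in-memory model of that behaviour. `MiniServer` and its fields are hypothetical, entries are plain strings rather than binlog Entry objects, and there is no blocking or timeout.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative server-side model: a batch id is allocated only for non-empty results.
final class MiniServer {
    static final class Message {
        final long id;
        final List<String> entries;

        Message(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    private final List<String> store = new ArrayList<>();                  // pending entries
    private final Map<Long, List<String>> batches = new LinkedHashMap<>(); // un-acked batches
    private long nextBatchId = 1;
    private int getIndex = 0; // next entry to hand out

    void append(String entry) {
        store.add(entry);
    }

    Message getWithoutAck(int batchSize) {
        int end = Math.min(store.size(), getIndex + batchSize);
        if (end == getIndex) {
            return new Message(-1, Collections.emptyList()); // empty: no batch id consumed
        }
        List<String> entries = new ArrayList<>(store.subList(getIndex, end));
        getIndex = end;
        long batchId = nextBatchId++; // plays the role of metaManager.addBatch(...)
        batches.put(batchId, entries);
        return new Message(batchId, entries);
    }

    int outstandingBatches() {
        return batches.size();
    }
}
```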
We will discuss the contents of Message later.
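    connector.ack(batchId); // commit the acknowledgement
This commits the batch id. Under the hood, the client sends a CLIENTACK command to the server, and the server calls the ack method of CanalServerWithEmbedded to commit it.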
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        PositionRange<LogPosition> positionRanges = null;
        positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // update the position
        if (positionRanges == null) { // a duplicate ack/rollback
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(), batchId));
        }
        // update the cursor
        if (positionRanges.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
            if (logger.isInfoEnabled()) {
                logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                    clientIdentity.getClientId(), batchId, positionRanges);
            }
        }
        // the store may now clean up acked data
        canalInstance.getEventStore().ack(positionRanges.getEnd());
    }
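The server first removes the batch from metaManager and then advances the ack pointer (the cursor). Finally it lets the store clean up data up to the acked position.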
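The duplicate-ack error follows from how batches are tracked. Ack removes the batch record, so a second ack for the same id finds nothing, just as removeBatch returns null above. Below is a simplified ledger with hypothetical names. It uses the batch end as the ack position, whereas the real code tracks a separate ack position via getAck(), which may be null.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative ack bookkeeping; not canal's metaManager API.
final class BatchLedger {
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    private final Map<Long, Range> outstanding = new LinkedHashMap<>();
    private long nextBatchId = 1;
    private long cursor = -1; // last acknowledged position (stand-in for the stored cursor)

    long addBatch(long start, long end) {
        long id = nextBatchId++;
        outstanding.put(id, new Range(start, end));
        return id;
    }

    void ack(long batchId) {
        Range r = outstanding.remove(batchId);
        if (r == null) {
            // unknown or already-acked batch id, like the "duplicate ack/rollback" case
            throw new IllegalStateException("batchId " + batchId + " does not exist");
        }
        cursor = r.end; // then the store could free everything up to r.end
    }

    long cursor() {
        return cursor;
    }
}
```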
If processing fails, the batch has to be rolled back, so the client sends a CLIENTROLLBACK command to the server. Rolling back a single batch is handled like this:
    @Override
    public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        // the first connection may trigger an automatic rollback, so an unsubscribed client is ignored
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }
        synchronized (canalInstance) {
            // clear the batch info
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
            if (positionRanges == null) { // a duplicate ack/rollback
                throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                    clientIdentity.getClientId(), batchId));
            }
            // lastRollbackPostions.put(clientIdentity,
            // positionRanges.getEnd()); // remember the last rollback position
            // TODO later: roll back to the position of the given batchId
            canalInstance.getEventStore().rollback(); // roll back the state held in eventStore
            logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
                clientIdentity.getClientId(), batchId, positionRanges);
        }
    }
So rolling back to a specific batchId is effectively fake (note the TODO): the single-batch rollback also rewinds everything to the ack pointer.
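Even a single-batch rollback rewinds to the last ack, so the consumer can follow a simple pattern. It acks a batch only after every entry has been processed and rolls the batch back on failure, so the data is redelivered. The sketch below uses a hypothetical `MiniConnector` interface. It is not canal's CanalConnector, although the method names echo it. Unlike the example loop at the top of this article, it skips the ack for an empty (-1) batch; that is a choice of this sketch.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal connector interface, only for this sketch.
interface MiniConnector {
    long nextBatchId(); // -1 means "no data", like Message.getId()
    List<String> entries(long batchId);
    void ack(long batchId);
    void rollback(long batchId);
}

final class ConsumeLoop {
    private ConsumeLoop() {
    }

    // Processes one batch: ack only after the handler succeeds, roll back otherwise.
    static void consumeOnce(MiniConnector connector, Consumer<String> handler) {
        long batchId = connector.nextBatchId();
        if (batchId == -1) {
            return; // nothing fetched, nothing to acknowledge
        }
        try {
            for (String entry : connector.entries(batchId)) {
                handler.accept(entry);
            }
            connector.ack(batchId); // commit only after the business logic finished
        } catch (RuntimeException e) {
            // a real consumer would also log e; the server rewinds to the last ack
            connector.rollback(batchId);
        }
    }
}
```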
When an exception occurs, the client disconnects from the server through the disconnect method:
    public void disconnect() throws CanalClientException {
        if (rollbackOnDisConnect && channel.isConnected()) {
            rollback();
        }
        connected = false;
        if (runningMonitor != null) {
            if (runningMonitor.isStart()) {
                runningMonitor.stop();
            }
        } else {
            doDisconnnect();
        }
    }
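If rollback on disconnect is enabled (rollbackOnDisConnect, false by default) and the socket channel is still connected, the client rolls back first. Then, for a cluster client (runningMonitor is not null), it calls runningMonitor.stop(); otherwise doDisconnnect() closes the connection directly. The main steps of stop are as follows: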