集群中存活的节点与同步
分布式系统中,如何判断一个节点(node)是否存活?
kafka这样认为:node
- 此节点和zookeeper能喊话.(Keep sessions with zookeeper through heartbeats.)
- 此节点若是是个从节点,必须可以尽量忠实地反映主节点的数据变化。
也就是说,必须可以在主节点写了新数据后,及时复制这些变化的数据,所谓及时,不能拉下太多哦.
那么,符合上面两个条件的节点就能够认为是存活的,也能够认为是同步的(in-sync).数据库
关于第1点,你们对心跳都很熟悉,那么咱们能够这样认为某个节点不能和zookeeper喊话了:缓存
1网络 2session 3架构 4并发 5app 6框架 7异步 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
zookeeper-node: var timer = new timer() .setInterval(10sec) .onTime(slave-nodes,function(slave-nodes){ slave-nodes.forEach( node -> { boolean isAlive = node.heartbeatACK(15sec); if (!isAlive) { node.numNotAlive += 1 ; if (node.numNotAlive >= 3 ) { node.declareDeadOrFailed(); slave-nodes.remove(node); //回调也可 leader-node-app.notifyNodeDeadOrFailed(node) } } else node.numNotAlive = 0 ; }); }); timer.run(); //你能够回调也能够像下面这样简单的计时判断 leader-node-app: var timer = new timer() .setInterval(10sec) .onTime(slave-nodes,function(slave-nodes){ slave-nodes.forEach(node -> { if (node.isDeadOrFailed) { //node不能和zookeeper喊话了 } }); }); timer.run(); |
关于第二点,要稍微复杂点了,怎么搞呢?
来这么分析:
- 数据 messages.
- 操做 op-log.
- 偏移 position/offset.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
// 1. 先考虑messages // 2. 再考虑log的postion或者offset // 3. 考虑msg和off都记录在同源数据库或者存储设备上.(database or storage-device.) var timer = new timer() .setInterval(10sec) .onTime(slave-nodes, function (nodes){ var core-of-cpu = 8; //嫌慢就并发呗 mod hash go! nodes.groupParallel(core-of-cpu) .forEach(node -> { boolean nodeSucked = false ; if (node.ackTimeDiff > 30sec) { //30秒内没有回复,node卡住了 nodeSucked = true ; } if (node.logOffsetDiff > 100) { //node复制跟不上了,差距超过100条数据 nodeSucked = true ; } if (nodeSucked) { //总之node“死”掉了,其实到底死没死,谁知道呢?network-error在分布式系统中或者节点失败这个事情是正常现象. node.declareDeadOrFailed(); //不和你玩啦,集群不要你了 nodes.remove(node); //该怎么处理呢,抛个事件吧. fire-event-NodeDeadOrFailed(node); } }); }); timer.run(); |
上面的节点的状态管理通常由zookeeper来作,leader或者master节点也会维护那么点状态。
那么应用中的leader或者master节点,只须要从zookeeper拉状态就能够,同时,上面的实现是否是必定最佳呢?不是的,并且多数操做能够合起来,但为了描述节点是否存活这个事儿,我们这么写没啥问题。
节点死掉、失败、不一样步了,咋处理呢?
好嘛,终于说到failover和recover了,那failover比较简单,由于还有其它的slave节点在,不影响数据读取。
- 同时多个slave节点失败了?
没有100%的可用性.数据中心和机房瘫痪、网络电缆切断、hacker入侵删了你的根,总之你rp爆表了.
- 若是主节点失败了,那master-master不行嘛?
keep-alived或者LVS或者你本身写failover吧.
高可用架构(HA)又是个大件儿了,此文不展开了。
咱们来关注下recover方面的东西,这里把视野打开点,不只关注slave节点重启后追log来同步数据,咱们看下在实际应用中,数据请求(包括读、写、更新)失败怎么办?
你们可能都会说,重试(retry)呗、重放(replay)呗或者干脆无论了呗!
行,都行,这些都是策略,但具体怎么个搞法,你真的清楚了?
一个bigdata问题
咱们先摆个探讨的背景:
问题:消息流,好比微博的微博(真绕),源源不断地流进咱们的应用中,要处理这些消息,有个需求是这样的:
Reach is the number of unique people exposed to a URL on Twitter.
那么,统计一下3小时内的本条微博(url)的reach总数。
怎么解决呢?
把某时间段内转发过某条微博(url)的人拉出来,把这些人的粉丝拉出来,去掉重复的人,而后求总数,就是要求的reach.
为了简单,咱们忽略掉日期,先看看这个方法行不行:
1 2 3 4 5 6 7 8 9 10 11 12 |
/** --------------------------------- * 1. 求出转发微博(url)的大V. * __________________________________*/ 方法 :getUrlToTweetersMap(String url_id) SQL : /* 数据库A,表url_user存储了转发某url的user */ SELECT url_user.user_id as tweeter_id FROM url_user WHERE url_user.url_id = ${url_id} 返回 :[user_1,...,user_m] |
1 2 3 4 5 6 7 8 9 10 11 12 |
/** --------------------------------- * 2. 求出大V的粉丝 * __________________________________*/ 方法 : getFollowers(String tweeter_id); SQL : /* 数据库B */ SELECT users.id as user_id FROM users WHERE users.followee_id = ${tweeter_id} 返回:tweeter的粉丝 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
/** --------------------------------- * 3. 求出Reach * __________________________________*/ var url = queryArgs.getUrl(); var tweeters = getUrlToTweetersMap(); var result = new HashMap<String,Integer>(); tweeters.forEach(t -> { // 你能够批量in + 并发读来优化下面方法的性能 var followers = getFollowers(t.tweeter_id); followers.forEach(f -> { //hash去重 result.put(f.user_id,1); }); }); //Reach return result.size(); |
其实这又引出了一个很重要的问题,也是不少大谈框架、设计、模式却每每忽视的问题:性能和数据库建模的关系。
- 数据量有多大?
不知道读者有木有对这个问题的数据库I/O有点想法,或者虎躯一震呢?
Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples.
在上面的数据库设计中避免了JOIN,为了提升求大V粉丝的性能,能够将一批大V做为batch/bulk,而后多个batch并发读,誓死搞死数据库。
这里将微博到转发者表所在的库,与粉丝库分离,若是数据更大怎么办?
库再分表…
OK,假设你已经很是熟悉传统关系型数据库的分库分表及数据路由(读路径的聚合、写路径的分发)、或者你对于sharding技术也很熟悉、或者你良好的结合了HBase的横向扩展能力并有一致性策略来解决其二级索引问题.
总之,存储和读取的问题假设你已经解决了,那么分布式计算呢?
- 微博这种应用,人与人之间的关系成图状(网),你怎么建模存储?而不只仅对应这个问题,好比:
某人的好友的好友可能和某人有几分相熟?
看看用storm怎么来解决分布式计算,并提供流式计算的能力:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// url到大V -> 数据库1 TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); // 大V到粉丝 -> 数据库2 TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); topology.newDRPCStream( "reach" ) .stateQuery(urlToTweeters, new Fields( "args" ), new MapGet(), new Fields( "tweeters" )) .each( new Fields( "tweeters" ), new ExpandList(), new Fields( "tweeter" )) .shuffle() /* 大V的粉丝不少,因此须要分布式处理*/ .stateQuery(tweetersToFollowers, new Fields( "tweeter" ), new MapGet(), new Fields( "followers" )) .parallelismHint(200) /* 粉丝不少,因此须要高并发 */ .each( new Fields( "followers" ), new ExpandList(), new Fields( "follower" )) .groupBy( new Fields( "follower" )) .aggregate( new One(), new Fields( "one" )) /* 去重 */ .parallelismHint(20) .aggregate( new Count(), new Fields( "reach" )); /* 计算reach数 */ |
最多处理一次(At most once)
回到主题,引出上面的例子,一是为了引出一个有关分布式(存储+计算)的问题,二是透漏这么点意思:
码农,就应该关注设计和实现的东西,好比Jay Kreps是如何发明Kafka这个轮子的 : ]
若是你仍是码农级别,咱来务点实吧,前面咱们说到recover
,节点恢复的问题,那么咱们恢复几个东西?
基本的:
本篇从数据上来讨论下这个问题,为使问题再简单点,咱们考虑写数据的场景,若是咱们用write-ahead-log
的方式来保证数据复制和一致性,那么咱们会怎么处理一致性问题呢?
- 主节点有新数据写入.
- 从节点追log,准备复制这批新数据。从节点作两件事:
(1). 把数据的id偏移写入log;
(2). 正要处理数据自己,从节点挂了。
那么根据上文的节点存活条件,这个从节点挂了这件事被探测到了,从节点由维护人员手动或者其本身恢复了,那么在加入集群和小伙伴们继续玩耍以前,它要同步本身的状态和数据。
问题来了:
若是根据log内的数据偏移来同步数据,那么,由于这个节点在处理数据以前就把偏移写好了,但是那批数据lost-datas没有获得处理,若是追log以后的数据来同步,那么那批数据lost-datas就丢了。
在这种状况下,就叫做数据最多处理一次,也就是说数据会丢失。
最少处理一次(At least once)
好吧,丢失数据不能容忍,那么咱们换种方式来处理:
- 主节点有新数据写入.
- 从节点追log,准备复制这批新数据。从节点作两件事:
(1). 先处理数据;
(2). 正要把数据的id偏移写入log,从节点挂了。
问题又来了:
若是从节点追log来同步数据,那么由于那批数据duplicated-datas被处理过了,而数据偏移没有反映到log中,若是这样追,会致使这批数据重复。
这种场景,从语义上来说,就是数据最少处理一次,意味着数据处理会重复。
仅处理一次(Exactly once)
Transaction
好吧,数据重复也不能容忍?要求挺高啊。
你们都追求的强一致性保证(这里是最终一致性),怎么来搞呢?
换句话说,在更新数据的时候,事务能力如何保障呢?
假设一批数据以下:
1 2 3 4 5 6 |
// 新到数据 { transactionId:4 urlId:99 reach:5 } |
如今要更新这批数据到库里或者log里,那么原来的状况是:
1 2 3 4 5 6 |
// 老数据 { transactionId:3 urlId:99 reach:3 } |
若是说能够保证以下三点:
- 事务ID的生成是强有序的.(隔离性,串行)
- 同一个事务ID对应的一批数据相同.(幂等性,屡次操做一个结果)
- 单条数据会且仅会出如今某批数据中.(一致性,无遗漏无重复)
那么,放心大胆的更新好了:
1 2 3 4 5 6 7 |
// 更新后数据 { transactionId:4 urlId:99 //3 + 5 = 8 reach:8 } |
注意到这个更新是ID偏移和数据一块儿更新的,那么这个操做靠什么来保证:原子性。
你的数据库不提供原子性?后文略有说起。
这里是更新成功了。若是更新的时候,节点挂了,那么库里或者log里的id偏移不写,数据也不处理,等节点恢复,就能够放心去同步,而后加入集群玩耍了。
因此说,要保证数据仅处理一次,仍是挺困难的吧?
上面的保障“仅处理一次”这个语义的实现有什么问题呢?
性能问题。
这里已经使用了batch策略来减小到库或磁盘的Round-Trip Time,那么这里的性能问题是什么呢?
考虑一下,采用master-master架构来保证主节点的可用性,可是一个主节点失败了,到另外一个主节点主持工做,是须要时间的。
假设从节点正在同步,啪!主节点挂了!由于要保证仅处理一次的语义,因此原子性发挥做用,失败,回滚,而后从主节点拉失败的数据(你不能就近更新,由于这批数据可能已经变化了,或者你根本没缓存本批数据),结果是什么呢?
老主节点挂了, 新的主节点还没启动,因此此次事务就卡在这里,直到数据同步的源——主节点能够响应请求。
若是不考虑性能,就此做罢,这也不是什么大事。
你彷佛意犹未尽?来吧,看看“银弹”是什么?
Opaque-Transaction
如今,咱们来追求这样一种效果:
某条数据在一批数据中(这批数据对应着一个事务),极可能会失败,可是它会在另外一批数据中成功。
换句话说,一批数据的事务ID必定相同。
来看看例子吧,老数据不变,只是多了个字段:prevReach
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// 老数据 { transactionId:3 urlId:99 //注意这里多了个字段,表示以前的reach的值 prevReach:2 reach:3 } // 新到数据 { transactionId:4 urlId:99 reach:5 } |
这种状况,新事务的ID更大、更靠后,代表新事务能够执行,还等什么,直接更新,更新后数据以下:
1 2 3 4 5 6 7 8 9 |
// 新到数据 { transactionId:4 urlId:99 //注意这里更新为以前的值 prevReach:3 //3 + 5 = 8 reach:8 } |
如今来看下另外的状况:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// 老数据 { transactionId:3 urlId:99 prevReach:2 reach:3 } // 新到数据 { //注意事务ID为3,和老数据中的事务ID相同 transactionId:3 urlId:99 reach:5 } |
这种状况怎么处理?是跳过吗?由于新数据的事务ID和库里或者log里的事务ID相同,按事务要求此次数据应该已经处理过了,跳过?
不,这种事不能靠猜的,想一想咱们有的几个性质,其中关键一点就是:
给定一批数据,它们所属的事务ID相同。
仔细体会下,上面那句话和下面这句话的差异:
给定一个事务ID,任什么时候候,其所关联的那批数据相同。
咱们应该这么作,考虑到新到数据的事务ID和存储中的事务ID一致,因此这批数据可能被分别或者异步处理了,可是,这批数据对应的事务ID永远是同一个,那么,即便这批数据中的A部分先处理了,因为你们都是一个事务ID,那么A部分的前值是可靠的。
因此,咱们将依靠prevReach而不是Reach的值来更新:
1 2 3 4 5 6 7 8 9 |
// 更新后数据 { transactionId:3 urlId:99 //这个值不变 prevReach:2 //2 + 5 = 7 reach:7 } |
你发现了什么呢?
不一样的事务ID,致使了不一样的值:
- 当事务ID为4,大于存储中的事务ID3,Reach更新为3+5 = 8.
- 当事务ID为3,等于存储中的事务ID3,Reach更新为2+5 = 7.
这就是Opaque Transaction
.
这种事务能力是最强的了,能够保证事务异步提交。因此不用担忧被卡住了,若是说集群中:
Transaction:
- 数据是分批处理的,每一个事务ID对应一批肯定、相同的数据.
- 保证事务ID的产生是强有序的.
- 保证分批的数据不重复、不遗漏.
- 若是事务失败,数据源丢失,那么后续事务就卡住直到数据源恢复.
Opaque-Transaction:
- 数据是分批处理的,每批数据有肯定而惟一的事务ID.
- 保证事务ID的产生是强有序的.
- 保证分批的数据不重复、不遗漏.
- 若是事务失败,数据源丢失,不影响后续事务,除非后续事务的数据源也丢了.
其实这个全局ID的设计也是门艺术:
- 冗余关联表的ID,以减小join,作到O(1)取ID.
- 冗余日期(long型)字段,以免order by.
- 冗余过滤字段,以免无二级索引(HBase)的尴尬.
- 存储mod-hash的值,以方便分库、分表后,应用层的数据路由书写.
这个内容也太多,话题也太大,就不在此展开了。
你如今知道twitter的snowflake生成全局惟一且有序的ID的重要性了。
两阶段提交
如今用zookeeper来作两阶段提交已是入门级技术,因此也不展开了。
若是你的数据库不支持原子操做,那么考虑两阶段提交吧