Paxos算法是大名鼎鼎的Zookeeper中采用的选举Leader的算法,事实上,在涉及到分布式系统的一致性的时候,就只有一种算法,那就是Paxos.java
首先来看,Paxos是为了解决什么问题:算法
Paxos 算法解决的问题是一个分布式系统如何就某个值(决议)达成一致。一个典型的场景是,在一个分布式数据库存储中,若是各节点的初始状态一致,每一个节点执行相同的操做序列,那么他们最后能获得一个一致的状态。为保证每一个节点执行相同的命令序列,须要在每一条指令上执行一个“一致性算法”以保证每一个节点看到的指令一致。一个通用的一致性算法能够应用在许多场景中,是分布式计算中的重要问题。所以从20世纪80年代起对于一致性算法的研究就没有中止过。节点通讯存在两种模型:共享内存(Shared memory)和消息传递(Messages passing)。Paxos 算法就是一种基于消息传递模型的一致性算法。数据库
本文的实现参考了LynnCui在知乎上的文章:https://zhuanlan.zhihu.com/p/21438357promise
有关Paxos的详细介绍及推理过程都可参考上述文章,若是还有不明白,能够查阅相关文章,这一类的文章网上仍是比较多的。服务器
关于个人实现,我这里有几点须要说明:网络
1.关于maxVote(即比本次表决编号小的最大表决编号),若是该编号存在,则应该用改编号对应表决的提案做为下一次的提案,不然,则能够随机指定提案,具体的提案的指定方式依赖于需求逻辑,本程序中采用随机提案;app
2.Paxos假设消息的传递过程是不可靠的,这主要是由于实际环境中,网络通讯是没有办法保障的。网络延迟以及服务器自己也有可能宕机,这些条件其实都在Paxos的前提假设之中,本程序当中作了一个假设,假设网络通讯有50%的几率失败,其实是我随便指定的;dom
3.Java自己是一门繁琐的语言,因此不免会有很大冗余代码,这主要是为了保证程序的稳定性,还有一部分是出于习惯。Paxos其实并非一个复杂的算法,至少基本的不是。程序中所采用的的面向对象的设计也不必定是合理的;分布式
4.本程序依赖了Google的guava包以及common-lang3的包,实际中彻底能够去掉。guava主要是在重载hashCode()方法的时候用到,但程序中并无使用hashCode的地方,实际只是由于在重载equals()方法的时候老是应该重载hashCode()方法。lang3主要作了字符串比较,应对字符串为空的状况。除此以外没有任何依赖。ide
Java实现参考:
public final class PaxosDemo { private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(); private static final Random RANDOM = new Random(); private static final String[] PROPOSALS = {"ProjectA", "ProjectB", "ProjectC"}; public static void main(String[] args) { List<Acceptor> acceptors = new ArrayList<Acceptor>(); Arrays.asList("A", "B", "C", "D", "E") .forEach(name -> acceptors.add(new Acceptor(name))); Proposer.vote(new Proposal(1L, null), acceptors); } private static void printInfo(String subject, String operation, String result) { System.out.println(subject + ":" + operation + "<" + result + ">"); } /** * 对于提案的约束,第三条约束要求: * 若是maxVote不存在,那么没有限制,下一次表决可使用任意提案; * 不然,下一次表决要沿用maxVote的提案 * * @param currentVoteNumber * @param proposals * @return */ private static Proposal nextProposal(long currentVoteNumber, List<Proposal> proposals) { long voteNumber = currentVoteNumber + 1; if (proposals.isEmpty()) return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]); Collections.sort(proposals); Proposal maxVote = proposals.get(proposals.size() - 1); long maxVoteNumber = maxVote.getVoteNumber(); String content = maxVote.getContent(); if (maxVoteNumber >= currentVoteNumber) throw new IllegalStateException("illegal state maxVoteNumber"); if (content != null) return new Proposal(voteNumber, content); else return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]); } private static class Proposer { /** * @param proposal * @param acceptors */ public static void vote(Proposal proposal, Collection<Acceptor> acceptors) { int quorum = Math.floorDiv(acceptors.size(), 2) + 1; int count = 0; while (true) { printInfo("VOTE_ROUND", "START", ++count + ""); List<Proposal> proposals = new ArrayList<Proposal>(); for (Acceptor acceptor : acceptors) { Promise promise = acceptor.onPrepare(proposal); if (promise != null && promise.isAck()) proposals.add(promise.getProposal()); } if (proposals.size() < quorum) { printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT PREPARED"); proposal = nextProposal(proposal.getVoteNumber(), proposals); continue; } int acceptCount = 0; for (Acceptor acceptor : acceptors) { if (acceptor.onAccept(proposal)) acceptCount++; } if (acceptCount < quorum) { printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT ACCEPTED"); proposal = nextProposal(proposal.getVoteNumber(), proposals); continue; } break; } printInfo("PROPOSER[" + proposal + "]", "VOTE", "SUCCESS"); } } private static class Acceptor { //上次表决结果 private Proposal last = new Proposal(); private String name; public Acceptor(String name) { this.name = name; } public Promise onPrepare(Proposal proposal) { //假设这个过程有50%的概率失败 if (Math.random() - 0.5 > 0) { printInfo("ACCEPTER_" + name, "PREPARE", "NO RESPONSE"); return null; } if (proposal == null) throw new IllegalArgumentException("null proposal"); if (proposal.getVoteNumber() > last.getVoteNumber()) { Promise response = new Promise(true, last); last = proposal; printInfo("ACCEPTER_" + name, "PREPARE", "OK"); return response; } else { printInfo("ACCEPTER_" + name, "PREPARE", "REJECTED"); return new Promise(false, null); } } public boolean onAccept(Proposal proposal) { //假设这个过程有50%的概率失败 if (Math.random() - 0.5 > 0) { printInfo("ACCEPTER_" + name, "ACCEPT", "NO RESPONSE"); return false; } printInfo("ACCEPTER_" + name, "ACCEPT", "OK"); return last.equals(proposal); } } private static class Promise { private final boolean ack; private final Proposal proposal; public Promise(boolean ack, Proposal proposal) { this.ack = ack; this.proposal = proposal; } public boolean isAck() { return ack; } public Proposal getProposal() { return proposal; } } private static class Proposal implements Comparable<Proposal> { private final long voteNumber; private final String content; public Proposal(long voteNumber, String content) { this.voteNumber = voteNumber; this.content = content; } public Proposal() { this(0, null); } public long getVoteNumber() { return voteNumber; } public String getContent() { return content; } @Override public int compareTo(Proposal o) { return Long.compare(voteNumber, o.voteNumber); } @Override public boolean equals(Object obj) { if (obj == null) return false; if (!(obj instanceof Proposal)) return false; Proposal proposal = (Proposal) obj; return voteNumber == proposal.voteNumber && StringUtils.equals(content, proposal.content); } @Override public int hashCode() { return HASH_FUNCTION .newHasher() .putLong(voteNumber) .putString(content, Charsets.UTF_8) .hash() .asInt(); } @Override public String toString() { return new StringBuilder() .append(voteNumber) .append(':') .append(content) .toString(); } } }
如下是我试着运行了一下程序的结果:
VOTE_ROUND:START<1> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<NO RESPONSE> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<NO RESPONSE> ACCEPTER_E:PREPARE<NO RESPONSE> PROPOSER[1:null]:VOTE<NOT PREPARED> VOTE_ROUND:START<2> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<NO RESPONSE> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<NO RESPONSE> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<NO RESPONSE> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<OK> ACCEPTER_E:ACCEPT<OK> PROPOSER[2:ProjectC]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<3> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<NO RESPONSE> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[3:ProjectB]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<4> ACCEPTER_A:PREPARE<NO RESPONSE> ACCEPTER_B:PREPARE<NO RESPONSE> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<OK> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<OK> ACCEPTER_E:ACCEPT<OK> PROPOSER[4:ProjectC]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<5> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<NO RESPONSE> ACCEPTER_D:PREPARE<NO RESPONSE> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<NO RESPONSE> ACCEPTER_B:ACCEPT<OK> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[5:ProjectB]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<6> ACCEPTER_A:PREPARE<NO RESPONSE> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<NO RESPONSE> ACCEPTER_A:ACCEPT<NO RESPONSE> ACCEPTER_B:ACCEPT<NO RESPONSE> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[6:ProjectC]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<7> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<NO RESPONSE> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<OK> ACCEPTER_C:ACCEPT<OK> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[7:ProjectB]:VOTE<SUCCESS>
通过7轮表决终于达成一致.