朴素Paxos(Basic Paxos)算法java简易实现

Paxos算法是大名鼎鼎的Zookeeper中采用的选举Leader的算法,事实上,在涉及到分布式系统的一致性的时候,就只有一种算法,那就是Paxos.java

首先来看,Paxos是为了解决什么问题:算法

Paxos 算法解决的问题是一个分布式系统如何就某个值(决议)达成一致。一个典型的场景是,在一个分布式数据库存储中,若是各节点的初始状态一致,每一个节点执行相同的操做序列,那么他们最后能获得一个一致的状态。为保证每一个节点执行相同的命令序列,须要在每一条指令上执行一个“一致性算法”以保证每一个节点看到的指令一致。一个通用的一致性算法能够应用在许多场景中,是分布式计算中的重要问题。所以从20世纪80年代起对于一致性算法的研究就没有中止过。节点通讯存在两种模型:共享内存(Shared memory)和消息传递(Messages passing)。Paxos 算法就是一种基于消息传递模型的一致性算法。数据库

本文的实现参考了LynnCui在知乎上的文章:https://zhuanlan.zhihu.com/p/21438357promise

有关Paxos的详细介绍及推理过程都可参考上述文章,若是还有不明白,能够查阅相关文章,这一类的文章网上仍是比较多的。服务器

关于个人实现,我这里有几点须要说明:网络

1.关于maxVote(即比本次表决编号小的最大表决编号),若是该编号存在,则应该用改编号对应表决的提案做为下一次的提案,不然,则能够随机指定提案,具体的提案的指定方式依赖于需求逻辑,本程序中采用随机提案;app

2.Paxos假设消息的传递过程是不可靠的,这主要是由于实际环境中,网络通讯是没有办法保障的。网络延迟以及服务器自己也有可能宕机,这些条件其实都在Paxos的前提假设之中,本程序当中作了一个假设,假设网络通讯有50%的几率失败,其实是我随便指定的;dom

3.Java自己是一门繁琐的语言,因此不免会有很大冗余代码,这主要是为了保证程序的稳定性,还有一部分是出于习惯。Paxos其实并非一个复杂的算法,至少基本的不是。程序中所采用的的面向对象的设计也不必定是合理的;分布式

4.本程序依赖了Google的guava包以及common-lang3的包,实际中彻底能够去掉。guava主要是在重载hashCode()方法的时候用到,但程序中并无使用hashCode的地方,实际只是由于在重载equals()方法的时候老是应该重载hashCode()方法。lang3主要作了字符串比较,应对字符串为空的状况。除此以外没有任何依赖。ide

Java实现参考:

public final class PaxosDemo {

    private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
    private static final Random RANDOM = new Random();
    private static final String[] PROPOSALS = {"ProjectA", "ProjectB", "ProjectC"};

    public static void main(String[] args) {
        List<Acceptor> acceptors = new ArrayList<Acceptor>();
        Arrays.asList("A", "B", "C", "D", "E")
                .forEach(name -> acceptors.add(new Acceptor(name)));
        Proposer.vote(new Proposal(1L, null), acceptors);
    }

    private static void printInfo(String subject, String operation, String result) {
        System.out.println(subject + ":" + operation + "<" + result + ">");
    }

    /**
     * 对于提案的约束,第三条约束要求:
     * 若是maxVote不存在,那么没有限制,下一次表决可使用任意提案;
     * 不然,下一次表决要沿用maxVote的提案
     *
     * @param currentVoteNumber
     * @param proposals
     * @return
     */
    private static Proposal nextProposal(long currentVoteNumber, List<Proposal> proposals) {
        long voteNumber = currentVoteNumber + 1;
        if (proposals.isEmpty())
            return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]);
        Collections.sort(proposals);
        Proposal maxVote = proposals.get(proposals.size() - 1);
        long maxVoteNumber = maxVote.getVoteNumber();
        String content = maxVote.getContent();
        if (maxVoteNumber >= currentVoteNumber)
            throw new IllegalStateException("illegal state maxVoteNumber");
        if (content != null)
            return new Proposal(voteNumber, content);
        else return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]);
    }


    private static class Proposer {

        /**
         * @param proposal
         * @param acceptors
         */
        public static void vote(Proposal proposal, Collection<Acceptor> acceptors) {
            int quorum = Math.floorDiv(acceptors.size(), 2) + 1;
            int count = 0;
            while (true) {
                printInfo("VOTE_ROUND", "START", ++count + "");
                List<Proposal> proposals = new ArrayList<Proposal>();
                for (Acceptor acceptor : acceptors) {
                    Promise promise = acceptor.onPrepare(proposal);
                    if (promise != null && promise.isAck())
                        proposals.add(promise.getProposal());
                }
                if (proposals.size() < quorum) {
                    printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT PREPARED");
                    proposal = nextProposal(proposal.getVoteNumber(), proposals);
                    continue;
                }
                int acceptCount = 0;
                for (Acceptor acceptor : acceptors) {
                    if (acceptor.onAccept(proposal))
                        acceptCount++;
                }
                if (acceptCount < quorum) {
                    printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT ACCEPTED");
                    proposal = nextProposal(proposal.getVoteNumber(), proposals);
                    continue;
                }
                break;
            }
            printInfo("PROPOSER[" + proposal + "]", "VOTE", "SUCCESS");
        }


    }

    private static class Acceptor {

        //上次表决结果
        private Proposal last = new Proposal();
        private String name;

        public Acceptor(String name) {
            this.name = name;
        }

        public Promise onPrepare(Proposal proposal) {
            //假设这个过程有50%的概率失败
            if (Math.random() - 0.5 > 0) {
                printInfo("ACCEPTER_" + name, "PREPARE", "NO RESPONSE");
                return null;
            }
            if (proposal == null)
                throw new IllegalArgumentException("null proposal");
            if (proposal.getVoteNumber() > last.getVoteNumber()) {
                Promise response = new Promise(true, last);
                last = proposal;
                printInfo("ACCEPTER_" + name, "PREPARE", "OK");
                return response;
            } else {
                printInfo("ACCEPTER_" + name, "PREPARE", "REJECTED");
                return new Promise(false, null);
            }
        }

        public boolean onAccept(Proposal proposal) {
            //假设这个过程有50%的概率失败
            if (Math.random() - 0.5 > 0) {
                printInfo("ACCEPTER_" + name, "ACCEPT", "NO RESPONSE");
                return false;
            }
            printInfo("ACCEPTER_" + name, "ACCEPT", "OK");
            return last.equals(proposal);
        }
    }

    private static class Promise {

        private final boolean ack;
        private final Proposal proposal;

        public Promise(boolean ack, Proposal proposal) {
            this.ack = ack;
            this.proposal = proposal;
        }

        public boolean isAck() {
            return ack;
        }

        public Proposal getProposal() {
            return proposal;
        }
    }

    private static class Proposal implements Comparable<Proposal> {

        private final long voteNumber;
        private final String content;

        public Proposal(long voteNumber, String content) {
            this.voteNumber = voteNumber;
            this.content = content;
        }

        public Proposal() {
            this(0, null);
        }

        public long getVoteNumber() {
            return voteNumber;
        }

        public String getContent() {
            return content;
        }

        @Override
        public int compareTo(Proposal o) {
            return Long.compare(voteNumber, o.voteNumber);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null)
                return false;
            if (!(obj instanceof Proposal))
                return false;
            Proposal proposal = (Proposal) obj;
            return voteNumber == proposal.voteNumber && StringUtils.equals(content, proposal.content);
        }

        @Override
        public int hashCode() {
            return HASH_FUNCTION
                    .newHasher()
                    .putLong(voteNumber)
                    .putString(content, Charsets.UTF_8)
                    .hash()
                    .asInt();
        }

        @Override
        public String toString() {
            return new StringBuilder()
                    .append(voteNumber)
                    .append(':')
                    .append(content)
                    .toString();
        }
    }

}

如下是我试着运行了一下程序的结果:

VOTE_ROUND:START<1>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<NO RESPONSE>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<NO RESPONSE>
ACCEPTER_E:PREPARE<NO RESPONSE>
PROPOSER[1:null]:VOTE<NOT PREPARED>
VOTE_ROUND:START<2>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<NO RESPONSE>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<NO RESPONSE>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<NO RESPONSE>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<OK>
ACCEPTER_E:ACCEPT<OK>
PROPOSER[2:ProjectC]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<3>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<NO RESPONSE>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[3:ProjectB]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<4>
ACCEPTER_A:PREPARE<NO RESPONSE>
ACCEPTER_B:PREPARE<NO RESPONSE>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<OK>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<OK>
ACCEPTER_E:ACCEPT<OK>
PROPOSER[4:ProjectC]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<5>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<NO RESPONSE>
ACCEPTER_D:PREPARE<NO RESPONSE>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<NO RESPONSE>
ACCEPTER_B:ACCEPT<OK>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[5:ProjectB]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<6>
ACCEPTER_A:PREPARE<NO RESPONSE>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<NO RESPONSE>
ACCEPTER_A:ACCEPT<NO RESPONSE>
ACCEPTER_B:ACCEPT<NO RESPONSE>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[6:ProjectC]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<7>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<NO RESPONSE>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<OK>
ACCEPTER_C:ACCEPT<OK>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[7:ProjectB]:VOTE<SUCCESS>

通过7轮表决终于达成一致.

相关文章
相关标签/搜索