Kafka学习笔记(4)----Kafka的Leader Election

1. Zookeeper的基本操做

  zookeeper中的节点能够持久化/有序的两个维度分为四种类型:java

  PERSIST:持久化无序(保存在磁盘中)node

  PERSIST_SEQUENTIAL:持久化有序递增apache

  EPHEMERAL:非持久化的无序的,保存在内存中,当客户端关闭后消失。api

  EPHEMERAL_SEQUENTIAL:非持久有序递增,保存在内存中,当客户端关闭后消失函数

  每一个节点均可以注册Watch操做,用于监听节点的变化,有四种事件类型以下:this

  Created event: Enabled with a call to exists编码

  Deleted event: Enabled with a call to exists, getData, and getChildrenspa

  Changed event: Enabled with a call to exists and getDatacode

  Child event: Enabled with a call to getChildrenxml

  Watch的基本特征是客户端先获得通知,而后才能获得数据,Watch被fire以后就当即取消了,不会再有Watch后续变化,想要监听只能从新注册;

使用原生Zookeeper建立节点和监听节点变化代码以下:

  1. 引入依赖,pom.xml

 <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.13</version>
    </dependency>

  2. 客户端链接类

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //建立连接,并监听链接状态
        ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("连接客户端");
                System.out.println(watchedEvent.getState());
            }
        });
        //建立节点,/parent:节点路径, data.xx:数据,Ids:设置权限CreateNode.PERSISTENT:建立节点类型
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        //监听节点变化
        zooKeeper.exists("/testRoot", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
            }
        });
       System.out.println(parent);
        Thread.sleep(10000000);
    }
}

  运行建立一个持久化的节点。

  查看客户端能够看到:

  

  parent节点建立成功。

  删除parent节点,观察watche变化。

  控制台打印:

  

  表示监听了删除节点事件,此时再在客户端手动建立节点,观察变化

  

  控制台并无打印任何建立信息,说明没有监听到,这就是咱们说的一旦watche被fire以后就会被关闭,此时改造一下代码:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //建立连接,并监听链接状态
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("连接客户端");
                System.out.println(watchedEvent.getState());
            }
        });
        //建立节点
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        //监听节点变化
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                   //从新注册监听事件
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
//        System.out.println(newNode);
        Thread.sleep(10000000);
    }
}

  删除节点,再手动建立节点:

  

  控制台打印以下:

  

  这样建立节点的事件就又被从新注册并监听到了。

2. 基于Zookeeper的Leader Election

  1. 抢注Leader节点——非公平模式

  编码流程:

  1. 建立Leader父节点,如/chroot,并将其设置为persist节点

  2. 各客户端经过在/chroot下建立Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral

  3. 若某建立Leader节点成功,则该客户端成功竞选为Leader

  4. 若建立Leader节点失败,则竞选Leader失败,在/chroot/leader节点上注册exist的watch,一旦该节点被删除则得到通知

  5. Leader可经过删除Leader节点来放弃Leader

  6. 若是Leader宕机,因为Leader节点被设置为ephemeral,Leader节点会自行删除。而其它节点因为在Leader节点上注册了watch,故可获得通知,参与下一轮竞选,从而保证总有客户端以Leader角色工做。

  实现代码以下:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //建立连接,并监听链接状态
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("连接客户端");
                System.out.println(watchedEvent.getState());
            }
        });
        //建立节点
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

        //监听节点变化
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
//        System.out.println(newNode);
        Thread.sleep(10000000);
    }
}

  当存在节点以后,会抛出异常,这样就会致使节点建立不成功,因此只有建立成功的node才能成为leader。使用watcher监听能够在节点被删除或宕机以后来抢占leader.

  2.  先到先得,后者监视前者——公平模式

  1. 建立Leader父节点,如/chroot,并将其设置为persist节点

  2. 各客户端经过在/chroot下建立Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral_sequential

  3. 客户端经过getChildren方法获取/chroot/下全部子节点,若是其注册的节点的id在全部子节点中最小,则当前客户端竞选Leader成功

  4. 不然,在前面一个节点上注册watch,一旦前者被删除,则它获得通知,返回step 3(并不能直接认为本身成为新Leader,由于可能前面的节点只是宕机了)

  5. Leader节点可经过自行删除本身建立的节点以放弃Leader

  代码实现以下:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //建立连接,并监听链接状态
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("连接客户端");
                System.out.println(watchedEvent.getState());
            }
        });
        //建立节点
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

        //监听节点变化
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
//        System.out.println(newNode);
        Thread.sleep(10000000);
    }
}

  能够看到zk中的parent下多出了三个节点:

  

  默认以node+十个十进制数命名节点名称,数据递增。

  当id在全部子节点中最小,选举成为leader.

3. Leader Election在Curator中的实现

  手下引入Curator依赖,pom.xml以下:

 <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>3.2.1</version>
    </dependency>

  1. Curator LeaderLatch特色及api的做用:

  1. 竞选为Leader后,不可自行放弃领导权

  2. 只能经过close方法放弃领导权

  3. 强烈建议增长ConnectionStateListener,当链接SUSPENDED或者LOST时视为丢失领导权

  4. 可经过await方法等待成功获取领导权,并可加入timeout

  5. 可经过hasLeadership方法判断是否为Leader

  6. 可经过getLeader方法获取当前Leader

  7. 可经过getParticipants方法获取当前竞选Leader的参与方

  简单实现:

package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLeaderLatch {
    public static void main(String[] args) throws Exception {
        //设置重试策略,这里是沉睡一秒后开始重试,重试五次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
        //经过工厂类获取curatorFramework
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
        //leader节点建立
        LeaderLatch leaderLatch = new LeaderLatch(curatorFramework,"/parent","node");
        //监听leader节点
        leaderLatch.addListener(new LeaderLatchListener() {
            //当前节点是leader时回调
            public void isLeader() {
                System.out.println("I am a listener");
            }
            //再也不是leader时回调
            public void notLeader() {
                System.out.println("I am not a  listener");
            }
        });
        //启动
        curatorFramework.start();
        leaderLatch.start();
        Thread.sleep(100000000);
        leaderLatch.close();
        curatorFramework.close();
    }
}

  2. Curator LeaderSelector特色及api的做用:

  1. 竞选Leader成功后回调takeLeadership方法

  2. 可在takeLeadership方法中实现业务逻辑

  3. 一旦takeLeadership方法返回,即视为放弃领导权

  4. 可经过autoRequeue方法循环获取领导权

  5. 可经过hasLeadership方法判断是否为Leader

  6. 可经过getLeader方法获取当前Leader

  7. 可经过getParticipants方法获取当前竞选Leader的参与方

简单实现:

package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLeaderSelector {
    public static void main(String[] args) throws Exception {
        //设置重试策略,这里是沉睡一秒后开始重试,重试五次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
        //经过工厂类获取curatorFramework
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
        //leader节点建立,监听Leader状态,并在takeLeadership回调函数中作本身的业务逻辑
        LeaderSelector leaderSelector = new LeaderSelector(curatorFramework,"/node", new LeaderSelectorListenerAdapter() {
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                Thread.sleep(1000);
                System.out.println("启动了 takeLeadership");
            }
        });
        leaderSelector.autoRequeue();
        leaderSelector.start();
        //启动
        curatorFramework.start();
        Thread.sleep(100000000);
        leaderSelector.close();
        curatorFramework.close();
    }
}

  这里的LeaderSelectorListenerAdapter实现了LeaderSelectorListener接口,源码以下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.curator.framework.recipes.leader;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    public LeaderSelectorListenerAdapter() {
    }
    //当链接失败时,会抛出异常,这样就会中断takeLeadership方法,防止业务逻辑错误操做
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
            throw new CancelLeadershipException();
        }
    }
}

4. Kafka的Leader Election

  1. Kafka“各自为政”Leader Election

  每一个Partition的多个Replica同时竞争Leader,这样作的好处是实现起来比较简单,可是一样出现的问题的就是Herd Effect(可能会有不少的leader节点),Zookeeper负载太重,Latency较大(可能会产生不少其余的问题)

  2. Kafka基于Controller的Leader Election

  原理是在整个集群中选举出一个Broker做为Controller,Controller为全部Topic的全部Partition指定Leader及Follower,Kafka经过在zookeeper上建立/controller临时节点来实现leader选举,并在该节点中写入当前broker的信息 {“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} 

  利用Zookeeper的强一致性特性,一个节点只能被一个客户端建立成功,建立成功的broker即为leader,即先到先得原则,leader也就是集群中的controller,负责集群中全部大小事务。 当leader和zookeeper失去链接时,临时节点会删除,而其余broker会监听该节点的变化,当节点删除时,其余broker会收到事件通知,从新发起leader选举。

  这样作极大缓解Herd Effect问题,减轻Zookeeper负载,Controller与Leader及Follower间经过RPC通讯,高效且实时,可是因为引入Controller增长了复杂度,同时须要考虑Controller的Failover(容错)

相关文章
相关标签/搜索