目标:介绍基于zookeeper的来实现的远程通讯、介绍dubbo-remoting-zookeeper内的源码解析。java
对于zookeeper我相信确定不陌生,在以前的文章里面也有讲到zookeeper来做为注册中心。在这里,基于zookeeper来实现远程通信,duubo封装了zookeeper client,来和zookeeper server通信。git
下面是类图:github
public interface ZookeeperClient {
/** * 建立client * @param path * @param ephemeral */
void create(String path, boolean ephemeral);
/** * 删除client * @param path */
void delete(String path);
/** * 得到子节点集合 * @param path * @return */
List<String> getChildren(String path);
/** * 向zookeeper的该节点发起订阅,得到该节点全部 * @param path * @param listener * @return */
List<String> addChildListener(String path, ChildListener listener);
/** * 移除该节点的子节点监听器 * @param path * @param listener */
void removeChildListener(String path, ChildListener listener);
/** * 新增状态监听器 * @param listener */
void addStateListener(StateListener listener);
/** * 移除状态监听 * @param listener */
void removeStateListener(StateListener listener);
/** * 判断是否链接 * @return */
boolean isConnected();
/** * 关闭客户端 */
void close();
/** * 得到url * @return */
URL getUrl();
}
复制代码
该接口是基于zookeeper的客户端接口,其中封装了客户端的一些方法。api
该类实现了ZookeeperClient接口,是客户端的抽象类,它实现了一些公共逻辑,把具体的doClose、createPersistent等方法抽象出来,留给子类来实现。服务器
/** * url对象 */
private final URL url;
/** * 状态监听器集合 */
private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
/** * 客户端监听器集合 */
private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
/** * 是否关闭 */
private volatile boolean closed = false;
复制代码
@Override
public void create(String path, boolean ephemeral) {
// 若是不是临时节点
if (!ephemeral) {
// 判断该客户端是否存在
if (checkExists(path)) {
return;
}
}
// 得到/的位置
int i = path.lastIndexOf('/');
if (i > 0) {
// 建立客户端
create(path.substring(0, i), false);
}
// 若是是临时节点
if (ephemeral) {
// 建立临时节点
createEphemeral(path);
} else {
// 递归建立节点
createPersistent(path);
}
}
复制代码
该方法是建立客户端的方法,其中createEphemeral和createPersistent方法都被抽象出来。具体看下面的类的介绍。app
@Override
public void addStateListener(StateListener listener) {
// 状态监听器加入集合
stateListeners.add(listener);
}
复制代码
该方法就是增长状态监听器。框架
@Override
public void close() {
if (closed) {
return;
}
closed = true;
try {
// 关闭
doClose();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
复制代码
该方法是关闭客户端,其中doClose方法也被抽象出。ide
/** * 关闭客户端 */
protected abstract void doClose();
/** * 递归建立节点 * @param path */
protected abstract void createPersistent(String path);
/** * 建立临时节点 * @param path */
protected abstract void createEphemeral(String path);
/** * 检测该节点是否存在 * @param path * @return */
protected abstract boolean checkExists(String path);
/** * 建立子节点监听器 * @param path * @param listener * @return */
protected abstract TargetChildListener createTargetChildListener(String path, ChildListener listener);
/** * 为子节点添加监听器 * @param path * @param listener * @return */
protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener);
/** * 移除子节点监听器 * @param path * @param listener */
protected abstract void removeTargetChildListener(String path, TargetChildListener listener);
复制代码
上述的方法都是被抽象的,又它的两个子类来实现。函数
该类继承了AbstractZookeeperClient,是zk客户端的实现类。源码分析
/** * zk客户端包装类 */
private final ZkClientWrapper client;
/** * 链接状态 */
private volatile KeeperState state = KeeperState.SyncConnected;
复制代码
该类有两个属性,其中client就是核心所在,几乎全部方法都调用了client的方法。
public ZkclientZookeeperClient(URL url) {
super(url);
// 新建一个zkclient包装类
client = new ZkClientWrapper(url.getBackupAddress(), 30000);
// 增长状态监听
client.addListener(new IZkStateListener() {
/** * 若是状态改变 * @param state * @throws Exception */
@Override
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
// 若是状态变为了断开链接
if (state == KeeperState.Disconnected) {
// 则修改状态
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
@Override
public void handleNewSession() throws Exception {
// 状态变为重连
stateChanged(StateListener.RECONNECTED);
}
});
// 启动客户端
client.start();
}
复制代码
该方法是构造方法,同时在里面也作了建立客户端和启动客户端的操做。其余方法都是实现了父类抽象的方法,而且调用的是client方法,为举个例子:
@Override
public void createPersistent(String path) {
try {
// 递归建立节点
client.createPersistent(path);
} catch (ZkNodeExistsException e) {
}
}
复制代码
该方法是递归场景节点,调用的就是client.createPersistent(path)。
该类是Curator框架提供的一套高级API,简化了ZooKeeper的操做,从而对客户端的实现。
/** * 框架式客户端 */
private final CuratorFramework client;
复制代码
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 工厂建立者
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 建立客户端
client = builder.build();
// 添加监听器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
// 若是为状态为lost,则改变为未链接
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
// 改变状态为链接
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
// 改变状态为未链接
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 启动客户端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
复制代码
该方法是构造方法,一样里面也包含了客户端建立和启动的逻辑。
其余的方法也同样是实现了父类的抽象方法,举个列子:
@Override
public void createPersistent(String path) {
try {
client.create().forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
复制代码
@SPI("curator")
public interface ZookeeperTransporter {
/** * 链接服务器 * @param url * @return */
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
复制代码
该方法是zookeeper的信息交换接口。一样也是一个可扩展接口,默认实现CuratorZookeeperTransporter类。
public class ZkclientZookeeperTransporter implements ZookeeperTransporter {
@Override
public ZookeeperClient connect(URL url) {
// 新建ZkclientZookeeperClient实例
return new ZkclientZookeeperClient(url);
}
}
复制代码
该类实现了ZookeeperTransporter,其中就是建立了ZkclientZookeeperClient实例。
public class CuratorZookeeperTransporter implements ZookeeperTransporter {
@Override
public ZookeeperClient connect(URL url) {
// 建立CuratorZookeeperClient实例
return new CuratorZookeeperClient(url);
}
}
复制代码
该接口实现了ZookeeperTransporter,是ZookeeperTransporter默认的实现类,一样也是建立了;对应的CuratorZookeeperClient实例。
该类是zk客户端的包装类。
/** * 超时事件 */
private long timeout;
/** * zk客户端 */
private ZkClient client;
/** * 客户端状态 */
private volatile KeeperState state;
/** * 客户端线程 */
private ListenableFutureTask<ZkClient> listenableFutureTask;
/** * 是否开始 */
private volatile boolean started = false;
复制代码
public ZkClientWrapper(final String serverAddr, long timeout) {
this.timeout = timeout;
listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
@Override
public ZkClient call() throws Exception {
// 建立zk客户端
return new ZkClient(serverAddr, Integer.MAX_VALUE);
}
});
}
复制代码
设置了超时时间和客户端线程。
public void start() {
// 若是客户端没有开启
if (!started) {
// 建立链接线程
Thread connectThread = new Thread(listenableFutureTask);
connectThread.setName("DubboZkclientConnector");
connectThread.setDaemon(true);
// 开启线程
connectThread.start();
try {
// 得到zk客户端
client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
}
started = true;
} else {
logger.warn("Zkclient has already been started!");
}
}
复制代码
该方法是客户端启动方法。
public void addListener(final IZkStateListener listener) {
// 增长监听器
listenableFutureTask.addListener(new Runnable() {
@Override
public void run() {
try {
client = listenableFutureTask.get();
// 增长监听器
client.subscribeStateChanges(listener);
} catch (InterruptedException e) {
logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!");
} catch (ExecutionException e) {
logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
}
}
});
}
复制代码
该方法是为客户端添加监听器。
其余方法都是对于 客户端是否还链接的检测,可自行查看代码。
public interface ChildListener {
/** * 子节点修改 * @param path * @param children */
void childChanged(String path, List<String> children);
}
复制代码
该接口是子节点的监听器,当子节点变化的时候会用到。
public interface StateListener {
int DISCONNECTED = 0;
int CONNECTED = 1;
int RECONNECTED = 2;
/** * 状态修改 * @param connected */
void stateChanged(int connected);
}
复制代码
该接口是状态监听器,其中定义了一个状态更改的方法以及三种状态。
该部分相关的源码解析地址:github.com/CrazyHZM/in…
该文章讲解了基于zookeeper的来实现的远程通讯、介绍dubbo-remoting-zookeeper内的源码解析,关键须要对zookeeper有所了解。该篇以后,远程通信的源码解析就先到这里了,其实你们会发现,若是可以对讲解api系列的文章了解透了,那么后面的文章九很简单,就好像轨道铺好,能够直接顺着轨道日后,根本没有阻碍。接下来我将开始对rpc模块进行讲解。