目录html
疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -24【 博客园 总入口 】node
你们好,我是做者尼恩。目前和几个小伙伴一块儿,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战面试
前面,已经完成一个高性能的 Java 聊天程序的四件大事:apache
接下来,须要进入到分布式开发的环节了。 分布式的中间件,疯狂创客圈的小伙伴们,一致的选择了zookeeper,不单单是因为其在大数据领域,太有名了。更重要的是,不少的著名框架,都使用了zk。api
本篇介绍 Curator客户端的基本操做。服务器
打开Curator的官网,咱们能够看到,Curator包含了如下几个包:session
curator-framework:对zookeeper的底层api的一些封装;并发
curator-client:提供一些客户端的操做,例如重试策略等;框架
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。异步
Maven依赖(使用curator的版本:4.0.0,对应Zookeeper的版本为:3.4.x,若是版本不匹配,就会有兼容性问题,颇有可能致使节点操做失败。具体的版本对应关系,能够去curator的官网查看。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency>
使用 curator-framework 包中的工厂类CuratorFrameworkFactory中的静态方法newClient,来建立客户端会话。
代码以下:
/** * create by 尼恩 @ 疯狂创客圈 **/ public class ClientFactory { /** * @param connectionString zk的链接地址 * @return CuratorFramework 实例 */ public static CuratorFramework createSimple(String connectionString) { // 重试策略:第一次重试等待1s,第二次重试等待2s,第三次重试等待4s // 第一个参数:等待时间的基础单位,单位为毫秒 // 第二个参数:最大重试次数 ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); // 获取 CuratorFramework 实例的最简单的方式 // 第一个参数:zk的链接地址 // 第二个参数:重试策略 return CuratorFrameworkFactory.newClient(connectionString, retryPolicy); } /** * @param connectionString zk的链接地址 * @param retryPolicy 重试策略 * @param connectionTimeoutMs 链接 * @param sessionTimeoutMs * @return CuratorFramework 实例 */ public static CuratorFramework createWithOptions( String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { // builder 模式建立 CuratorFramework 实例 return CuratorFrameworkFactory.builder() .connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) // 其余的建立选项 .build(); } }
这里用到两个版本,前一个是简化版本,只须要设置ZK集群的链接地址和重试策略。
后一个是相对复杂的重载版本,能够设置链接超时connectionTimeoutMs、会话超时sessionTimeoutMs 等其余的会话建立选项。
具体请看疯狂创客圈的Demo源码。
使用create()方法,最后使用forPath带上须要建立的节点路径。
/** * 建立节点 */ @Test public void createNode() { //建立客户端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //启动客户端实例,链接服务器 client.start(); // 建立一个 ZNode 节点 // 节点的数据为 payload String data = "hello"; byte[] payload = data.getBytes("UTF-8"); String zkPath = "/test/CRUD/node-1"; client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath(zkPath, payload); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
使用withMode()方法,设置节点的类型。zookeeper节点有四种类型:
(1)PERSISTENT 持久节点
(2)PERSISTENT_SEQUENTIAL 持久顺序节点
(3)PHEMERAL 临时节
(4)EPHEMERAL_SEQUENTIAL 临时顺序节点
下面详细介绍一下四种节点的区别和联系。
(1)持久节点(PERSISTENT)
所谓持久节点,是指在节点建立后,就一直存在,直到有删除操做来主动清除这个节点。持久节点的生命周期是永久有效,不会由于建立该节点的客户端会话失效而消失。
(2)持久顺序节点(PERSISTENT_SEQUENTIAL)
这类节点的生命周期和持久节点是一致的。额外的特性是,在ZK中,每一个父节点会为他的第一级子节点维护一份次序,会记录每一个子节点建立的前后顺序。若是在建立子节点的时候,能够设置这个属性,那么在建立节点过程当中,ZK会自动为给定节点名加上一个表示次序的数字后缀,做为新的节点名。这个次序后缀的范围是整型的最大值。
好比,在建立节点的时候只须要传入节点 “/test_”,这样以后,zookeeper自动会给”test_”后面补充数字次序。
(3)临时节点(EPHEMERAL)
和持久节点不一样的是,临时节点的生命周期和客户端会话绑定。也就是说,若是客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非链接断开。这里还要注意一件事,就是当你客户端会话失效后,所产生的节点也不是一会儿就消失了,也要过一段时间,大概是10秒之内,能够试一下,本机操做生成节点,在服务器端用命令来查看当前的节点数目,你会发现客户端已经stop,可是产生的节点还在。
另外,在临时节点下面不能建立子节点。
(4)临时顺序节点(EPHEMERAL_SEQUENTIAL)
此节点是属于临时节点,不过带有顺序,客户端会话结束节点就消失。
与节点读取的有关的方法,主要有三个:
(1)首先是判断节点是否存在,使用checkExists方法。
(2)其次是获取节点的数据,使用getData方法。
(3)最后是获取子节点列表,使用getChildren方法。
演示代码以下:
/** * 读取节点 */ @Test public void readNode() { //建立客户端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //启动客户端实例,链接服务器 client.start(); String zkPath = "/test/CRUD/node-1"; Stat stat = client.checkExists().forPath(zkPath); if (null != stat) { //读取节点的数据 byte[] payload = client.getData().forPath(zkPath); String data = new String(payload, "UTF-8"); log.info("read data:", data); String parentPath = "/test"; List<String> children = client.getChildren().forPath(parentPath); for (String child : children) { log.info("child:", child); } } } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
节点的更新,分为同步更新与异步更新。
/** * 更新节点 */ @Test public void updateNode() { //建立客户端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //启动客户端实例,链接服务器 client.start(); String data = "hello world"; byte[] payload = data.getBytes("UTF-8"); String zkPath = "/test/node-1"; client.setData() .forPath(zkPath, payload); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
异步更新,须要用到inBackground()方法,其做用是,让更新操做异步执行。若是须要监听到异步操做的结果,须要为inBackground加上AsyncCallback回调实例。
异步更新的代码以下:
/** * 更新节点 - 异步模式 */ @Test public void updateNodeAsync() { //建立客户端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //更新完成监听器 AsyncCallback.StringCallback callback = new AsyncCallback.StringCallback() { @Override public void processResult(int i, String s, Object o, String s1) { System.out.println( "i = " + i + " | " + "s = " + s + " | " + "o = " + o + " | " + "s1 = " + s1 ); } }; //启动客户端实例,链接服务器 client.start(); String data = "hello ,every body! "; byte[] payload = data.getBytes("UTF-8"); String zkPath = "/test/CRUD/node-1"; client.setData() .inBackground(callback) .forPath(zkPath, payload); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
删除节点,使用delete 方法,代码以下。
/** * 删除节点 */ @Test public void deleteNode() { //建立客户端 CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS); try { //启动客户端实例,链接服务器 client.start(); //删除节点 String zkPath = "/test/CRUD/node-1"; client.delete().forPath(zkPath); //删除后查看结果 String parentPath = "/test"; List<String> children = client.getChildren().forPath(parentPath); for (String child : children) { log.info("child:", child); } } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } }
和更新同样,也能够进行异步删除,一样须要用到inBackground()方法。若是须要监听异步操做的结果,须要为inBackground方法加上一个参数:AsyncCallback回调实例。
下一篇:开启zk的客户端开发。
Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战
疯狂创客圈 【 博客园 总入口 】