原本想着基于zk临时节点,实现一下分布式锁,结果发现有curator框架。PS:原声API真的难用,连递归建立path都没有?spring
配置curator maven的时候,md配置了好几个小时,最后发现集中定义依赖版本号 我原本都是写数字的,结果到了zookeeper.version ,我居然写了 <zookeeper.version>zookeeper-3.4.7</zookeeper.version> 把英文也写上去了 多是从maven-repository copy过来的 很郁闷。apache
curator提供的可重入分布式锁看起来也没什么可封装的,由于它和ReentrantLock确实很。在须要的地方,new一个,再调用对象的方法就行了。网络
这个锁就是 InterProcessMutex 类,其构造方法须要咱们传入当前CuratorFramework对象,还有要锁定的节点。对了 这个节点是临时节点,再客户端断开链接后,锁不会一直存在,但也不会当即就失去锁,由于ZK须要根据缺省的时间判断你是真的断开了仍是某种网络缘由。多线程
首先说明它是一把可重入锁。注意在当前线程的for循环中,他们都是用的是同一把锁,同把锁才可重入。app
public void fun() { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml"); CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework"); InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws"); for (int i = 0; i < 10; i++) { try { lock.acquire(); System.out.println("yes"); } catch (Exception e) { e.printStackTrace(); } finally { try { //lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
其次模拟分布式环境,在十个线程中各获取锁(锁相同的path),并执行1s的任务,能够发现,多线程被锁同步。框架
public void fun() throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml"); CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework"); InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws"); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { lock.acquire(); System.out.println("yes"); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } System.in.read(); }
acquire支持传递等待超时时间,返回值是boolean类型。表明超时时间内是否成功获取到锁。maven
public void fun() throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml"); CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework"); System.out.println(curatorFramework); InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws"); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { if (lock.acquire(1000, TimeUnit.MILLISECONDS)) { System.out.println("yes"); Thread.sleep(1000); } else { System.out.println("no"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } System.in.read(); }
最后附上spring配置:分布式
<!-- 重连配置 --> <bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry"> <constructor-arg index="0" value="1000"/> <constructor-arg index="1" value="3"/> </bean> <bean id="curatorFramework" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start"> <constructor-arg index="0" value="server:port,server:port,server:port"/> <constructor-arg index="1" ref="retryPolicy"/> </bean>