为了提高系统的性能,进一步提升系统的吞吐能力,最近公司不少系统都在进行异步化改造。在异步化改造的过程当中,确定会比之前碰到更多的多线程问题,上周就碰到ZooKeeper客户端异步化过程当中的一个死锁问题,这里说明下。java
一般ZooKeeper对于同一个API,提供了同步和异步两种调用方式。
同步接口很容易理解,使用方法以下:node
ZooKeeper zk = new ZooKeeper(...); List children = zk.getChildren( path, true );
异步接口就相对复杂一点,使用方法以下:apache
ZooKeeper zk = new ZooKeeper(...); zk.getChildren( path, true, new AsyncCallback.Children2Callback() { @Override public void proce***esult( int rc, String path, Object ctx, List children, Stat stat ) { System.out.println( "Recive the response." ); } }, null);
咱们能够看到,异步调用中,须要注册一个Children2Callback,并实现回调方法:proce***esult。服务器
上周碰到这样的问题:应用注册了对某znode子节点列表变化的监听,逻辑是在接受到ZooKeeper服务器节点列表变动通知(EventType.NodeChildrenChanged)的时候,会从新获取一次子节点列表。以前,他们是使用同步接口,整个应用能够正常运行,可是此次异步化改造后,出现了诡异现象,可以收到子节点的变动通知,可是没法从新获取子节点列表了。session
下面,我首先把应用以前使用同步接口的逻辑代码,用一个简单的demo来演示下,以下:多线程
package book.chapter05; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; /** * ZooKeeper API 获取子节点列表,使用同步(sync)接口。 * @author <a href="mailto:nileader@gmail.com">银时</a> */ public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher { private CountDownLatch connectedSemaphore = new CountDownLatch( 1 ); private static CountDownLatch _semaphore = new CountDownLatch( 1 ); private ZooKeeper zk; ZooKeeper createSession( String connectString, int sessionTimeout, Watcher watcher ) throws IOException { ZooKeeper zookeeper = new ZooKeeper( connectString, sessionTimeout, watcher ); try { connectedSemaphore.await(); } catch ( InterruptedException e ) { } return zookeeper; } /** create path by sync */ void createPath_sync( String path, String data, CreateMode createMode ) throws IOException, KeeperException, InterruptedException { if ( zk == null ) { zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this ); } zk.create( path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, createMode ); } /** Get children znodes of path and set watches */ List getChildren( String path ) throws KeeperException, InterruptedException, IOException{ System.out.println( "===Start to get children znodes.===" ); if ( zk == null ) { zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this ); } return zk.getChildren( path, true ); } public static void main( String[] args ) throws IOException, InterruptedException { ZooKeeper_GetChildren_API_Sync_Usage sample = new ZooKeeper_GetChildren_API_Sync_Usage(); String path = "/get_children_test"; try { sample.createPath_sync( path, "", CreateMode.PERSISTENT ); sample.createPath_sync( path + "/c1", "", CreateMode.PERSISTENT ); List childrenList = sample.getChildren( path ); System.out.println( childrenList ); //Add a new child znode to test watches event notify. sample.createPath_sync( path + "/c2", "", CreateMode.PERSISTENT ); _semaphore.await(); } catch ( KeeperException e ) { System.err.println( "error: " + e.getMessage() ); e.printStackTrace(); } } /** * Process when receive watched event */ @Override public void process( WatchedEvent event ) { System.out.println( "Receive watched event:" + event ); if ( KeeperState.SyncConnected == event.getState() ) { if( EventType.None == event.getType() && null == event.getPath() ){ connectedSemaphore.countDown(); }else if( event.getType() == EventType.NodeChildrenChanged ){ //children list changed try { System.out.println( this.getChildren( event.getPath() ) ); _semaphore.countDown(); } catch ( Exception e ) {} } } } }
输出结果以下:dom
Receive watched event:WatchedEvent state:SyncConnected type:None path:null ===Start to get children znodes.=== [c1] Receive watched event:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/get_children_test ===Start to get children znodes.=== [c1, c2]
在上面这个程序中,咱们首先建立了一个父节点:/get_children_test,以及一个子节点:/get_children_test/c1。而后调用getChildren的同步接口来获取/get_children_test节点下的全部子节点,调用的同时注册一个watches。以后,咱们继续向/get_children_test节点建立子节点:/get_children_test/c2,这个时候,由于咱们以前咱们注册了一个watches,所以,一旦此时有子节点被建立,ZooKeeperServer就会向客户端发出“子节点变动”的通知,因而,客户端能够再次调用getChildren方法来获取新的子节点列表。异步
这个例子固然是可以正常运行的。如今,咱们进行异步化改造,以下:ide
package book.chapter05; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; /** * ZooKeeper API 获取子节点列表,使用异步(ASync)接口。 * @author <a href="mailto:nileader@gmail.com">银时</a> */ public class ZooKeeper_GetChildren_API_ASync_Usage_Deadlock implements Watcher { private CountDownLatch connectedSemaphore = new CountDownLatch( 1 ); private static CountDownLatch _semaphore = new CountDownLatch( 1 ); private ZooKeeper zk; ZooKeeper createSession( String connectString, int sessionTimeout, Watcher watcher ) throws IOException { ZooKeeper zookeeper = new ZooKeeper( connectString, sessionTimeout, watcher ); try { connectedSemaphore.await(); } catch ( InterruptedException e ) { } return zookeeper; } /** create path by sync */ void createPath_sync( String path, String data, CreateMode createMode ) throws IOException, KeeperException, InterruptedException { if ( zk == null ) { zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this ); } zk.create( path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, createMode ); } /** Get children znodes of path and set watches */ void getChildren( String path ) throws KeeperException, InterruptedException, IOException{ System.out.println( "===Start to get children znodes.===" ); if ( zk == null ) { zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this ); } final CountDownLatch _semaphore_get_children = new CountDownLatch( 1 ); zk.getChildren( path, true, new AsyncCallback.Children2Callback() { @Override public void proce***esult( int rc, String path, Object ctx, List children, Stat stat ) { System.out.println( "Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: " + ctx + ", children list: " + children + ", stat: " + stat ); _semaphore_get_children.countDown(); } }, null); _semaphore_get_children.await(); } public static void main( String[] args ) throws IOException, InterruptedException { ZooKeeper_GetChildren_API_ASync_Usage_Deadlock sample = new ZooKeeper_GetChildren_API_ASync_Usage_Deadlock(); String path = "/get_children_test"; try { sample.createPath_sync( path, "", CreateMode.PERSISTENT ); sample.createPath_sync( path + "/c1", "", CreateMode.PERSISTENT ); //Get children and register watches. sample.getChildren( path ); //Add a new child znode to test watches event notify. sample.createPath_sync( path + "/c2", "", CreateMode.PERSISTENT ); _semaphore.await(); } catch ( KeeperException e ) { System.err.println( "error: " + e.getMessage() ); e.printStackTrace(); } } /** * Process when receive watched event */ @Override public void process( WatchedEvent event ) { System.out.println( "Receive watched event:" + event ); if ( KeeperState.SyncConnected == event.getState() ) { if( EventType.None == event.getType() && null == event.getPath() ){ connectedSemaphore.countDown(); }else if( event.getType() == EventType.NodeChildrenChanged ){ //children list changed try { this.getChildren( event.getPath() ); _semaphore.countDown(); } catch ( Exception e ) { e.printStackTrace(); } } } } }
输出结果以下:性能
Receive watched event:WatchedEvent state:SyncConnected type:None path:null ===Start to get children znodes.=== Get Children znode result: [response code: 0, param path: /get_children_test, ctx: null, children list: [c1], stat: 555,555,1373931727380,1373931727380,0,1,0,0,0,1,556 Receive watched event:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/get_children_test ===Start to get children znodes.===
在上面这个demo中,执行逻辑和以前的同步版本基本一致,惟一有区别的地方在于获取子节点列表的过程异步化了。这样一改造,问题就出来了,整个程序在进行第二次获取节点列表的时候,卡住了。和应用方确认了,以前同步版本历来没有出现过这个现象的,因此开始排查这个异步化中哪里会阻塞。
这里,咱们重点讲解在ZooKeeper客户端中,须要处理来自服务端的两类事件通知:一类是Watches时间通知,另外一类则是异步接口调用的响应。值得一提的是,在ZooKeeper的客户端线程模型中,这两个事件由同一个线程处理,而且是串行处理。具体能够本身查看事件处理的核心类:org.apache.zookeeper.ClientCnxn.EventThread。