DistributedAtomic
/**
 * Demo that increments one shared {@link DistributedAtomicInteger} from many threads at once.
 *
 * <p>Every worker counts the latch down and then waits on it, so no worker calls
 * {@code increment()} until all workers have been started.
 */
public class DistributedAtomic {

    static CuratorFramework client = CuratorConnect.getCuratorClient2();

    private static final String path = "/atomic";

    /** Number of concurrent workers; also the latch count, so the two must stay equal. */
    private static final int THREAD_COUNT = 50;

    /**
     * Starts {@link #THREAD_COUNT} threads that each perform one increment on the counter
     * stored at {@code path} and print the value before and after their own update.
     *
     * <p>The result of {@code increment()} is only reported as a commit when
     * {@code succeeded()} is true; otherwise a failure line is written to stderr, because the
     * pre/post values of an unsuccessful attempt do not describe a committed update.
     */
    public static void distributedAtomicInteger() {
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                DistributedAtomicInteger distributedAtomicInteger =
                        new DistributedAtomicInteger(client, path, new ExponentialBackoffRetry(100, 50));
                try {
                    // Start barrier: signal readiness, then block until every worker is ready.
                    countDownLatch.countDown();
                    countDownLatch.await();

                    AtomicValue<Integer> increment = distributedAtomicInteger.increment();
                    if (increment.succeeded()) {
                        System.out.println("提交前:" + increment.preValue() + ",提交后:" + increment.postValue());
                    } else {
                        // The retry policy gave up before the update could be applied.
                        System.err.println(Thread.currentThread().getName()
                                + ": increment did not succeed, counter was not updated by this thread");
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the thread's owner can still observe it.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    /** Runs the concurrent increment demo. */
    public static void main(String[] args) {
        DistributedAtomic.distributedAtomicInteger();
    }
}
运行结果如下:
大概流程:
private AtomicValue<Integer> worker(final Integer addAmount) throws Exception { Preconditions.checkNotNull(addAmount, "addAmount cannot be null"); // 判断是否有节点,没有节点则赋值为0,有则转换为数字相加 MakeValue makeValue = new MakeValue() { @Override public byte[] makeFrom(byte[] previous) { int previousValue = (previous != null) ? bytesToValue(previous) : 0; int newValue = previousValue + addAmount; return valueToBytes(newValue); } }; AtomicValue<byte[]> result = value.trySet(makeValue); return new AtomicInteger(result); }
尝试设置值
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception { MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false); tryOptimistic(result, makeValue); if ( !result.succeeded() && (mutex != null) ) { // 没成功,再经过mutex获取锁重试 tryWithMutex(result, makeValue); } return result; }
重试机制下看可否设值成功
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception { long startMs = System.currentTimeMillis(); int retryCount = 0; boolean done = false; while ( !done ) { result.stats.incrementOptimisticTries(); // 尝试成功,跳出循环 if ( tryOnce(result, makeValue) ) { result.succeeded = true; done = true; } else { // 没成功,重试机制再试 if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) ) { done = true; } } } result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs); }
尝试设值
private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception { Stat stat = new Stat(); // 获取当前值 boolean createIt = getCurrentValue(result, stat); boolean success = false; try { // 返回新的值 byte[] newValue = makeValue.makeFrom(result.preValue); if ( createIt ) { // 若是没有节点,则建立 client.create().creatingParentContainersIfNeeded().forPath(path, newValue); } else { // 若是有节点,则经过版本号设值 client.setData().withVersion(stat.getVersion()).forPath(path, newValue); } result.postValue = Arrays.copyOf(newValue, newValue.length); success = true; } catch ( KeeperException.NodeExistsException e ) { // do Retry } catch ( KeeperException.BadVersionException e ) { // do Retry } catch ( KeeperException.NoNodeException e ) { // do Retry } return success; }