java中有信号量Semaphore控制特定资源的访问数量,在多进程甚至跨服务器跨网络的状况下,咱们能够用reids来实现。
java的Semaphore,查看源码可知道经过设置state,每次被获取state-1,释放+1,等于0就等待,大于0就唤醒其余的线程。在redis中没有办法去唤醒其余的等待进程,因此能够用while循环来判断是否获取到信号量。
在while循环中,有如下几个步骤:java
上面的第五点,是两个集合的交集计算,先看看如下的例子。redis
如下例子中,setkey1有三个元素,one、two、three,setkey2有两个元素,one、two,因此取他们的交集就是one和two,对应的score是相加的。segmentfault
@Test public void test() { Transaction trans = JedisUtils.multi(); String dstkey = "dstkey:"; String setkey1 = "setkey1:"; String setkey2 = "setkey2:"; trans.zadd(setkey1, 1, "one"); trans.zadd(setkey1, 2, "two"); trans.zadd(setkey1, 3, "three"); trans.zadd(setkey2, 4, "one"); trans.zadd(setkey2, 5, "two"); // 1+4=5,2+5=7 trans.zinterstore(dstkey, setkey1, setkey2); trans.exec(); Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1); System.out.println(tuples); }
运行结果以下:服务器
[[one,5.0], [two,7.0]]
上面例子的权重很明显是1:1,若是咱们想设置其余值,就须要引入ZParams的wight方法。下面的例子权重是7,8,因此计算规则为17+48=39,27+58=54。网络
@Test public void testWeight() { Transaction trans = JedisUtils.multi(); ZParams params = new ZParams(); params.weights(7, 8); String dstkey = "dstkey:"; String setkey1 = "setkey1:"; String setkey2 = "setkey2:"; trans.zadd(setkey1, 1, "one"); trans.zadd(setkey1, 2, "two"); trans.zadd(setkey1, 3, "three"); trans.zadd(setkey2, 4, "one"); trans.zadd(setkey2, 5, "two"); // 1*7+4*8=39,2*7+5*8=54 trans.zinterstore(dstkey, params, setkey1, setkey2); trans.exec(); Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1); System.out.println(tuples); }
运行结果以下:dom
[[one,39.0], [two,54.0]]
除了权重也有最大值,最小值,平均值。这边只演示最大值。下面例子的计算规则为max(1,4)8=48=32,max(2,5)8=58=40。分布式
@Test public void testAggregate() { Transaction trans = JedisUtils.multi(); ZParams params = new ZParams(); params.aggregate(ZParams.Aggregate.MAX); params.weights(7, 8); String dstkey = "dstkey:"; String setkey1 = "setkey1:"; String setkey2 = "setkey2:"; trans.zadd(setkey1, 1, "one"); trans.zadd(setkey1, 2, "two"); trans.zadd(setkey1, 3, "three"); trans.zadd(setkey2, 4, "one"); trans.zadd(setkey2, 5, "two"); // max(1,4)*8=4*8=32,max(2,5)*8=5*8=40 trans.zinterstore(dstkey, params, setkey1, setkey2); trans.exec(); Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1); System.out.println(tuples); }
运行结果以下:ide
[[one,32.0], [two,40.0]]
获取锁参考分布式锁的文章ui
static int cnt = 10; static int timeOut = 1000; static int limit = 2; static CountDownLatch countDownLatch = new CountDownLatch(cnt); static String count = "count:"; static String out = "out:"; static String semaphore = "semaphore:"; private static String acquire() { Transaction trans = JedisUtils.multi(); // 移除过时的信号量 trans.zremrangeByScore(out, Long.MIN_VALUE, System.currentTimeMillis() - timeOut); ZParams params = new ZParams(); params.weights(1, 0); // 取交集,若是out的元素过时被移除了,则和semaphore交集后,再赋值给semaphore。 // 相对于semaphore对应的元素也被移除了。 // 若是没有过时,经过out权重为0,也不影响semaphore的分数 trans.zinterstore(semaphore, params, semaphore, out); // 获取自增加的计数器 trans.incr(count); List<Object> results = trans.exec(); // 获取自增加的值 int counter = ((Long) results.get(results.size() - 1)).intValue(); trans = JedisUtils.multi(); String uuid = UUID.randomUUID().toString(); // 添加到信号量集合 trans.zadd(semaphore, counter, uuid); // 添加到超时集合 trans.zadd(out, System.currentTimeMillis(), uuid); // 查看排名 trans.zrank(semaphore, uuid); results = trans.exec(); // 获取当前排名 int result = ((Long) results.get(results.size() - 1)).intValue(); // 在限制的数量之内,则返回uuid if (result < limit) { return uuid; } // 没有在限制的数量之内,说明没获取到,则移除上面新增的值 release(uuid); return null; } private static void release(String uuid) { Transaction trans = JedisUtils.multi(); trans.zrem(semaphore, uuid); trans.zrem(out, uuid); trans.exec(); } @Test public void test() throws InterruptedException { // 锁的时间,达到这个时间就释放 int lockTime = 1; // 锁的超时时间,达到这个时间就放弃获取 long timeOut = 2500; for (int i = 0; i < cnt; i++) { new Thread(new LockThread(i, lockTime, timeOut)).start(); countDownLatch.countDown(); } TimeUnit.SECONDS.sleep(10); } static class LockThread implements Runnable { int lockTime; long timeOut; int idx; public LockThread(int idx, int lockTime, long timeOut) { this.idx = idx; this.lockTime = lockTime; this.timeOut = timeOut; } @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { // 获取锁 String lock = TestDistributedLocking.lock(TestDistributedLocking.lockName, lockTime, timeOut); String uuid = acquire(); // 释放锁 JedisUtils.del(lock); if (null != uuid) { SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(formatter.format(new Date()) + "---" + idx + ":获取到了信号量"); try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } release(uuid); break; } try { // 休眠10毫秒 TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }