Redis之上的分布式Java队列

最近学习的势头大涨,码了不少干货。分享给你们参考学习!

经过优锐课的java学习笔记中,了解到关于让咱们使用Redisson Java框架讨论六种不一样类型的基于Redis的分布式队列。java

 

一、在Redis中使用队列

Redis是一个功能强大的工具,支持从字符串和列表到映射和流的许多不一样类型的数据结构。 开发人员将Redis用于多种目的,包括用于数据库,缓存和消息代理。redis

像任何消息代理同样,Redis须要以正确的顺序发送消息。 能够根据消息的年龄或某些其余预约义的优先级等级发送消息。数据库

为了存储这些未决消息,Redis开发人员须要队列数据结构。 Redisson是使用Redis和Java进行分布式编程的框架,它提供了许多分布式数据结构(包括队列)的实现。编程

Redisson经过提供Java API使Redis开发更加容易。 Redisson不须要开发人员学习Redis命令,而是包括全部众所周知的Java接口,例如Queue和BlockingQueue。 Redisson还处理Redis中繁琐的幕后工做,例如链接管理,故障转移处理和数据序列化。缓存

二、基于Redis的分布式Java队列

Redisson提供了Java中基本队列数据结构的多个基于Redis的实现,每种实现都有不一样的功能。 这使能够选择最适合目的的队列类型。服务器

下面,咱们将使用Redisson Java框架讨论六种不一样类型的基于Redis的分布式队列。数据结构

三、队列

Redisson中的RQueue对象实现了java.util.Queue接口。 队列用于须要从最先的最先的元素开始处理(也称为“先进先出”或FIFO)的状况。架构

与普通Java同样,可使用peek()方法检查RQueue的第一个元素,或者使用poll()方法检查和删除RQueue的第一个元素:框架

1 RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
2 
3 queue.add(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();

 

四、阻塞队列

Redisson中的RBlockingQueue对象实现了java.util.BlockingQueue接口。分布式

BlockingQueues是阻塞线程的队列,这些线程试图从空队列中进行轮询,或者试图在已满的队列中插入元素。 该线程将被阻塞,直到另外一个线程将一个元素插入到空队列中,或从完整队列中轮询第一个元素为止。

下面的示例代码演示了RBlockingQueue的正确实例化和使用。 特别是,可使用参数指定对象将等待线程变得可用的时间来调用poll()方法:

1 RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
2 
3 queue.offer(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();
8 
9 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

在故障转移或从新链接到Redis服务器的过程当中,将自动从新预订poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()Java方法。

五、BoundedBlockingQueue

Redisson中的RBoundedBlockingQueue对象实现了有界的阻塞队列结构。 有界阻塞队列是容量已受限制(即有限)的阻塞队列。

如下代码演示了如何在Redisson中实例化和使用RBoundedBlockingQueue。 trySetCapacity()方法用于尝试设置阻塞队列的容量。 trySetCapacity()返回布尔值“ true”或“ false”,这取决因而否成功设置了容量或是否已经设置了容量:

 1 RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
 2 
 3 queue.trySetCapacity(2);
 4 
 5 queue.offer(new SomeObject(1));
 6 
 7 queue.offer(new SomeObject(2));
 8 
 9 // will be blocked until free space available in queue
10 
11 queue.put(new SomeObject());
12 
13 SomeObject obj = queue.peek();
14 
15 SomeObject someObj = queue.poll();
16 
17 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

六、延迟排队

Redisson中的RDelayedQueue对象容许Redis中实现延迟队列。 当使用诸如指数补偿的策略将消息传递给消费者时,这可能会颇有用。 每次尝试发送邮件失败后,重试之间的时间将成倍增长。

在与元素一块儿指定的延迟以后,延迟队列中的每一个元素将被转移到目标队列。 此目标队列能够是实现RQueue接口的任何队列,例如RBlockingQueue或RBoundedBlockingQueue。

 1 RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
 2 
 3 RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
 4 
 5 // move object to destinationQueue in 10 seconds
 6 
 7 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
 8 
 9 // move object to destinationQueue in 1 minute
10 
11 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

 

在再也不须要队列以后,经过使用destroy()方法销毁延迟的队列是一个好主意。 可是,若是要关闭Redisson,则没有必要。

七、PriorityQueue

Redisson中的RPriorityQueue对象实现了java.util.Queue接口。 优先级队列是否是按元素的使用期限而是按照与每一个元素相关联的优先级排序的队列。

以下面的示例代码所示,RPriorityQueue使用比较器对队列中的元素进行排序:

 1 RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.poll();

 

八、PriorityBlockingQueue

Redisson中的RPriorityBlockingQueue对象结合了RPriorityQueue和RBlockingQueue的功能。 与RPriorityQueue同样,RPriorityBlockingQueue也使用Comparator对队列中的元素进行排序。

 1 RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.take();

 

在故障转移或从新链接到Redis服务器的过程当中,将自动从新预订poll(),pollLastAndOfferFirstTo()和take()Java方法。

 文章分享到这里,若有不足之处,欢迎补充评论!

抽丝剥茧,细说架构那些事!

相关文章
相关标签/搜索