为何须要消息队列python
系统中引入消息队列机制是对系统一个很是大的改善。例如一个web系统中,用户作了某项操做后须要发送邮件通知到用户邮箱中。你可使用同步方式让用户等待邮件发送完成后反馈给用户,可是这样可能会由于网络的不肯定性形成用户长时间的等待从而影响用户体验。
有些场景下是不可能使用同步方式等待完成的,那些须要后台花费大量时间的操做。例如极端例子,一个在线编译系统任务,后台编译完成须要30分钟。这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据状况再此反馈用户与否。
另外适用消息队列的状况是那些系统处理能力有限的状况下,先使用队列机制把任务暂时存放起来,系统再一个个轮流处理掉排队的任务。这样在系统吞吐量不足的状况下也能稳定的处理掉高并发的任务。
消息队列能够用来作排队机制,只要系统须要用到排队机制的地方就可使用消息队列来做。
rabbitmq的优先级作法
目前成熟的消息队列产品有不少,著名的例如rabbitmq。它使用起来相对仍是比较简单的,功能也相对比较丰富,通常场合下是彻底够用的。可是有个很烦人的就是它不支持优先级。
例如一个发邮件的任务,某些特权用户但愿它的邮件可以更加及时的发送出去,至少比普通用户要优先对待。默认状况下rabbitmq是没法处理掉 的,扔给rabbitmq的任务都是FIFO先进先出。可是咱们可使用一些变通的技巧来支持这些优先级。建立多个队列,并为rabbitmq的消费者设 置相应的路由规则。
例如默认状况下有这样一个队列,咱们拿list来模拟 [task1, task2, task3],消费者轮流按照FIFO的原则一个个拿出task来处理掉。若是有高优先级的任务进来,它也只能跟在最后被处理[task1, task2, task3, higitask1]. 可是若是使用两个队列,一个高优先级队列,一个普通优先级队列。 普通优先级[task1, task2, task3], 高优先级[hightask1 ] 而后咱们设置消费者的路由让消费者随机从任意队列中取数据便可。
而且咱们能够定义一个专门处理高优先级队列的消费者,它空闲的时候也不处理低优先级队列的数据。这相似银行的VIP柜台,普通客户在银行取号排队,一个VIP来了他虽然没有从取号机里拿出一个排在普通会员前面的票,可是他仍是能够更快地直接走VIP通道。
使用rabbitmq来作支持优先级的消息队列的话,就像是上面所述同银行VIP会员同样,走不一样的通道。可是这种方式只是相对的优先级,作不 到绝对的优先级控制,例如我但愿某一个优先级高的任务在绝对意义上要比其余普通任务优先处理掉,这样上面的方案是行不通的。由于rabbitmq的消费者 只知道再本身空闲的状况下从本身关心的队列中“随机”取某一个队列里面的第一个数据来处理,它无法控制优先取找哪个队列。或者更加细粒度的优先级控制。 或者你系统里面设置的优先级有10多种。这样使用rabbitmq也是很难实现的。
可是若是使用redis来作队列的话上面的需求均可以实现。
使用redis怎么作消息队列
首先redis它的设计是用来作缓存的,可是因为它自身的某种特性使得他能够用来作消息队列。它有几个阻塞式的API可使用,正是这些阻塞式的API让他有作消息队列的能力。
试想一下在”数据库解决全部问题“的思路下,不使用消息队列也是能够完成你的需求的。咱们把任务所有存放在数据库而后经过不断的轮询方式来取任 务处理。这种作法虽然能够完成你的任务可是作法很粗劣。可是若是你的数据库接口提供一个阻塞的方法那么就能够避免轮询操做了,你的数据库也能够用来作消息 队列,只不过目前的数据库尚未这样的接口。
另外作消息队列的其余特性例如FIFO也很容易实现,只须要一个List对象从头取数据,从尾部塞数据便可实现。
redis能作消息队列得益于他list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口。他们都是阻塞版的,因此能够用来作消息队列。
redis消息队列优先级的实现
一些基础redis基础知识的说明
redis> blpop tasklist 0
"im task 01"
这个例子使用blpop命令会阻塞方式地从tasklist列表中取头一个数据,最后一个参数就是等待超时的时间。若是设置为0则表示无限等 待。另外redis存放的数据都只能是string类型,因此在任务传递的时候只能是传递字符串。咱们只须要简单的将负责数据序列化成json格式的字符 串,而后消费者那边再转换一下便可。
这里咱们的示例语言使用python,连接redis的库使用redis-py. 若是你有些
编程基础把它切换成本身喜欢的语言应该是没问题的。
1.简单的FIFO队列
复制代码
import redis, time
def handle(task):
print task
time.sleep(4)
def main():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
while 1:
result = r.brpop('tasklist', 0)
handle(result[1])
if __name__ == "__main__":
main()
复制代码
上例子即便一个最简单的消费者,咱们经过一个无限循环不断地从redis的队列中取数据。若是队列中没有数据则没有超时的阻塞在那里,有数据则取出往下执行。
通常状况取出来是个复杂的字符串,咱们可能须要将其格式化后做为再传给处理函数,可是为了简单咱们的例子就是一个普通字符串。另外例子中的处理函数不作任何处理,仅仅sleep 用来模拟耗时的操做。
咱们另开一个redis的客户端来模拟生产者,自带的客户端就能够。多往tasklist 队列里面塞上一些数据。
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> lpush tasklist 'im task 03'
redis> lpush tasklist 'im task 04'
redis> lpush tasklist 'im task 05'
随后在消费者端便会看到这些模拟出来的任务被挨个消费掉。
2.简单优先级的队列
假设一种简单的需求,只须要高优先级的比低优先级的任务率先处理掉。其余任务之间的顺序一律无论,这种咱们只须要在在遇到高优先级任务的时候将它塞到队列的前头,而不是push到最后面便可。
由于咱们的队列是使用的redis的 list,因此很容易实现。遇到高优先级的使用rpush 遇到低优先级的使用lpush
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> rpush tasklist 'im high task 01'
redis> rpush tasklist 'im high task 01'
redis> lpush tasklist 'im task 03'
redis> rpush tasklist 'im high task 03'
随后会看到,高优先级的老是比低优先级的率先执行。可是这个方案的缺点是高优先级的任务之间的执行顺序是先进后出的。
3.较为完善的队列
例子2中只是简单的将高优先级的任务塞到队列最前面,低优先级的塞到最后面。这样保证不了高优先级任务之间的顺序。
假设当全部的任务都是高优先级的话,那么他们的执行顺序将是相反的。这样明显违背了队列的FIFO原则。
不过只要稍加改进就能够完善咱们的队列。
跟使用rabbitmq同样,咱们设置两个队列,一个高优先级一个低优先级的队列。高优先级任务放到高队列中,低的放在低优先队列中。redis和rabbitmq不一样的是它能够要求队列消费者从哪一个队列里面先读。
def main():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
while 1:
result = r.brpop(['high_task_queue', 'low_task_queue'], 0)
handle(result[1])
上面的代码,会阻塞地从'high_task_queue', 'low_task_queue'这两个队列里面取数据,若是第一个没有再从第二个里面取。
因此只须要将队列消费者作这样的改进即可以达到目的。
复制代码
redis> lpush low_task_queue low001
redis> lpush low_task_queue low002
redis> lpush low_task_queue low003
redis> lpush low_task_queue low004
redis> lpush high_task_queue low001
redis> lpush high_task_queue low002
redis> lpush high_task_queue low003
redis> lpush high_task_queue low004
复制代码
经过上面的测试看到,高优先级的会被率先执行,而且高优先级之间也是保证了FIFO的原则。
这种方案咱们能够支持不一样阶段的优先级队列,例如高中低三个级别或者更多的级别均可以。
4.优先级级别不少的状况
假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是相似0-99999这么多级别。那么咱们第三种方案将不太合适了。
虽然redis有sorted set这样的能够排序的数据类型,看是很惋惜它没有阻塞版的接口。因而咱们仍是只能使用list类型经过其余方式来完成目的。
有个简单的作法咱们能够只设置一个队列,并保证它是按照优先级排序号的。而后经过二分查找法查找一个任务合适的位置,并经过 lset 命令插入到相应的位置。
例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,咱们经过本身的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置而后插入到指定地点便可。
由于二分查找是比较快的,而且redis自己也都在内存中,理论上速度是能够保证的。可是若是说数据量确实很大的话咱们也能够经过一些方式来调优。
回想咱们第三种方案,把第三种方案结合起来就会很大程度上减小开销。例如数据量十万的队列,它们的优先级也是随机0-十万的区间。咱们能够设置 10个或者100个不一样的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不一样等级拆分后它单个队列的数据 就减小许多,这样二分查找匹配的效率也会高一点。可是数据所占的资源基本是不变的,十万数据该占多少内存仍是多少。只是
系统里面多了一些队列而已。