原创文章,做者:Damon付,如若转载,请注明出处:《Python消息队列工具 Python-rq 中文教程》http://www.tiangr.com/python-xiao-xi-dui-lie-python-rq-zhong-wen-jiao-cheng-2.htmlhtml
翻译至python-rq官网 http://python-rq.org前端
安装方法python
pip install rq
首先,须要运行一个Redis服务,你可使用一个已经存在的Redis,放置任务(jobs)至队列(queues),你不须要作任何其余特别的事情,仅仅须要定义你本身须要加入队列中耗时的阻塞方法:web
import requests def count_words_at_url(url): resp = requests.get(url) return len(resp.text.split())
接着建立一个RQ队列:redis
from redis import Redis from rq import Queue q = Queue(connection=Redis())
入列方法:数据库
from my_module import count_words_at_url result = q.enqueue( count_words_at_url, 'http://nvie.com')
Worker进程 在后台开始执行队列中的方法须要开启一个worker进程。安全
$ rq worker *** Listening for work on default Got count_words_at_url('http://nvie.com') from default Job result = 818 *** Listening for work on default
一个"任务"(job)是一个Python对象,表示在一个"工做"(worker)进程(后台进程)中被异步调用的方法。任何Python方法均可以经过传递函数和其参数的引用值的方式被异步调用,这个过程称做"入队"(enqueueing)。异步
将任务放置入队列,首先申明一个函数:socket
import requests def count_words_at_url(url): resp = requests.get(url) return len(resp.text.split())
注意到了吗?此函数没有任何特殊的地方,任何函数均可以将其放入RQ队列。async
将 count_words_at_url
放入队列,在后台运行。
from rq import Queue from redis import Redis from somewhere import count_words_at_url # Tell RQ what Redis connection to use redis_conn = Redis() q = Queue(connection=redis_conn) # no args implies the default queue # Delay execution of count_words_at_url('http://nvie.com') job = q.enqueue(count_words_at_url, 'http://nvie.com') print job.result # => None # Now, wait a while, until the worker is finished time.sleep(2) print job.result # => 889
若是你想指定任务至特定的队列,能够如此定义队列名称:
q = Queue('low', connection=redis_conn) q.enqueue(count_words_at_url, 'http://nvie.com')
注意示例中的Queue('low')
,你可使用任意的名称替代以获取符合需求、具备扩展性的分布式队列任务模式。默认通用的队列命名方式(e.g. high, medium, low)能够区分队列任务的优先级。
此外,你能够添加一些额外的参数来控制队列任务。默认状况下,有以下键值对参数能够传递给任务方法:
timeout
, 指定任务的最大运行时间,超时将被丢弃。result_ttl
, 指定保存的任务结果过时时间。ttl
, 指定任务排队的最大时间,超时将被取消。depends_on
, 指定任务对应所需执行的依赖任务(或者job id),必须完成依赖任务再执行指定任务。job_id
, 为任务手动添加一个job_id标识。at_front
, 将任务放置在队列的最前端而不是最后。kwargs
和 args
, 绕开这些自动弹出的参数 ie: 为潜在的任务方法指定一个timeout参数。最后,建议使用更明晰的方法.enqueue_call()
来取代.enqueue()
q = Queue('low', connection=redis_conn) q.enqueue_call(func=count_words_at_url, args=('http://nvie.com',), timeout=30)
有些状况下,web进程没法进入运行在工做(worker) 中的源码(i.e. X中的代码须要调用一个Y中延迟函数),你能够经过字符串引用传递函数。
q = Queue('low', connection=redis_conn) q.enqueue('my_package.my_module.my_func', 3, 4)
除了任务入列,队列(queues)还包含了一些其它有用的方法:
from rq import Queue from redis import Redis redis_conn = Redis() q = Queue(connection=redis_conn) # Getting the number of jobs in the queue print len(q) # Retrieving jobs queued_job_ids = q.job_ids # Gets a list of job IDs from the queue queued_jobs = q.jobs # Gets a list of enqueued job instances job = q.fetch_job('my_id') # Returns job having ID "my_id"
RQ设计原理
使用RQ的过程当中,你你不须要提早建立任何的队列,你也不须要指定任何使用渠道,数据交换规则,路由规则等。你只须要将你的任务放入到任何你想要放入的队列中,一旦你将一个任务入列至一个还未存在的队列中,它将迅速被建立。
RQ没有使用任何中间人来指定消息的位置,你可能认为这是优点也可能任务这是不合理的,这取决与你想要解决的具体问题。
最后,RQ没有使用简明的协议,由于它依据的是Python自带的pickle模块来序列化工做任务。
当任务入列,queue.enqueue()
方法将会返回一个Job
实例,这仅仅是一个用于检测实际任务运行结果的代理对象。
为此,它拥有一个方便的结果访问属性,在任务尚未完成时将返回None,或者当任务完成时返回一个非空值(假设此任务第一时间返回了值)。
若是你想使用相似Celery的代码风格,你可能须要使用@task装饰器。RQ>=3.0版本将拥有相似的装饰器:
from rq.decorators import job @job('low', connection=my_redis_conn, timeout=5) def add(x, y): return x + y job = add.delay(3, 4) time.sleep(1) print job.result
为了方便测试,你能够入列一个任务而不须要绑定一个实际运行的工做(worker)进程(RQ >= 0.3.1 可用)。为了实现此功能,你能够在队列构造器中传递参数async=False
.
>>> q = Queue('low', async=False) >>> job = q.enqueue(fib, 8) >>> job.result 21
以上代码将在同一进程中同步执行函数fib(8)
而不须要任何一个激活的工做(worker)进程。相似于Celery中的ALWAYS_EAGER。
RQ 0.4.0版本的新特性,能够用来管理多任务直接的依赖关系。使用depends_on
参数实现执行一个依赖于另外一个任务的Job。
q = Queue('low', async=False) report_job = q.enqueue(generate_report) q.enqueue(send_report, depends_on=report_job) The ability to handle job dependencies allows you to split a big job into several smaller ones. A job that is dependent on another is enqueued only when its dependency finishes successfully.
(略)
技术上来讲,你能够放置任何Python方法到队列,但这不意味着你这样作是明智的,有些因素你在入列任务前必须考虑:
__module__
。这意味着你不能入列申明在__main__
模块中的任务方法。RQ 的工做进程依赖系统的fork()
方法,这意味着Windows下没法运行。
Worker是一个运行在后台的Python进程,用来执行一些你不想要在web进程中执行的冗长或者阻塞任务。
从项目的root目录下开启一个worker进程:
$ rq worker high normal low *** Listening for work on high, normal, low Got send_newsletter('me@nvie.com') from default Job ended normally without result *** Listening for work on high, normal, low ...
工做进程将无限循环读取给定队列(顺序很是重要)中的任务,并在全部任务方法执行完毕后继续等待新的任务。
每个工做进程一次将只执行一个任务,在一个worker进程中,不会并行处理任务,若是你想要并行执行任务,你只须要开启更多的worker进程。
默认状况下,工做进程将当即运行而且在运行完全部任务后一直阻塞直至新的任务产生。Worker进程一样可使用突发模式运行,此模式将完成全部队列中的任务,而且在全部给定的队列执行完毕后退出。
$ rq worker --burst high normal low *** Listening for work on high, normal, low Got send_newsletter('me@nvie.com') from default Job ended normally without result No more work, burst finished. Registering death.
这对批量执行须要周期性执行的任务或者大幅升高暂时性worker进程的峰值来讲十分有用。
Worker进程生命周期
StartedJobRegistry
。idle
,而且依据result_ttl
设置任务和任务执行结果至过时。Job任务将从 StartedJobRegistry
里移除,并在成功执行以后新增至 FinishedJobRegistry
,失败后新增至 FailedQueue
。基本上,rq worker
进程脚本是一个简单 获取-Fork-执行的循环
。当大量任务作了冗长的启动设置,或者他们全都依赖于一样的模块,你在执行每一个任务时都将花费大量额外的时间(由于你在fork
进程以后才进行import
操做)。这种方式很简洁,并且RQ也所以不会出现内存泄漏的状况,但与此同时带来的负面影响是执行效率下降了。
对此,你可以采用在fork
进程以前先import
引入必要模块的方式来提升生产力。RQ目前没有提供能够采起这种设置的参数,可是你能够在你的worker
进程进行循环以前先作import
导入。
你能够自定义本身的worker
脚本(替代掉原来使用的rq进程).一个简单的实例:
#!/usr/bin/env python import sys from rq import Connection, Worker # Preload libraries import library_that_you_want_preloaded # Provide queue names to listen to as arguments to this script, # similar to rq worker with Connection(): qs = sys.argv[1:] or ['default'] w = Worker(qs) w.work()
Workers
进程命名方式默认等于当前的hostname
与当前PID
链接。覆盖默认设置,能够在开始worker
进程时指定一个 --name
选项
任什么时候候,worker
进程收到SIGINT
(via Ctrl+C
) 或者 SIGTERM
(via kill
)信号,worker
进程将会等待当前正在执行任务完成工做后,再关闭循环,将其注册入死亡进程。
若是是在关闭进程阶段,再次收到SIGINT
或者 SIGTERM
,worker
进程将强制性使子进程中断(发送SIGKILL
),但依然会尝试将其注册入死亡进程。
0.3.2
版本中的新特性
若是你想要经过配置文件而不是命令行参数来配置rq进程,你能够建立一个名为settings.py
的Python文件:
REDIS_URL = 'redis://localhost:6379/1' # You can also specify the Redis DB to use # REDIS_HOST = 'redis.example.com' # REDIS_PORT = 6380 # REDIS_DB = 3 # REDIS_PASSWORD = 'very secret' # Queues to listen on QUEUES = ['high', 'normal', 'low'] # If you're using Sentry to collect your runtime exceptions, you can use this # to configure RQ for it in a single step SENTRY_DSN = 'http://public:secret@example.com/1'
图上示例展现了全部可用的配置选项。 注意:QUEUES
和REDIS_PASSWORD
设置在0.3.3
之后的版本才存在。 指定worker
进程读取配置文件的路径使用 -c
参数:
$ rq worker -c settings
版本0.4.0 的新特性
There are times when you want to customize the worker's behavior. Some of the more common requests so far are:
os.fork
的模型.multiprocessing
或者 gevent
. 使用 -w 参数指定 worker类路径:$ rq worker -w 'path.to.GeventWorker'
将来开放
You can tell the worker to use a custom class for jobs and queues using --job-class and/or --queue-class.
$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'
Don't forget to use those same classes when enqueueing the jobs.
For example:
from rq import Queue from rq.job import Job class CustomJob(Job): pass class CustomQueue(Queue): job_class = CustomJob queue = CustomQueue('default', connection=redis_conn) queue.enqueue(some_func)
版本 0.5.5 的新特性
若是你想根据不一样类型的任务来决定对应的异常操做,或者仅仅想重写异常处理,能够经过--exception-handler
选项:
$ rq worker --exception-handler 'path.to.my.ErrorHandler' # Multiple exception handlers is also supported $ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'
入列任务是延迟函数的调用,也就是说咱们正在解决一个问题,可是须要等待一会才回来执行。
Python 方法若是有返回值,若是任务返回一个非空的值,worker进程将返回值写入Redis所存的任务对应Hash下,TTL默认的过时时间为任务结束后的500s。
The party that enqueued the job gets back a Job instance as a result of the enqueueing itself. Such a Job object is a proxy object that is tied to the job's ID, to be able to poll for results.
返回结果写入Redis时伴有一个有限的生存时间,能够避免Redis数据库数据无限增加。
RQ >= 0.3.1时,TTL的值能够在调用 enqueue_call()
时经过使用 result_ttl
关键词参数指定。它一样能够禁用过时,这时你须要手动清理数据。
q.enqueue_call(func=foo) # result expires after 500 secs (the default) q.enqueue_call(func=foo, result_ttl=86400) # result expires after 1 day q.enqueue_call(func=foo, result_ttl=0) # result gets deleted immediately q.enqueue_call(func=foo, result_ttl=-1) # result never expires--you should delete jobs manually
此外,你可使用来继续执行一些没有返回值,默认会当即删除的已完成任务,
q.enqueue_call(func=func_without_rv, result_ttl=500) # job kept explicitly
一般,任务能够在失败后抛出异常,RQ将用如下方式处理:
失败的任务须要关注而且失败任务的返回值不该该设置过时时间。更进一步,失败的任务须要再次运行测试。通常这些事情须要手动操做,由于RQ自己是没法字典或者可靠地自动判断任务从新执行是否安全。
当一个异常在任务中抛出,worker进程能够获取获得,并将其序列化,以键为exc_info
的hash存储在任务对应的Redis下。任务的引用随即被置于失败队列中。
任务自己拥有一些十分有用的属性帮助检测:
这些能够帮助你检测和手动追查问题,而且再次提交任务。
当worker
进程被 Ctrl+C
or kill
中断,RQ
几乎不会丢失任务。当前任务完成后,worker进程将其它未执行的任务终止。
然而,worker
进程可使用kill -9
强制终止,不过,这种方式worker
进程没法优雅地结束任务或者将任务放入失败队列中。所以,强制关闭一个worker
进程可能引发潜在的问题。
默认任务执行时间为180s,若是逾期未执行完毕,worker
进程将终止主进程而且将任务放入失败队列,并标识其超时。
若是一个任务须要更多(或更少)时间来完成,默认的超时时间也将改变,你能够在调用Queue.enqueue()
时经过参数指定:
q = Queue() q.enqueue(func=mytask, args=(foo,), kwargs={'bar': qux}, timeout=600) # 10 mins You can also change the default timeout for jobs that are enqueued via specific queue instances at once, which can be useful for patterns like this: # High prio jobs should end in 8 secs, while low prio # work may take up to 10 mins high = Queue('high', default_timeout=8) # 8 secs low = Queue('low', default_timeout=600) # 10 mins # Individual jobs can still override these defaults low.enqueue_call(really_really_slow, timeout=3600) # 1 hr Individual jobs can still specify an alternative timeout, as workers will respect these.
在一些状况下,你可能须要进入当前的任务ID或者任务方法的实例,或者为任务存于任意数据。
版本 0.3.3 的新特性
由于任务方法和Python方法本质是同样的,你能够向RQ 获取当前任务ID:
from rq import get_current_job def add(x, y): job = get_current_job() print 'Current job: %s' % (job.id,) return x + y
版本 0.3.3 的新特性
为了给任务增长/更新自定义的状态信息,须要操做任务的meta属性,
import socket def add(x, y): job = get_current_job() job.meta['handled_by'] = socket.gethostname() job.save() return x + y
版本 0.4.7 的新特性
一个任务拥