在处理Web客户端发送的命令请求时,某些操做的执行时间可能会比咱们预期的更长一些。经过将待执行任务的相关信息放入队列里面,并在以后对队列进行处理,用户能够推迟那些须要一段时间才能完成的操做,这种工做交给任务处理器来执行的作法被称为任务队列(task queue)
。如今有不少专门的任务队列软件(如ActiveMQ、RabbitMQ、Gearman、Amazon SQS),接下来实现两种不一样类型的任务队列,第一种队列会根据任务被插入队列的顺序来尽快地执行任务,第二种队列具备安排任务在将来某个特定时间执行的能力。json
除了任务队列以外,还有先进先出(FIFO)队列、后进后出(LIFO)队列和优先级(priority)队列。
使用任务队列来记录邮件的收信人以及发送邮件的缘由,并构建一个能够在邮件发送服务器运行变得缓慢的时候,以并行方式一次发送多封邮件的工做进程(worker process)。安全
要编写的队列将以“先到先服务”(first-come,first-served)的方式发送邮件,而且不管发送是否成功,程序都会把发送结果记录到日志里面。Redis的列表结构容许用户经过RPUSH和LPUSH以及RPOP和LPOP,从列表的两端推入和弹出元素。邮件队列使用RPUSH命令来将待发送的邮件推入列表的右端,而且由于工做进程除了发送邮件以外不须要执行其余工做,因此它将使用阻塞版本的弹出命令BLPOP从队列中弹出待发送的邮件,而命令的最大阻塞时限为30秒。服务器
邮件队列由一个Redis列表构成,包含多个JSON编码对象。为了将待发送的邮件推入队列里面,程序会获取发送邮件所需的所有信息,并将这些信息序列化为JSON对象,最后使用RPUSH命令将JSON对象推入邮件队列里面。网络
def send_sold_email_via_queue(conn, seller, item, price, buyer): data = { 'seller_id': seller, 'item_id': item, 'price': price, 'buyer_id': buyer, 'time': time.time() } conn.rpush('queue:email', json.dumps(data))
从队列里获取待发送邮件,程序首先使用BLPOP命令从邮件队列里面弹出一个JSON对象,接着经过解码JSON对象来取得待发送邮件的相关信息,最后根据这些信息来发送邮件。ide
def process_sold_email_queue(conn): while not QUIT: packed = conn.blpop(['queue:email'], 30) //获取一封待发送邮件 if not packed: //队列里面暂时尚未待发送邮件,重试 continue to_send = json.loads(packed[1]) //从JSON对象中解码出邮件信息 try: fetch_data_and_send_sold_email(to_send) except EmailSendError as err: log_error("Failed to send sold email", err, to_send) else: log_success("Send sold email", to_send)
由于BLPOP命令每次只会从队列里面弹出一封待发送邮件,因此待发送邮件不会出现重复,也不会被重复发送。而且由于队列只会存放待发送邮件,因此工做进程要处理的任务是很是单一的。下面代码的工做进程会监视用户提供的多个队列,并从多个已知的已注册回调函数里面,选出一个函数来处理JSON编码的函数调用。函数
def worker_watch_queue(conn, queue, callback): while not QUIT: packed = conn.blpop([queue], 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: //没有找到任务指定的回调函数,用日志记录错误并重试 log_error("Unknown callback %s"%name) continue callbacks[name](*args) //执行任务
在使用队列的时候,程序可能会须要让特定的操做优先于其余操做执行。fetch
假设如今咱们须要为任务设置高、中、低3种优先级别,其中:高优先级任务在出现以后会第一时间被执行,而中等优先级任务则会在没有任何高优先级任务存在的状况下被执行,而低优先级任务则会在既没有任何高优先级任务,又没有任何中等优先级任务的状况下被执行。ui
def worker_watch_queues(conn, queues, callbacks): while not QUIT: packed = conn.blpop(queues, 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: log_error("Unknown callback %s"%name) continue callbacks[name](*args)
同时使用多个队列能够下降实现优先级特性的难度。除此以外,多队列有时候也会被用于分隔不一样的任务(如同一个队列存放公告邮件,而另外一个队列则存放提醒邮件),在这种状况下,处理不一样队列时可能出现不公平现象。为此,咱们能够偶尔从新排列各个队列的顺序,使得针对队列的处理操做变得更公平一些,当某个队列的增加速度比其余队列的增加速度快的时候,这种重拍操做尤其重要。编码
使用列表结构能够实现只能执行一种任务的队列,也能够实现经过调用不一样回调函数来执行不一样任务的队列,甚至还能够实现简单的优先级队列。
如下3种方法能够为队列中的任务添加延迟性质:日志
由于不管是进行短暂的等待,仍是将任务从新推入队列里面,都会浪费工做进程的时间,因此咱们不会采用第一种方法。此外,由于工做进程可能会由于崩溃而丢失本地记录的全部待执行任务,因此咱们也不会采用第二种方法。最后,由于使用有序集合的第三种方法最简单直接,因此咱们将采起这一方法,并使用锁来保证任务从有序集合移动到任务队列时的安全性。
有序集合队列(ZSET queue)存储的每一个被延迟的任务都是一个包含4个值的JSON列表,这4个分值分别是:惟一标识符、处理任务队列的名字、处理任务的回调函数的名字、传给回调函数的参数。在有序集合里面,任务的分值会被设置为任务的执行时间,而当即可执行的任务将被直接插入任务队列里面。下面代码展现了建立延迟任务(任务是否延迟是可选的,只要把任务的延迟时间设置为0就能够建立一个当即执行的任务)。
def execute_later(conn, queue, name, args, delay=0): identifier = str(uuid.uuid4()) item = json.dumps([identifier, queue, name, args]) if delay > 0: conn.zadd('delayed:', item, time.time() + delay) else: conn.rpush('queue:' + queue, item) return identifier
由于Redis没有提供直接的方法能够阻塞有序集合直到元素的分值低于当前UNIX时间戳为止,因此咱们须要本身来查找有序集合里面分值低于当前UNIX时间戳的任务。由于全部被延迟的任务都存储在同一个有序集合队列里面,因此程序只须要获取有序集合里面排名第一的元素以及该元素的分值就能够了:若是队列里面没有任何任务,或者任务的执行时间还没有来临,那么程序将在短暂等待以后重试;若是任务的执行时间已到,那么程序将根据任务包含的标识符来获取一个细粒度锁,接着从有序集合里面移除要被执行的任务,并将它添加到适当的任务队列里面。经过将可执行的任务添加到任务队列里面而不是直接执行它们,咱们能够把获取可执行任务的进程数量限制在一两个以内,而没必要根据工做进程的数量来决定运行多少个获取进程,这减小了获取可执行任务所需的花销。
def poll_queue(conn): while not QUIT: item = conn.zrange('delayed:', 0, 0, withscores=True) if not item or item[0][1] > time.time(): time.sleep(.01) continue item = item[0][0] identifier, queue, function, args = json.loads(item) locked = acquire_lock(conn, identifier) if not locked: continue if conn.zrem('delayed:', item): conn.rpush('queue:' + queue, item) release_lock(conn, identifier, locked)
由于有序集合并不具有像列表那样的阻塞弹出机制,因此程序须要不断地进行循环,并尝试从队列里面获取要被执行的任务,虽然这一操做会增大网络和处理器的负载,但由于咱们只会运行一两个这样的程序,因此不会消耗太多资源。