用 Celery 实现邮件推送系统

系统需求

本文以Celery 实现分布式任务队列为基础,简述了一个邮件推送系统的模型。html

Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了能够有多个 worker 的存在,队列表示其是异步操做,即存在一个产生任务提出需求的工头,和一群等着被分配工做的码农。前端

需求:ajax

1.在邮件推送系统中,咱们须要对成千上万的用户发送邮件,发送邮件具备时效性,即不能说今天开始发邮件,要等到明天才能发送完毕。redis

2.发送邮件过程当中,可能会遇到过于频繁,邮件服务器上信件堆积没法及时接受新信件而产生的拒信,或者邮件服务器将咱们的邮件判决为垃圾邮件。数据库

3.邮件发送的 I/O 时间较长,不能让程序在等待邮件服务器返回消息上浪费时间。django

因此咱们的推送系统要有如下特性:1.分布式处理做业;2.闭环监控;3.异步式分发做业服务器

系统框图

系统框图

前端经过 ajax 调用 views 中的 callpush 接口,该接口将被推送用户的筛选条件传入 service,而后 service 请求数据库,将返回数据做为参数调用 celery 接口中 addtask 函数。celery 接口中 addtask 根据 action 参数来判断所要添加的任务类型,根据不一样的类型分别进行处理,放入队列。app

系统的另一头,worker 从队列中取出任务,用 mail 函数推送邮件,若是发送失败就调用 error_handler 进行异常处理,此处咱们将全部 task 的执行状况放入 redis 中,给每一个任务进行标记,若是成功则标记为 1,失败则 0.异步

前端能够经过 ajax 调用 pushstatus 来向 redis 中读取任务执行状况,此处咱们返回了成功和失败任务的个数。分布式

伪代码实现

# Controller
from redis import StrictRedis
red = StrictRedis(host='localhost', port=6379, db=0)

def callpush(request):
  area = request.POST.get('area')
  return HttpResponse(str(mailpush(area)))

def pushstatus(request):
  failure = red.scard('status:0:task')
  success = red.scard('status:1:task')
  return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success))

# Service
def mailpush(**kargs):
  targets = MtUser.objects.filter(kargs).values('username', 'address')
  addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings')
  return len(targets)

# Celery Interface (Dispatcher)
from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

def addtask(action, data, **kargs):
  if action == 'mailpush':
    for (address, username) in data:
      app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler'))
  elif action == 'messagepush':
    pass
  else:
    pass

# Celery Backend (Worker)
from celery import Celery
from celery import Task
from redis import StrictRedis

app = Celery()
app.config_from_object('celeryconfig')
red = StrictRedis(host='localhost', port=6379, db=0)

@app.task(bind=True)
def mail(self, subject, content, address):
    from django.core.mail import EmailMessage
    msg = EmailMessage(subject, content, 'admin@admin.com', address)
    msg.content_subtype = 'html'
    msg.send()
    red.sadd('status:1:task', self.request.id)

# Overwrite the on_failure function in trace.py
@app.task
def error_handler(uuid, args):
    print uuid
    print args
    red.set(uuid, args)
    red.sadd('status:0:task', uuid)
    red.srem('status:1:task', uuid)
相关文章
相关标签/搜索