定时任务调度-Celery

确保任务不重叠解决方法:

from celery import task
from celery.five import monotonic
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)

 

celery 特性:

  Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,而且提供维护这样一个系统的必需工具。因为在工做的平台中用到Celery系统(用于发送邮件、发送短信、发送上线等任务),记录一下学习的知识。

python

使用rabbitmq作celery的broker和redis作celery的broker的特性redis

         使用RabbitMQ做为Celery Broker的优势:
         Highly customizable routing(高度定制路由)
         Persistent queues(一致性队列)数据库

使用redis做为celery brocker的优势:
        high speed due to in memory datastore(速度极快的内存数据库)
        can double up as both key-value datastore and job queue(能够保证key-value 数据存储及job序列)django

celery-安装

  pip3 install celery(4.0版本celery beat不支持热加载)json

 

celery-flower监控安装

  pip3 install flowerapi

django  celery 安装

  pip3 install django-celery服务器

celery 原理介绍

      某个方法的消息请求celery执行,首先celery根据绑定的规则把任务消息放到制定的路由队列中去,此队列对应的worker节点取出执行。

说明:
      为何要定义多个worker?每一个worker都会新建一个进程,充分利用服务器资源,提升执行效率。
      同一个服务器能够启动多个worker节点?能够,启动参数里面写上不一样的–hostname便可。
      celery默认会建立一个celery任务队列,没有任何绑定的任务将会发送到此消息队列中。网络

 

 

celery 多woker实验

    celery加redis的多节点配置实例,因为资源限制只找了两台机器作测试多线程

10.10.42.33
10.10.190.234

  咱们把redis服务放在10.10.190.234那台服务器上
  咱们把flower服务也启动在10.10.42.33那台服务器上
  代码中定义的队列有queue_add、queue_sum (还有个默认队列celery)
  3三、234服务器用于启动worker节点
  33服务器上启动处理celery和queue_add队列的worker节点
  234服务器上启动处理celery和queue_sum队列的worker节点架构

 

配置文件展现

  celeryconfig配置文件:
cat celeryconfig.py
#!/usr/bin/python
#coding:utf-8
from kombu import Queue
CELERY_TIMEZONE = 'Asia/Shanghai'
####################################
# 通常配置 #
####################################
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# List of modules to import when celery starts.
CELERY_IMPORTS = ('tasks', )
CELERYD_MAX_TASKS_PER_CHILD = 40 #  每一个worker执行了多少任务就会死掉
BROKER_POOL_LIMIT = 10 #默认celery与broker链接池链接数
CELERY_DEFAULT_QUEUE='default'
CELERY_DEFAULT_ROUTING_KEY='task.default'
CELERY_RESULT_BACKEND='redis://:fafafa@10.10.190.234:6379/0'  
BROKER_URL='redis://:fafafa@10.19.190.234:6379/0'  
#默认队列
CELERY_DEFAULT_QUEUE = 'celery' #定义默认队列
CELERY_DEFAULT_ROUTING_KEY = 'celery' #定义默认路由
CELERYD_LOG_FILE="./logs/celery.log"
CELERY_QUEUES = (
    Queue("queue_add", routing_key='queue_add'),
    Queue('queue_reduce', routing_key='queue_sum'),
    Queue('celery', routing_key='celery'),
    )
CELERY_ROUTES = {
    'task.add':{'queue':'queue_add', 'routing_key':'queue_add'},
    'task.reduce':{'queue':'queue_reduce', 'routing_key':'queue_sum'},
} 
查看任务配置文件:

  cat task.py

import os
import sys
import datetime
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from celery import Celery
from celery import chain, group, chord, Task
from celeryservice import celeryconfig
app = Celery()
app.config_from_object(celeryconfig)
__all__ = ['add', 'reduce','sum_all', 'other']
####################################
# task定义 #
####################################
@app.task
def add(x, y):
    return x + y
@app.task
def reduce(x, y):
    return x - y
@app.task
def sum(values):
    return sum([int(value) for value in values])
@app.task
def other(x, y):
    return x * y

  

flower任务配置文件

  cat flower.py

#!/usr/bin/env python
#coding:utf-8
broker_api = 'redis://:afafafafa@10.10.190.234:6379/0'
logging = 'DEBUG'
address = '0.0.0.0'
port = 5555
basic_auth = ['zero:zero']  #外部访问密码
persistent=True  #持久化celery tasks(若是为false的话,重启flower以后,监控的task就消失了)
db="./flower/flower_db"

 

 

启动服务:

  在33上启动服务

celery worker -A  task --loglevel=info --queues=celery,queue_add --hostname=celery_worker33  >/dev/null 2>&1 &

  在234上启动服务

celery worker -A  task --loglevel=info --queues=celery,queue_add --hostname=celery_worker33  >/dev/null 2>&1 &

 

服务验证:

  在任一台有celeryservice项目代码的服务器上,运行add、reduce、sum、other任务(测试可简单使用add.delay(1,2)等)
  add只会在33上运行,
  sum任务,可能会在33或234服务器的worker节点运行
  reduce任务,只会在234上运行。
  other任务可能会在33或者234上运行。

 

关于使用过程当中的优化

使用celery的错误处理机制

  以下内容来自于网站,还没实践,存档用。
  大多数任务并无使用错误处理,若是任务失败,那就失败了。在一些状况下这很不错,可是做者见到的多数失败任务都是去调用第三方API而后出现了网络错误,
  或者资源不可用这些错误,而对于这些错误,最简单的方式就是重试一下,也许就是第三方API临时服务或者网络出现问题,没准立刻就行了,那么为何不试着加个重试测试一下呢?

@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
    try:
        print("doing stuff here...")
    except SomeNetworkException as e:
        print("maybe do some clenup here....")
        self.retry(e)

  

 

 定时任务遇到的问题:

  

  经过flower 查看 跑多线程报错, 须要减小线程数.

 

 celery配置文件的一些详细解释:

# -*- coding:utf-8 -*-                                                                                                                                                  
from datetime import timedelta
from settings import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB_NUM
 
 
# 某个程序中出现的队列,在broker中不存在,则马上建立它
CELERY_CREATE_MISSING_QUEUES = True
 
CELERY_IMPORTS = ("async_task.tasks", "async_task.notify")
 
# 使用redis 做为任务队列
BROKER_URL = 'redis://:' + REDIS_PASSWORD + '@' + REDIS_HOST + ':' + str(REDIS_PORT) + '/' + str(REDIS_DB_NUM)
 
#CELERY_RESULT_BACKEND = 'redis://:' + REDIS_PASSWORD + '@' + REDIS_HOST + ':' + str(REDIS_PORT) + '/10'
 
CELERYD_CONCURRENCY = 20  # 并发worker数
 
CELERY_TIMEZONE = 'Asia/Shanghai'
 
CELERYD_FORCE_EXECV = True    # 很是重要,有些状况下能够防止死锁
 
CELERYD_PREFETCH_MULTIPLIER = 1
 
CELERYD_MAX_TASKS_PER_CHILD = 100    # 每一个worker最多执行万100个任务就会被销毁,可防止内存泄露
# CELERYD_TASK_TIME_LIMIT = 60    # 单个任务的运行时间不超过此值,不然会被SIGKILL 信号杀死 
# BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 90}
# 任务发出后,通过一段时间还未收到acknowledge , 就将任务从新交给其余worker执行
CELERY_DISABLE_RATE_LIMITS = True   
 
# 定时任务
CELERYBEAT_SCHEDULE = {
    'msg_notify': {
        'task': 'async_task.notify.msg_notify',
        'schedule': timedelta(seconds=10),
        #'args': (redis_db),
        'options' : {'queue':'my_period_task'}
    },
    'report_result': {
        'task': 'async_task.tasks.report_result',
        'schedule': timedelta(seconds=10),
      #'args': (redis_db),
        'options' : {'queue':'my_period_task'}
    },
    #'report_retry': {
    #    'task': 'async_task.tasks.report_retry',
    #    'schedule': timedelta(seconds=60),
    #    'options' : {'queue':'my_period_task'}
    #},
 
}
################################################
# 启动worker的命令
# *** 定时器 ***
# nohup celery beat -s /var/log/boas/celerybeat-schedule  --logfile=/var/log/boas/celerybeat.log  -l info &
# *** worker ***
# nohup celery worker -f /var/log/boas/boas_celery.log -l INFO &

 

celery总体架构图:

相关文章
相关标签/搜索