# -*- coding: utf-8 -*- # __author__ = "maple" """""" # 1.请问 flask或django 项目在只有 一个进程一个线程的状况下,不少请求同时到来.每一个请求服务端须要处理2分钟.如何解决? # 2. celery是什么? """ celery是一个基于Python实现的模块,模块能够帮助咱们实现"任务管理". """ # 3.什么状况下使用celery? """ 一个请求的处理时间特别长,能够使用celery. """ # 4. 安装celery """ pip3 install celery """ # 5. 依赖redis作消息队列(存听任务可记过) # 6. 快速使用celery """ a.建立s1.py import time from celery import Celery app = Celery('tasks', broker='redis://123@123.207.56.67:6380', backend='redis://123@192.168.19.123:6379') @app.task def xxxxxx(x, y): time.sleep(10) return x + y b.启动worker 进入当前目录 celery worker -A s1 -l info celery worker -A s1 -l info -P eventlet (不推荐) c.建立任务 import s1 result = s1.xxxxxx.delay(88,671) print(result.id) d. 检查任务 import s1 from celery.result import AsyncResult ret = AsyncResult(id='90a2a6e8-299a-41a5-9aab-4c989f45d303',app=s1.app) if ret.successful(): data = ret.get() print(data) """ # 7. 注意事项 """ - 解释器版本和celery版本 - windows和linux - broker: - redis - rabbitMQ """ # 8. flask示例: flask12306 """ 问题: 1. 多任务结构目录划分 2. 定时任务 - 一次性 - 周期性 3. django示例 参考:https://www.cnblogs.com/wupeiqi/articles/8796552.html?tdsourcetag=s_pcqq_aiomsg """ # 9. 非周期性的定时任务: 示例见 demo3 """ import datetime import s1 # 使用 # 方式一 ctime = datetime.datetime.now() utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) eta_utc_time = utc_ctime + datetime.timedelta(seconds=5) # 方式二: #ctime = 数据库获取DateTime(2019-11-11) #eta_utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp()) result = s1.xxxxxx.apply_async(args=[1, 3], eta=eta_utc_time) # eta是执行时间(UTC时间) print(result.id) """ # 10. 周期性: 示例见 demo4 """ 1. 启动worker (执行任务的进程) celery worker -A s1 -l info -P eventlet 2. 启动建立任务的进程 celery beat -A s1 -l info 扩展: 周期性定时任务能够使用celery + linux crontab服务实现. """ import time from celery import Celery app = Celery('tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379') @app.task def xxxxxx(x, y): time.sleep(1) return x + y app.conf.beat_schedule = { 'adfasdfasdf': { 'task': 's1.xxxxxx', 'schedule': 5, 'args': (2, 16) }, } # 11. django中使用celery """ 1. 在project同名目录建立celery.py import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 's13django_celery.settings') app = Celery('django_celery_demo') app.config_from_object('django.conf:settings', namespace='CELERY') # 自动去每一个已注册的app中找 task.py 并执行. app.autodiscover_tasks() 2. 在配置文件中编写broker和backend CELERY_BROKER_URL = 'redis://127.0.0.1:6379' CELERY_ACCEPT_CONTENT = ['json'] CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' CELERY_TASK_SERIALIZER = 'json' 3. 编写任务,在每一个已注册app中找到task.py里面编写: from celery import shared_task @shared_task def add(x, y): return x + y 4. 在项目同名的目录下编写: from .celery import app as celery_app __all__ = ('celery_app',) 5. 启动worker celery worker -A 项目目录中和项目同名的文件夹名称 -l info -P eventlet 6. 编写django程序 - 建立任务 from django.shortcuts import render,HttpResponse from app01 import tasks def gen(request): # 当即执行任务 # result = tasks.add.delay(1,2) # print(result.id) # 一次性执行一个定时任务 ctime = datetime.datetime.now() utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) eta_utc_time = utc_ctime + datetime.timedelta(seconds=5) result = tasks.add.apply_async(args=[1, 3], eta=eta_utc_time) print(result.id) return HttpResponse('任务建立成功:%s' %result.id) - 检查任务结果 from django.shortcuts import render,HttpResponse from celery.result import AsyncResult from s13django_celery import celery_app def check(request): task_id = request.GET.get('task') async = AsyncResult(id=task_id, app=celery_app) if async.successful(): data = async.get() print('成功', data) else: print('任务等待中被执行') return HttpResponse('...') """ # 12. 在django中周期性的执行定时任务 """ 方式一: s13django_celery/celery.py import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 's13django_celery.settings') app = Celery('s13django_celery') app.config_from_object('django.conf:settings', namespace='CELERY') # 自动去每一个已注册的app中找 task.py 并执行. app.autodiscover_tasks() # app.conf.beat_schedule = { # 'add-every-5-seconds': { # 'task': 'app01.tasks.add', # 'schedule': 5.0, # 'args': (16, 16) # }, # } 启动: celery worker -A s13django_celery -l info -P eventlet celery beat -A s13django_celery 方式二: pip3 install django_celery_beat 1. 注册app INSTALLED_APPS = [ ... 'django_celery_beat' ] 2. 数据库迁移 python manage.py migrate 3. 启动worker celery worker -A s13django_celery -l info -P eventlet celery beat -A s13django_celery --scheduler django_celery_beat.schedulers:DatabaseScheduler 4. 在admin中建立任务 Crontabs Intervals Periodic tasks(任务表) Solar events """ # 13. 常见面试问题 """ 1. 用过celery吗? - 用过,路飞. - 没用过,研究过. 2. broker的选择: - redis,简单应用. - rabbitMQ,复杂应用(专业消息队列). 3. app/broker/backend/worker 4. 作定时任务时,crontab和celery均可以?他们的区别是什么? - crontab是linux的一个服务,使用时须要为其编写特定脚本. - celery是一个基于Python实现的框架,代码和项目能够编写在一块儿. 5. shared_task的所用是什么? 6. django中使用celery是:使用定时任务能够实如今数据库指定任务(无需重启). """