分布式任务队列学习笔记,执行异步义务和定时任务

一、前言

  Celery是1个依照python开发的分布式任务队列,假使不驾驭请阅读作者上1篇博文Celery入门与进阶,而做python
WEB开发最为流行的框架莫属Django,可是Django的呼吁处理进度都以同台的不能够完成异步职责,若要落成异步任务处理供给经过其余方法(前端的貌似消除方案是ajax操作),而后台Celery就是毋庸置疑的挑叁拣四。假使贰个用户在实施有些操作要求静观其变很久才重回,那大大下落了网址的吞吐量。上边将讲述Django的请求处理差不多流程(图片源于互联网):

美高梅开户网址 1

恳请进度差不多表达:浏览器发起呼吁–>请求处理–>请求经过中间件–>路由映射–>视图处理工作逻辑–>响应请求(template或response)

网上有不计其数celery +
django达成定时任务的科目,然则它们大多数是依照djcelery + celery三的;
可能是运用django_celery_beat配置较为麻烦的。

全体演示均依据Django二.0

Celery
是一个粗略、灵活且保证的,处理大量音讯的分布式系统,并且提供维护这么三个类别的不能缺少乏工人具。
它是二个在意于实时处理的天职队列,同时也支撑任务调度。
如上是celery本身官网的介绍

2、配置使用

  celery很简单集成到Django框架中,当然若是想要完成定时职责的话还索要设置django-celery-beta插件,后边会表明。需求小心的是Celery肆.0只支持Django版本>=1.八的,假使是自愧比不上一.八本子须要动用Celery三.一。

明显简洁而高速才是大家最后的求偶,而celery四已经不须求非凡插件即可与django结合完成定时职务了,原生的celery
beat就足以很好的完成定时职分效能。

celery是2个基于python开发的不难、灵活且有限帮助的分布式职分队列框架,扶助选用职务队列的主目的在于分布式的机器/进度/线程上进行任务调度。选取独立的生产者-消费者模型,首要由三片段构成:

celery的利用场景很普遍

配置

  新确立项目taskproj,目录结构(各类app下多了个tasks文件,用于定义任务):

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

在项目目录taskproj/taskproj/目录下新建celery.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境

app = Celery('taskproj')

app.config_from_object('django.conf:settings', namespace='CELERY') #  使用CELERY_ 作为前缀,在settings中写配置

app.autodiscover_tasks()  # 发现任务文件每个app下的task.py

taskproj/taskproj/__init__.py:

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ['celery_app']

taskproj/taskproj/settings.py

CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

进入项指标taskproj目录运转worker:

celery worker -A taskproj -l debug

理所当然使用原生方案的同时有几点插件所推动的功利被我们放弃了:

  • 音讯队列broker:broker实际上便是四个MQ队列服务,可以采纳redis、rabbitmq等作为broker
  • 处理职责的消费者workers:broker通告worker队列中有任务,worker去队列中取出职务执行,每二个worker正是多个进程
  • 积存结果的backend:执行结果存储在backend,默许也会储存在broker使用的MQ队列服务中,也足以独立安排用何种服务做backend
  • 拍卖异步职务
  • 任务调度
  • 拍卖定时任务
  • 分布式调度

概念与触发职分

  职务定义在各样tasks文件中,app0一/tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

视图中触发职责

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request,*args,**kwargs):
    res=tasks.add.delay(1,3)
    #任务逻辑
    return JsonResponse({'status':'successful','task_id':res.task_id})

访问

美高梅开户网址 2

 若想得到职务结果,能够由此task_id使用AsyncResult获取结果,还足以一向通过backend获取:

美高梅开户网址 3

 

  • 插件提供的定时职分管理将不在可用,当大家只须要职分定期执行而不须要人工资调整度的时候这一点忽略不计。
  • 没辙急速的田管或追踪定时职分,定时义务的跟踪其实交给日志更合理,然而对职务的修改就不曾那么便宜了,可是只要不供给常常改变/增减职分的话那点也在可承受范围内。

美高梅开户网址 4

利益也很多,尤其在利用python营造的利用类别中,无缝衔接,使用一定便利。

扩展

  除了redis、rabbitmq能做结果存款和储蓄外,还足以应用Django的orm作为结果存款和储蓄,当然要求设置重视插件,那样的利益在于大家可以直接通过django的数据查看到职责情状,同时为能够制定越多的操作,上面介绍怎么样选拔orm作为结果存储。

1.安装

pip install django-celery-results

2.配置settings.py,注册app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

4.修改backend配置,将redis改为django-db

#CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_BACKEND = 'django-db'  #使用django orm 作为结果存储

伍.改动数据库

python3 manage.py migrate django_celery_results

这儿会看出数据库会多创设:

美高梅开户网址 5 当然你有时候供给对task表实行操作,以下源码的表结构定义:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)

 

Celery定时职务布署

在展开布署前先来看看项目组织:

.├── linux_news│   ├── celery.py│   ├── __init__.py│   ├── settings.py│   ├── urls.py│   └── wsgi.py├── manage.py├── news│   ├── admin.py│   ├── apps.py│   ├── __init__.py│   ├── migrations│   ├── models│   ├── tasks.py│   ├── tests.py│   └── views└── start-celery.sh

里头news是大家的app,用于从局地rss订阅源获取情报音讯,linux_news则是我们的project。大家需求关注的要紧是celery.py,settings.py,tasks.py和start-celery.sh。

第二是celery.py,想让celery执行义务就务须实例化3个celery
app,并把settings.py里的布置传入app:

import osfrom celery import Celery# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'linux_news.settings')app = Celery('linux_news')# 'django.conf:settings'表示django,conf.settings也就是django项目的配置,celery会根据前面设置的环境变量自动查找并导入# - namespace表示在settings.py中celery配置项的名字的统一前缀,这里是'CELERY_',配置项的名字也需要大写app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django app configs.app.autodiscover_tasks()

安插就是那般简单,为了能在django里使用这几个app,大家必要在__init__.py中程导弹入它:

from .celery import app as celery_app

下一场大家来看tasks.py,它应当放在你的app目录中,前边大家配备了自行发现,所以celery会自动找到那么些tasks,大家的tasks将写在那壹模块中,代码涉及了某些orm的选用,为了顺应核心笔者做了些简洁明了:

from linux_news.celery import celery_app as appfrom .models import *import timeimport feedparserimport pytzimport html@app.task(ignore_result=True)def fetch_news(origin_name):    """    fetch all news from origin_name    """    origin = get_feeds_origin(origin_name)    feeds = feedparser.parse(origin.feed_link)    for item in feeds['entries']:        entry = NewsEntry()        entry.title = item.title        entry.origin = origin        entry.author = item.author        entry.link = item.link        # add timezone        entry.publish_time = item.time.replace(tzinfo=pytz.utc)        entry.summary = html.escape(item.summary)        entry.save()@app.task(ignore_result=True)def fetch_all_news():    """    这是我们的定时任务    fetch all origins' news to db    """    origins = NewsOrigin.objects.all()    for origin in origins:        fetch_news.delay(origin.origin_name)

tasks里是局地耗费时间操作,比如互连网IO恐怕数据库读写,因为大家不珍贵职责的重返值,所以利用@app.task(ignore_result=True)将其屏蔽了。

职责计划落成后大家即将安插celery了,大家选择redis作为任务队列,笔者强烈建议在生养环境中利用rabbitmq也许redis作为天职队列或结果缓存后端,而不该选取关系型数据库:

# redisREDIS_PORT = 6379REDIS_DB = 0# 从环境变量中取得redis服务器地址REDIS_HOST = os.environ.get('REDIS_ADDR', 'redis')# celery settings# 这两项必须设置,否则不能正常启动celery beatCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = TIME_ZONE# 任务队列配置CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_ACCEPT_CONTENT = ['application/json', ]CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_TASK_SERIALIZER = 'json'

下一场是咱们的定时职责设置:

from celery.schedules import crontabCELERY_BEAT_SCHEDULE={        'fetch_news_every-1-hour': {            'task': 'news.tasks.fetch_all_news',            'schedule': crontab(minute=0, hour='*/1'),        }}

定时职分布署对象是2个dict,由职分名和安排项整合,首要安顿想如下:

  • task:任务函数所在的模块,模块路径得写全,不然找不到将不可能运营该职责
  • schedule:定时策略,1般选择celery.schedules.crontab,上边例子为每小时的0分执行2遍职分,具体写法与linux的crontab类似能够参见文书档案表明
  • args:是个元组,给出职分急需的参数,要是不须要参数也得以不写进配置,就像是例子中的一样
  • 其余配置项较少用,能够参考文书档案
    迄今截至,配置celery beat的有的就得了了。

异步职分

Celery

三、Django中使用定时职分

  要是想要在django中使用定时职务成效雷同是靠beat完毕职务发送成效,当在Django中使用定时职分时,供给设置django-celery-beat插件。以下将介绍使用进程。

启动celery beat

配备实现后只须求运转celery了。

开发银行在此以前安插一下环境。不要用root运营celery!不要用root运转celery!不要用root运维celery!主要的事务说二回。

start-celery.sh:

export REDIS_ADDR=127.0.0.1celery -A linux_news worker -l info -B -f /path/to/log

分布式任务队列学习笔记,执行异步义务和定时任务。-A 表示app所在的目录,-B表示运行celery beat运营定时职务。
celery不荒谬运维后就足以通过日记来查阅职务是或不是健康运维了:

[2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d]  [2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b]  [2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None[2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25]  [2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None[2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None

以上便是celery肆运营定时职责的剧情,如有错误和疏漏,欢迎指正。

自身的异步使用境况为品种上线:前端web上有个上线按钮,点击按钮后发请求给后端,后端执行上线进程要六分钟,后端在收取到请求后把职责放入队列异步执行,同时马上回到给前端二个职务执行中的结果。若果未有异步执行会怎样呢?同步的状态就是实践进程中前端平昔在等后端重临结果,页面转呀转的就转超时了。

安装

安装配置

1.beat插件安装

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS = [
    ....   
    'django_celery_beat',
]

三.数据库变更

python3 manage.py migrate django_celery_beat

4.各自运营woker和beta

celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  #启动beta 调度器使用数据库

celery worker -A taskproj -l info #启动woker

5.配置admin

urls.py

# urls.py
from django.conf.urls import url
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
]

陆.开立用户

python3 manage.py createsuperuser 

7.登录admin实行政管理制(地址

美高梅开户网址 6

 

 使用示例:

美高梅开户网址 7

 

 

 

 

美高梅开户网址 8

 

 

 查看结果:

美高梅开户网址 9

 

异步职务计划

安装Celery

引入应用pip安装,借使你选拔的是虚拟环境,请在虚拟环境里设置

$ pip install celery

一遍开发

  django-celery-beat插件本质上是对数据库表变化检查,一旦有数量库表改变,调度珍视新读取任务展开调度,所以一旦想自身定制的职分页面,只须求操作beat插件的肆张表就能够了。当然你还足以友善定义调度器,django-celery-beat插件已经松手了model,只必要展开导入便可开始展览orm操作,以下我用django
reset api实行出现说法:

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]

urls.py

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index$', views.index),
    url(r'^res$', views.get_res),
    url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]

views.py

from django_celery_beat.models import PeriodicTask  #倒入插件model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination

访问

美高梅开户网址 10

 

一.安装rabbitmq,那里大家利用rabbitmq作为broker,安装到位后私下认可运营了,也不须要此外任何配置

安装音讯中间件

Celery 援助 RabbitMQ、Redis 甚至其余数据库系统作为其新闻代理中间件

你指望用哪些中间件和后端就请自行设置,一般都应用redis只怕RabbitMQ

# apt-get install rabbitmq-server

安装Redis

在Ubuntu系统下行使apt-get命令就能够

$ sudo apt-get install redis-server

1经您利用redis作为中间件,还索要设置redis援助包,同样利用pip安装即可

$ pip install redis

能现身以下结果即为成功

redis 127.0.0.1:6379>

任何的redis知识那里不左介绍,借使有趣味,能够自行理解

借使您接纳RabbitMQ,也请安装RabbitMQ

2.安装celery

安装RabbitMQ

$ sudo apt-get install rabbitmq-server
# pip3 install celery

使用Celery

三.celery用在django项目中,django项目目录结构(简化)如下

一句话来说直接行使

能够在急需的地方一向引进Celery,直接选拔即可。最简易的艺术只要求配置三个职责和中间人即可

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

本人那里运用了redis作为中间件,那是足以按自个儿的习惯替换的

鉴于私下认可的陈设不是最符合大家的类型实际上要求,壹般的话我们都亟待按大家团结的必要安顿部分,
然而由于供给将项目解耦,也好维护,咱们最佳应用单独的叁个文本编写配置。

website/
|-- deploy
|  |-- admin.py
|  |-- apps.py
|  |-- __init__.py
|  |-- models.py
|  |-- tasks.py
|  |-- tests.py
|  |-- urls.py
|  `-- views.py
|-- manage.py
|-- README
`-- website
  |-- celery.py
  |-- __init__.py
  |-- settings.py
  |-- urls.py
  `-- wsgi.py

独自布置配置文件

比地点的多少复杂一点,大家要求成立四个公文,3个为config.py的celery配置文件,在里面填写适合大家项目标布局,在开创3个tasks.py文件来编排大家的天职。文件的名字能够按您的喜好团结取名。

config.py内容为:

# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

陈设好以往能够用以下命令检查布置文件是不是科学(config为布局文件名)

$ python -m config

tasks.py内容为:

# coding=utf-8
from celery import Celery

app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

还有1种同等设置配置的秘诀,不是很推荐

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

在app使用前先须要用以上措施批量创新配备文件。

4.创建 website/celery.py 主文件

在采用上选用

工程目录结构为

proj/
    __init__.py
    # 存放配置和启动celery代码
    celery.py
    # 存放任务
    tasks.py

celery.py为:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

运营celery只须要在proj同级目录下:

$ celery -A proj worker -l info
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

在django中使用celery

我们的django的品类的目录结构相似如下

proj/
    manage.py
    myapp/
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

想要在django项目中应用celery,我们率先必要在django中配置celery

大家需求在与工程名同名的子文件夹中添加美高梅开户网址 ,celery.py文件
在本例中也正是proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# 括号里的参数为工程名
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

下一场大家须求在同级目录下的**init.py文本中安插如下内容 proj/proj/init.py**

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

下一场我们就能够把需求的天职放到供给的app下的tasks.py中,现在项目目录结构如下

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

或是的二个tasks.py文件内容如下:
myapp1/tasks.py为:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
    time.sleep(5)
    print(x+y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task修饰器能够让您创制task不须要app实体

在要求的地点调用相关义务即可,例如在myapp1/views.py中调用

from django.shortcuts import render
from .tasks import add


def index(request):
    # 测试celery任务
    add.delay(4,5)
    return render(request,'index.html')

下一场就足以运转项目,celery供给独自运转,所以必要开两个极端,分别

开行web应用服务器

$ python manage.py runserver

启动celery

$ celery -A proj worker -l info

下一场访问浏览器就足以在开发银行celery的终极中看到输出

美高梅开户网址 11

测试结果

5.在 website/__init__.py
文件中加进如下内容,确认保障django运维的时候这一个app可以被加载到

扩展

  • 1旦您的类型必要在admin中管理调度,请使用django-celery-beat
  1. 使用pip安装django-celery-beat

$ pip install django-celery-beat

绝不在采用django-celery,这么些类型已经终止更新好好多年。。。。

  1. 在settings.py中加上这几个app

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
  1. 一路一下数据库

$ python manage.py migrate
  1. 设置celery
    beat
    服务使用django_celery_beat.schedulers:DatabaseScheduler
    scheduler

$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

下一场在就可以admin界面看到了。

  • 比方您想选拔Django-O中华VM大概Django
    Cache作为后端,供给设置django-celery-results扩大(作者不提议)
  1. 使用pip安装django-celery-results

$ pip install django-celery-results

决不在使用django-celery,那些项目曾经甘休更新好好多年。。。。

  1. 在settings.py中增进那些app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
  1. 联合一下数据库

$ python manage.py migrate django_celery_results
  1. 配置后端,在settings.py中布局

# 使用数据库作为结果后端
CELERY_RESULT_BACKEND = 'django-db'

# 使用缓存作为结果后端
CELERY_RESULT_BACKEND = 'django-cache'

主干使用大约正是上述那一个,别的实际配置和平运动用还需协调查商讨读法定文档

注:

  • 上述条件在ubuntu16.04 lts django1.9中搭建测试成功
  • 上述文字皆为个体见解,如有错误或建议请立刻调换作者
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

陆.各使用创设tasks.py文件,那里为 deploy/tasks.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
  return x + y

在意tasks.py必须建在各app的根目录下,且不得不叫tasks.py,不能够随随便便命名

7.views.py中引用使用这些tasks异步处理

from deploy.tasks import add

def post(request):
  result = add.delay(2, 3)


result.ready()
result.get(timeout=1)
result.traceback

8.启动celery

# celery -A website worker -l info

九.这么在调用post这几个法马时,里边的add就能够异步处理了

定时职责

定时任务的行使景况就很广泛了,比如笔者急需定时发送报告给老板娘~

定时任务安顿

  1. website/celery.py 文件添加如下配置以支撑定时职务crontab
from celery.schedules import crontab

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sum-task': {
      'task': 'deploy.tasks.add',
      'schedule': timedelta(seconds=20),
      'args': (5, 6)
    }
    'send-report': {
      'task': 'deploy.tasks.report',
      'schedule': crontab(hour=4, minute=30, day_of_week=1),
    }
  }
)

概念了八个task:

  • 名叫’sum-task’的task,每20秒执行贰遍add函数,并传了两个参数五和陆
  • 名叫’send-report’的task,周周一深夜4:30推行report函数

timedelta是datetime中的一个指标,须要 from datetime import timedelta
引进,有如下多少个参数

  • days
  • seconds
  • microseconds
  • milliseconds
  • minutes
  • hours

crontab的参数有:

month_of_year
day_of_month
day_of_week
hour
minute

  1. deploy/tasks.py 文件添加report方法:
@shared_task
def report():
  return 5

三.开发银行celery
beat,celery运维了贰个beat进度向来在不断的判定是还是不是有任务急需执行

# celery -A website beat -l info

Tips

一.比方你同时使用了异步职务和陈设任务,有一种更简单的运维情势
celery -A website worker -b -l info ,可同时开动worker和beat

二.借使选择的不是rabbitmq做队列那么供给在主配置文件中 website/celery.py
配置broker和backend,如下:

# redis做MQ配置
app = Celery('website', backend='redis', broker='redis://localhost')
# rabbitmq做MQ配置
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')

三.celery无法用root用户运营的话需求在主配置文件中添加
platforms.C_FORCE_ROOT = True

4.celery在长日子运作后可能现身内部存款和储蓄器泄漏,供给加上配置
CELERYD_MAX_TASKS_PER_CHILD = 10
,表示每一个worker执行了有个别个义务就死掉

以上就是本文的全体内容,希望对大家的读书抱有帮衬,也愿意大家多多接济脚本之家。

您只怕感兴趣的文章:

  • 异步任务队列Celery在Django中的应用办法
  • Django使用Celery异步职责队列的运用
  • Django中使用celery达成异步任务的以身作则代码

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图