Django集成Celery

2020 年 06 月 04 日 • 阅读数: 168

Django集成Celery

简介

Django本身是一个同步阻塞的Web框架,不建议直接在它的内部编写异步代码,如果要执行异步任务,建议使用celery,可以直接使用celery,也可以使用django-celery

安装

这里直接使用django-celery,它是Django对celery的一层简单的封装,将celery的命令集成到Django的manage.py中,方便我们调用(这里还需要一个消息队列,这里使用Redis)

pip install django-celery
pip install redis

配置

我们先创建一个Django的App,这里命名为 course ,在里面创建一个 tasks.py 的文件,任务的逻辑代码都写在这个里面

settings.py 的同级目录下创建celery的配置文件 celery_config.py

先来编写这个配置文件

from datetime import timedelta

import djcelery

djcelery.setup_loader()

# 连接Redis
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TIMEZONE = 'Asia/Shanghai'

# 自定义队列
CELERY_QUEUES = {
    'beat_tasks': {
        'exchange': 'beat_tasks',
        'exchange_type': 'direct',
        'binding_key': 'beat_tasks'
    },
    'work_queue': {
        'exchange': 'work_queue',
        'exchange_type': 'direct',
        'binding_key': 'work_queue'
    }
}

# 默认使用的队列
CELERY_DEFAULT_QUEUE = 'work_queue'

# 导入任务模块
CELERY_IMPORTS = (
    'course.tasks',
)

# 设置定时任务,并指定队列
CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'course_task_add',
        'schedule': timedelta(seconds=10),
        'options': {
            'queue': 'beat_tasks'
        },
        'args': (2, 8)
    }
}

# 有些情况下可以防止死锁
CELERY_FORCE_EXECV = True

# 设置worker的并发数
CELERY_CONCURRENCY = 4

# 允许任务重试
CELERY_ACKS_LATE = True

# 每个worker最多执行100个任务,可以防止内存泄漏
CELERY_MAX_TASKS_PER_CHILD = 100

# 设置单个任务的超时时间
CELERY_TASK_TIME_LIMIT = 5 * 60

修改 settings.py

导入celery配置文件中的所有项,在 INSTALLED_APPS 中添加我们创建的App和 djcelery

from amor.celery_config import *

INSTALLED_APPS = [
    ...
    'course',
    'djcelery',
]

编写任务

在 course 目录中的 tasks.py 中,我们编写任务逻辑代码

注: 以前我们都是通过函数和装饰器来编写任务的,其实我们也可以使用类的方式,这种方式中,我们需要继承 Task 且必须实现一个 run 方法,以及指定一个类参数 name

import time
from celery.task import Task


class CourseTask(Task):

    name = 'course_task'

    def run(self, *args, **kwargs):
        print('start course task')
        time.sleep(4)
        print('args={}, kwargs={}'.format(args, kwargs))
        print('end course task')


class CourseTaskAdd(Task):

    name = 'course_task_add'

    def run(self, x, y):
        print('start course task')
        time.sleep(4)
        print('end course task')
        return x + y

生产任务

作为一个Web应用,做基本的就是处理请求,这里我们编写一个简单的请求,在请求中,我们向消息队列中推入一个任务,然后立即响应请求,而任务交给celery的worker去异步的执行

编写 course 下的 views.py

from django.http import JsonResponse

from course.tasks import CourseTask


def do(request):
    # 执行异步任务
    print('start do request')
    CourseTask.delay()
    print('end do request')
    return JsonResponse({'code': 200, 'result': 'OK'})

编写根 urls.py

from django.urls import path
from course import views

urlpatterns = [
    path('do/', views.do, name='do')
]

执行任务

# 启动Django服务器
python manage.py run server
# 启动celery的worker监听并执行任务
python manage.py celery worker -l INFO
# 启动celery的定时任务调度
python manage.py celery beat -l INFO
标签: DjangoCelery
添加评论
评论列表
没有更多内容