Celery

2020 年 06 月 04 日 • 阅读数: 158

Clelery

简介

  • Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
  • 专注于实时处理的异步任务队列
  • 同时也支持任务调度

基本结构

  • 首先任务是由一个生产者(producer)来产生
  • 任务也可以通过调度器(celery beat)来调度产生
  • 然后将产生的任务丢到任务队列中(队列一般是放在RabbitMQ 或者Redis)
  • 然后启动消费者(consumer)也就是一个worker来对任务进行处理(它的内部是以协程的方式实现的并发)
  • 最终可以将结果存储到数据库中(同样可以使用RabbitMQ 或者Redis)

使用场景

  • 异步任务:将耗时操作任务提交给 Celery 去异步执行,比如发送短信/邮件、消息推送、音频视频处理等等
  • 定时任务:类似 crontab,比如每日数据统计

安装配置

Celery和普通的Python包安装一样,同时我们还需要使用到消息队列,这里使用的是Redis

pip insatll celery[redis]

celery4.1以后版本对win10似乎不兼容,这里需要在安装一个库来在win10上运行celery

pip install eventlet

Demo

我们创建一个Celery应用,命名为 tasks.py

import time
from celery import Celery


broker = 'redis://127.0.0.1:6379/1' # 注意这里不能用localhost,反正我用了报错了 ╯︿╰
backend = 'redis://127.0.0.1:6379/2'
app = Celery('tasks', broker=broker, backend=backend)


@app.task
def add(x, y):
    print('enter call func...')
    time.sleep(3)
    return x+y

在创建一个生成 task 的应用,命名为 app.py

from tasks import add


if __name__ == '__main__':
    print('start task...')
    result = add.delay(2, 8)
    print('end task...')
    print(result)

运行 app.py 它会向Redis中推入一个任务(你也可以多推入几个任务),但是这个任务这时候并没有执行

start task...
end task...
d0bdebae-6851-462a-818b-1ca3810129c1

只有当我们启动 worker 或者调度任务的时候它才正真的执行(-P eventlet 解决win10兼容性问题)

celery -A tasks worker -l info -P eventlet

正常情况下,最后它会输出执行结果,并保存到指定的Redis中

Task tasks.add[d0bdebae-6851-462a-818b-1ca3810129c1] succeeded in 3.0s: 10

我们也可以直接在命令行模式中向Redis推入任务

In [1]: from tasks import add
In [2]: add.delay(6, 18)
Out[3]: <AsyncResult: bc820010-13df-4fd4-ad11-634024b42384>

然后可以看到在celery的终端中输出了结果

Task tasks.add[bc820010-13df-4fd4-ad11-634024b42384] succeeded in 3.0s: 24

小栗子

上面我们直接将所有的配置,task,celery都写在一个文件中,一般来说并不推荐这么做

我们可以创建一个Python包 celery_app

目录结构如下

|scelery_app
|--__init__.py
|--celery_config.py
|--task1.py
|--task2.py

init 文件中我们声明一个Celery对象

from celery import Celery


app = Celery('demo')
# 通过Celery实例加载配置模块
app.config_from_object('celery_app.celery_config')

在上面的文件中,我们加载了一个配置文件,所以在下面,我们编写一下这个配置文件

BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TIMEZONE = 'Asia/Shanghai'

# 导入指定的任务模块
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2',
)

编辑两个任务模块

task1:

import time

from celery_app import app



@app.task
def add(x, y):
    time.sleep(3)
    return x + y

task2:

import time

from celery_app import app


@app.task
def multiply(x, y):
    time.sleep(4)
    return x * y

先启动celery的worker监听

celery -A celery_app worker -l info -P eventlet

改写 app.py ,让它调用 celery_app 中的任务(也可以直接在Shell模式下启用)

from celery_app import task1
from celery_app import task2


print('start......')
task1.add.delay(2, 4)
task2.multiply.delay(4, 5)
print('end......')

正常情况下可以看到以下输出

Received task: celery_app.task1.add[d4a3e532-898f-442c-94b8-280f0b24d606]
Received task: celery_app.task2.multiply[bcb3b589-4246-4525-87c5-8f94f3ee6205]
Task celery_app.task1.add[d4a3e532-898f-442c-94b8-280f0b24d606] succeeded in 3.0s: 6
Task celery_app.task2.multiply[bcb3b589-4246-4525-87c5-8f94f3ee6205] succeeded in 4.0s: 20

定时任务

在配置文件中配置定时任务

from datetime import timedelta

from celery.schedules import crontab


# 定时任务
CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'celery_app.task1.add',
        'schedule': timedelta(seconds=10), # 每10秒执行一次
        'args': (2, 8)
    },
    'task2': {
        'task': 'celery_app.task2.multiply',
        'schedule': crontab(hour=18, minute=10), # 设定一个特定的时间执行
        'args': (2, 8)
    }
}

启动celery的调度器来对任务进行调度

celery -A celery_app beat -l info

启动celery的worker来执行任务

celery -A celery_app worker -l info -P eventlet
标签: Celery任务队列Python
添加评论
评论列表
没有更多内容