Skip to content

about

本篇笔记中的所有示例运行环境是:celery-v5.2.6 + python3.9 + Ubuntu2004 + redis

最新的官档:https://docs.celeryq.dev/en/latest/index.html,但遗憾的是,最新的官档是英文的。

celery3.x中文文档:参考:http://docs.jinkan.org/docs/celery/getting-started/index.html

celery4.x中文文档:https://www.celerycn.io/ru-men/celery-jian-jie

Gighub地址:https://github.com/celery/celery/

Celery是一个python第三方模块,是一个功能完备即插即用的分布式异步任务队列框架。它适用于异步处理问题,当大批量发送邮件、或者大文件上传, 批图图像处理等等一些比较耗时的操作,我们可将其异步执行,这样的话原来的项目程序在执行过程中就不会因为耗时任务而形成阻塞,导致出现请求堆积过多的问题。celery通常用于实现异步任务或定时任务。

Celery的特点

  • 简单,易于使用和维护,有丰富的文档。
  • 高效,支持多线程、多进程、协程模式运行,单个celery进程每分钟可以处理数百万个任务。
  • 灵活,celery中几乎每个部分都可以自定义扩展。

celery的作用

  • 应用解耦,异步处理,流量削锋,消息通讯......

celery运行原理

关于celery的使用说明

  • Celery不建议在windows系统下使用,Celery在4.0版本以后不再支持windows系统,所以如果要在windows下使用只能安装4.0以前的版本,而且即便是4.0之前的版本,在windows系统下也是不能单独使用的,需要安装gevent、geventlet或eventlet协程模块。

    • 来自某celery5.3官档:Celery 是一个资金最少的项目,因此我们不支持 Microsoft Windows。请不要提出与该平台相关的任何问题。
  • celery4.x版本和celery5.x版本的使用是有区别的,所以大家在使用过程中,一定要注意自己用的是celery的哪个版本,才好针对性的去翻阅资料。

  • celery4.x是最后一个支持Python2.7的版本,到了celery5.x,就必须使用Python3.6及更新的版本;celery5.2.x版本需要Python3.7及更新的版本;celery5.3需要Python ❨3.8, 3.9, 3.10, 3.11❩,来自:https://docs.celeryq.dev/en/latest/getting-started/introduction.html

参考视频:

武sirb站的视频:celery4.x

https://www.bilibili.com/video/BV1i24y1m7WW?p=106&vd_source=f56f96c7f7894594fdc04129b7d97ff6

广林的路飞视频:celery5.x

https://www.luffycity.com/play/46032

安装

pip install -U celery==5.2.6 -i  https://pypi.tuna.tsinghua.edu.cn/simple

作为单独的项目使用

首先我有这样一个目录结构:

bash
mycelery/										# 项目根目录
├── add_tasks.py								# 动态添加异步任务和延时任务的
├── celerybeat-schedule.bak						# 这三个是定时任务创建的本地文件
├── celerybeat-schedule.dat
├── celerybeat-schedule.dir
├── get_result.py								# 获取异步任务和延时任务的执行结果
├── logs										# 日志和pid存放目录
│   ├── celery.log
│   └── worker1.pid
├── main.py										# 入口文件,非常重要
├── mytasks										# 我将所有的任务都放到这个目录中了
│   ├── email									# 一个Python包,就是一个同类任务的集合,想要让这里面的任务被发现,需要将这个包路径添加到main.py中
│   │   ├── __init__.py
│   │   └── tasks.py							# 每个任务的Python包内,都必须将任务写在这个叫做tasks.py的文件中,注意,文件名必须是tasks.py
│   ├── __init__.py
│   ├── periodic_tasks.py						# 我将所有的定时任务都放到这个文件中了
│   └── sms										# 另一个任务集合
│       ├── __init__.py
│       └── tasks.py							# 每个任务的Python包内,都必须将任务写在这个叫做tasks.py的文件中,注意,文件名必须是tasks.py
├── settings.py									# 整个celery项目的配置文件
└── utils										# 工具包
    ├── __init__.py
    └── middle.py								# 我自己封装的获取异步任务和延时任务的结果的类所在的文件

注意,celery的相关命令和实现,对目录层级要求非常高,一点搞不对,比如运行命令的路径不对,你都运行不起来相关命令。

所以无论你翻阅任何的资料都要注意这点。

异步任务&&延时任务

创建任务

任务文件名必须是tasks.py。否则celery不识别。

celery任务类型通常有四种:

  1. 没有参数,也没有返回值的任务。
  2. 有参数,但没有返回值的任务。
  3. 没有参数,但有返回值的任务。
  4. 有参数,也有返回结果的任务。

每一个任务都希望有其任务名称,而关于任务名称:

  • 如果我们不手动指定任务名称,那么celery会自动帮我们生成一个任务名,但这个任务名称比较长,它生成名称的规则是项目名.任务包名.tasks.函数名
  • 如果我们手动指定任务名称,就以我们指定的为准。
  • 另外,如果有任务名的话,请保证任务名的唯一性。

上面的四种写法的任务,都可以当做延时任务和定时任务来使用。

上示例

首先是入口函数main,在mycelery/main.py中:

python
import os
import sys
from celery import Celery

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# print(BASE_DIR)
sys.path.insert(0, BASE_DIR)

# 实例化celery应用,参数一般为项目应用名
app = Celery("Worker1")

# 通过app实例对象加载配置文件
app.config_from_object("settings")

# 注册任务, 自动搜索并加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
app.autodiscover_tasks(["mytasks.sms", ])  # 列表中填写你的一个或者多个任务

没完!还有配置文件,在mycelery/settings.py中,添加:

python
# 任务队列的链接地址
broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址
result_backend = 'redis://127.0.0.1:6379/15'

# 更改时区为亚洲上海,禁用utc时间
# 其实不设置的话也是默认使用的是操作系统的时区,所以只要操作系统是本地时区,基本上就没啥问题
enable_utc = False
timezone = 'Asia/Shanghai'

创建任务,mytasks/sms/tasks.py

python
from main import app

"""
@app.task(name='send_sms1')
name参数如果不传的话,任务名字则是这样的:
    mytasks.sms.tasks.send_sms1
传值的话,任务名字就按你传的来
任务名有啥用,没啥用,就是比较方便观察罢了,可以在任务调用时,通过如下方式获取任务名字
    send_sms1.__name__
"""
@app.task(name='send_sms1')   # 必须加上装饰器
def send_sms1():
    print('send_sms1执行了.....')


@app.task(name="send_sms2")
def send_sms2(mobile, code):
    """有参数的异步任务"""
    print(f'任务:send_sms2执行了...mobile={mobile}, code={code}')

@app.task(name='send_sms3')
def send_sms3():
    """有结果的异步任务"""
    print('任务:send_sms3执行了...')
    return 100

@app.task(name="send_sms4")
def send_sms4(x, y):
    """有结果的异步任务"""
    print('任务:send_sms4执行了...')
    return x + y

写好了一个个任务,并且也已经在main.py中奖任务加载到任务列表中了。

打开终端,注意终端路径必须在项目根目录下执行下面的命令,否则命令不生效或者报错,在mycelery根目录下:

bash
# 前台启动,阻塞终端
celery -A main worker --loglevel=info
celery -A main worker -l info

# ps aux|grep celery
# ps -ef|grep celery



# 启动多工作进程,以守护进程的模式运行[按CPU核数+1创建进程数]
# 注意:pidfile和logfile必须以绝对路径来声明
celery multi start worker -A main -E --pidfile="/home/moluo/Desktop/mycelery/logs/worker1.pid" --logfile="/home/moluo/Desktop/mycelery/logs/celery.log" -l info -n worker1
celery multi start worker -A main -E --pidfile="/home/moluo/Desktop/mycelery/logs/worker2.pid" --logfile="/home/moluo/Desktop/mycelery/logs/celery.log" -l info -n worker2

# 关闭运行的工作进程
celery multi stop worker -A main --pidfile="/home/moluo/Desktop/mycelery/logs/worker1.pid" --logfile="/home/moluo/Desktop/mycelery/logs/celery.log"
celery multi stop worker -A main --pidfile="/home/moluo/Desktop/mycelery/logs/worker2.pid" --logfile="/home/moluo/Desktop/mycelery/logs/celery.log"
celery -A main beat -l info

调用任务

注意:

想要调用任务的话,就必须先启动celery的工作进程,如果没有启动,就开始运行执行任务的代码,则任务的状态一直是PENDING

想要调用任务,也就是让任务队列中的任务开始执行,有几种方式:

  1. 异步任务,就是从任务队列中,放到执行队列中,如果执行队列中有未执行完的任务,你的任务就要排队等待执行,如果执行队列队列为空,就会立即执行你的任务。
  2. 延时任务,你可以指定你的任务在多少秒之后开始执行。需要注意的是,如果你启动了这个定时任务后,但这个定时任务还没有开始执行,你就获取这个任务的执行结果,那么此时是会阻塞的,就是等待这个定时任务到时间执行,然后直到返回结果。
  3. 定时任务,就是每隔多少时间执行一次,比如每小时执行一次。

为了方便后续查询任务结果,我这里自定义一个根据任务id查询任务结果的类。

mycelery/utils/middle.py

python
import redis


pool = redis.ConnectionPool(
    host="127.0.0.1",
    port=6379,
    password="",
    db=14,
    max_connections=100,
    decode_responses=True  # 如果使用这个参数,那么必须在连接池中声明
)


class Md(object):
    """
    通常,添加一个任务之后,都会返回一个UUID作为任务id,我们就可以根据这个任务id来获取该任务的执行结果,但通常任务id不方便记录,我干脆就写了
    这个类,主要就是想着任务名字好记,能根据任务名,获取到其对应的任务id
    在动态添加异步任务和延时任务时,都需要将添加的任务名字和任务id添加到Redis中,将来通过此类查询指定任务的任务结果
    """
    # Redis中的hash数据的key
    default_hash = "result_hash"

    def __init__(self):
        self.conn = redis.Redis(connection_pool=pool)

    def add_id(self, task_id, task_name):
        """
        动态添加一个任务,都需要将任务id和任务名传递进来,报错到redis中
        :param task_id: 
        :param task_name: 
        :return: 
        """
        self.conn.hset(name=self.default_hash, key=task_name, value=task_id)

    def add_ids(self, mapping):
        """
        以字典的形式批量添加多个任务
        :param mapping: 
        :return: 
        """
        self.conn.hset(name=self.default_hash, mapping=mapping)

    def get_id(self, task_name=None):
        """
        根据任务名返回其任务id,方便后续根据任务id获取其执行结果
        :param task_name: 
        :return: 
        """
        return self.conn.hget(self.default_hash, task_name)

    def get_all(self):
        """
        返回hash中所有的任务
        :return: 
        """
        return self.conn.hgetall(self.default_hash)
    
if __name__ == '__main__':
    # obj = Md()
    # obj.add_id('111', 'send1')
    # obj.add_id('222', 'send2')
    # print(obj.get_id(task_name='send1'))
    # print(obj.get_id(task_name='send2'))
    # obj.add_ids({"send3": "333", "send4": "444"})
    # print(obj.get_all())
    pass

然后我们就可以在一个添加任务脚本的文件中,直接添加任务即可,注意,这里只添加任务,虽然也能获取任务结果,但是不方便,所以,这里最终实现的方式,就是只添加任务,后续有别的获取结果的脚本来获取任务执行结果。

mycelery/add_tasks.py

python
from time import sleep
from mytasks.sms.tasks import send_sms1, send_sms2, send_sms3, send_sms4
"""
对于没有任何参数也没有返回值的任务,直接通过delay触发执行即可
"""
# result = send_sms1.delay()
# # 获取当前任务的唯一id
# print(result.id)
# # 当前任务的状态
# print(result.status)
# # 获取当前任务的任务名字
# print(send_sms1.__name__)

# 获取任务的执行结果,注意,如果此时还不到任务的执行时间,则会阻塞住,
# 所以,我们通常只有当执行状态为SUCCESS时,才去获取其执行结果
# print(result.get())  # 当任务执行成功之后,获取其返回值

# while True:
#     if result.status == "SUCCESS":
#         print(f'任务{send_sms1.__name__},执行结果:{result.get()}....')
#         break
#     else:
#         print(f'任务{send_sms1.__name__}:{result.status
}....')
#     sleep(1)


"""
对于有参数但没有返回值的任务,直接通过delay并传参触发执行即可,不过要注意传参的数据类型是否符合要求
"""

# result = send_sms2.delay("18211101111", "123456")
# while True:
#     if result.status == "SUCCESS":
#         print(f'任务{send_sms2.__name__},执行结果:{result.get()}....')
#         break
#     else:
#         print(f'任务{send_sms2.__name__}:{result.status}....')
#     sleep(1)


"""
对于没有参数但有返回值的任务,直接通过delay触发执行即可,然后通过点get获取其返回值
"""

# result = send_sms3.delay()
# while True:
#     if result.status == "SUCCESS":
#         print(f'任务{send_sms3.__name__},执行结果:{result.get()}....')
#         break
#     else:
#         print(f'任务{send_sms3.__name__}:{result.status}....')
#     sleep(1)

"""
对于有参数也有返回值的任务,直接通过delay并传参触发执行即可,不过要注意传参的数据类型是否符合要求
然后通过点get获取其返回值
"""

result = send_sms4.delay(x=1, y=10)
while True:
    if result.status == "SUCCESS":
        print(f'任务{send_sms4.__name__},执行结果:{result.get()}....')
        break
    else:
        print(f'任务{send_sms4.__name__}{result.status}....')
    sleep(1)

任务执行的状态:

  • SUCCESS:任务执行成功。
  • PENDING:排队状态,即该任务已经从任务队列提交到工作队列,但此时该任务还没有真正执行。
  • FAILURE:一般是有celery worker部分代码出了问题,导致执行任务并获取结果时出现这个状态。

查询任务结果

mycelery/get_result.py

bash
from celery.result import AsyncResult
from main import app
from utils.middle import Md
md = Md()

# 可以从md对象中,直接拿到所有的任务名和任务id,根据任务id,就可以通过AsyncResult来获取该任务的执行情况
print(md.get_all())
# 根据任务名称获取任务id
print(md.get_id('send_sms4'))  # 5c1d6307-f6ce-4863-b8af-54c8c2ab53ca

# 有可能这个任务被删除了,或者还没有被添加,所以,很可能拿不到这个任务id,所以,要做个判断
send_sms4_id = md.get_id('send_sms4')  # 5c1d6307-f6ce-4863-b8af-54c8c2ab53ca  或者 None
if send_sms4_id:
    # send_sms4 = AsyncResult(md.get_id('send_sms4'), app=app)
    send_sms4 = AsyncResult(md.get_id('send_sms4'), app=app)
    print(send_sms4.status)
    print(send_sms4.result)
    print(send_sms4.task_id)
    print(send_sms4.get())
else:
    print('没有这个任务id')

ok,现在平常一些异步任务都可以这么来处理了。

定时任务

关于轮训的任务,实现起来稍显麻烦。

首先说下没实现的需求:

  1. 获取定时任务的任务id。
  2. 获取定时任务的执行结果。
  3. 在celery程序运行期间,动态添加和删除定时任务。

来看基本用法怎么用吧。

创建定时任务

有可能会创建各式各样的定时任务,所以,我把定时任务都写在了一个单独的文件中了,mycelery/mytasks/periodic_tasks.py

python
# 通过timedelta或者crontab实现定时周期
from datetime import timedelta
from celery.schedules import crontab
# 在下面的字典中添加各种定时任务,一个key对应一个定时任务,key名字随便写
PERIODIC_DICT = {
    # 每几秒执行一次的,可以按照下面的写法
    "task1": {   # 这个名字可以随便起
        "task": "send_email1",   # 这个send_email1是定时任务的函数名,要真实存在,并且添加到了main.py文件中app.autodiscover_tasks的列表中了
        # "schedule": 5,
        "schedule": timedelta(seconds=5),  # 执行周期,每五秒执行一次
        "args": ('xxxx@qq.com', 'oooo@qq.com', 'xxxxoooo'),  # 定时任务需要的参数,支持args和kwargs传参
    },
    # 每几分钟执行一次的,可以按照下面的写法
    "task2": {
        "task": "send_email2",
        "schedule": timedelta(minutes=1),
        "kwargs": {"from_obj": "xxxx@qq.com", "to": "oooo@qq.com", "content": "xxxxoooo"}
    },
}

在main中挂载

mycelery/main.py中挂载上定时任务,为啥要这么做呢,因为定时任务没法动态添加,所以只能提前写好定时任务,然后在main中添加到任务队列中去。注意,每次定时任务的调整,都要重启worker进程。

python
import os
import sys
from celery import Celery
from mytasks.periodic_tasks import PERIODIC_DICT


BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# print(BASE_DIR)
sys.path.insert(0, BASE_DIR)

app = Celery("Worker1",)
app.config_from_object("settings")

# 无论是什么任务,都要在这里任务列表中进行配置,否则发现不了,会报错
app.autodiscover_tasks([
    "mytasks.sms",
    "mytasks.email",  # 添加上定时任务的函数所在的任务包名
])

# 对于周期性的任务,我都写在了mytasks/periodic_tasks.py中的字典中
# 这里要进行加载,后续结合beat命令才能真正的执行成功
# 注意,我暂时没找到如何在单独使用celery时,动态的添加定时任务
# 所以,我只好单独的将定时任务都放到一个字典中,在worker进程启动时就加载上
# 注意,如果新添加或者删除了定时任务,都要重启worker和beat这两个终端命令
app.conf.beat_schedule = PERIODIC_DICT

然后根目录下终端重启worker和启动beat:

bash
# 终端1
celery -A main worker --loglevel=info

# 终端2
celery -A main beat -l info purge

这样两个终端都分别启动了各自的进程之后,你就可以通过日志观察到定时任务在执行了。

crontab函数

timedelta

celery应用于Django项目

环境和版本很重要,如果是windows系统,不用往下看了,我的示例的环境是:

ubuntu20.04 + python3.11 + django5.0.3 + celery5.2.6 + redis5.0.7

目录结构

celery相对还是比较难用的,尤其是执行命令所在的目录,以及各种参数,一点不对就跑不起来,所以,你应该清楚的知道在什么条件下才能执行这些命令。

bash
demo/						# 项目根目录,同时也是项目名称
├── api
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py			# 你的任务文件名必须叫做tasks.py,且必须处于django 的app目录下
│   ├── tests.py
│   └── views.py			# 视图函数无需多说
├── celerybeat-schedule.bak
├── celerybeat-schedule.dat
├── celerybeat-schedule.dir
├── celery_main.py			# celery的主文件,它必须处于项目根目录下,通常叫做celery.py,但我为和模块名进行区分,这里叫做celery_main.py
├── db.sqlite3
├── demo					# 项目同名目录
│   ├── asgi.py
│   ├── __init__.py			# 这个文件也很重要,只要django服务启动,那么就会进行celery初始化动作
│   ├── settings.py			# celery相关的配置文件
│   ├── urls.py
│   └── wsgi.py
├── logs					# 日志目录,需要你手动创建
│   └── celery.log			# celery的相关日志在这个日志文件中
└── manage.py

异步任务和延时任务

这里演示下在项目中如何集成异步任务。

适用于这样的场景:比如客户端发起一个比较耗时的请求,我们试图函数在接收到这个请求之后,直接调用异步任务来完成耗时的工作,而视图函数则给客户端返回一个明确的信号,表示后台正在处理,请稍后获取结果。

异步任务和延时任务

在执行异步任务的时候,可以选择立即执行异步任务和延时一段时间再执行异步任务这两种。

python
# 立即执行
send_sms2.delay()
    
# 延时十秒在执行异步任务
send_sms2.apply_async(countdown=10)

具体的配置过程

前提是已经安装好了Redis,并且可用。

python
"""
在项目根目录下创建celery_main.py
"""
import os
from celery import Celery

# 必须在实例化celery应用对象之前执行
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')

# 实例化celery应用对象,名字我一般选择celery_开头再加上项目名,方便区分
app = Celery('celery_demo')

# 指定任务的队列名称,名称也是celery_开头再加上项目名再加上队列名
app.conf.task_default_queue = 'celery_demo_queue'

# 既然是在Django项目中使用,那么配置写在django的settings中了
# 两个参数介绍:
#   'django.conf:settings' 算是固定写法,不用动
#   'namespace='CELERY'  也算是固定写法,目的是在内部会读取Django配置文件中以"CELERY_"开头为celery的配置信息
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动根据配置查找django的所有子应用(INSTALLED_APPS列表)下的tasks任务文件
app.autodiscover_tasks()
python
"""
django的配置文件中,务必保证Redis安装好了。
"""
# --------------- 确保app已经注册到了INSTALLED_APPS列表中 ---------------
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'api',
]

# --------------- 时区也很重要 ---------------
# LANGUAGE_CODE = 'en-us'
LANGUAGE_CODE = 'zh-hans'

# TIME_ZONE = 'UTC'
# datetime.datetime.now() - 东八区时间 / datetime.datetime.utcnow() => utc时间
TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True

# 影响自动生成数据库时间字段
# USE_TZ = True,创建UTC时间写入到数据库。
# USE_TZ = False,根据TIME_ZONE设置的时区进行创建时间并写入数据库
# USE_TZ = True
USE_TZ = False

# --------------- celery相关的配置 ---------------
# Celery异步任务队列框架的配置项[注意:django的配置项必须大写,所以这里的所有配置项必须全部大写]
# 任务队列
# CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/14'  # redis有密码
CELERY_BROKER_URL = 'redis://:@127.0.0.1:6379/14'          # redis无密码
# 结果队列
CELERY_RESULT_BACKEND = 'redis://:@127.0.0.1:6379/15'
# 时区,与django的时区同步
CELERY_TIMEZONE = TIME_ZONE
# 防止死锁
CELERY_FORCE_EXECV = True
# 设置并发的worker数量,根据硬件性能自行调整,不太懂就默认200
CELERYD_CONCURRENCY = 200
# 设置失败允许重试[这个慎用,如果失败任务无法再次执行成功,会产生指数级别的失败记录]
CELERY_ACKS_LATE = True
# 每个worker工作进程最多执行500个任务被销毁,可以防止内存泄漏,500是举例,根据自己的服务器的性能可以调整数值
CELERYD_MAX_TASKS_PER_CHILD = 500
# 单个任务的最大运行时间,超时会被杀死[慎用,有大文件操作、长时间上传、下载任务时,需要关闭这个选项,或者设置更长时间]
CELERYD_TIME_LIMIT = 10 * 60
# 任务发出后,经过一段时间还未收到acknowledge, 就将任务重新交给其他worker执行
CELERY_DISABLE_RATE_LIMITS = True
# celery的任务结果内容格式
CELERY_ACCEPT_CONTENT = ['json',]

# 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
# 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:celery -A mycelery.main beat,当然worker还是要启动的
# https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    "api-tasks": {  # 定时任务的注册标记符[必须唯一的]
        "task": "send_sms3",   # 定时任务的任务名称
        "schedule": 5,  # 定时任务的调用时间,5表示每隔5秒调用一次add任务
        # "schedule": crontab(hour=7, minute=30, day_of_week=1),,  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
    }
}
python
"""
现在可以在你各个app目录下创建tasks.py文件了
"""
# -*- coding = utf-8 -*-
from celery import shared_task


@shared_task(name='send_sms1')
def send_sms1(mobile, code):
    """ 带参数的异步任务,返回值的话,根据需要决定 """
    try:
        # 这里可以写你的具体要干的事
        print(f'send_sms1:已向{mobile}手机发送验证码{code},请注意查收!')
        return {"send_sms1 task status": "ok"}
    except Exception as e:  # 失败的话,写日志还是怎么着都随你
        print(f"手机号:{mobile},发送短信失败错误: {e}")

@shared_task(name='send_sms2')
def send_sms2():
    """ 无参数的异步任务,返回值的话,根据需要决定 """
    try:
        # 这里可以写你的具体要干的事
        print(f'send_sms2执行了')
        return {"send_sms2 task status": "ok"}
    except Exception as e:  # 失败的话,写日志还是怎么着都随你
        print(f"send_sms2执行失败了,错误: {e}")

@shared_task(name='send_sms3')
def send_sms3():
    """
    结合beat实现的定时任务
    一般这类定时任务基本上不带参数的,都是自己写所有逻辑
    返回值的话,根据需要决定
    """
    try:
        # 这里可以写你的具体要干的事
        print(f'send_sms3执行了')
        return {"send_sms3 task status": "ok"}
    except Exception as e:  # 失败的话,写日志还是怎么着都随你
        print(f"send_sms3执行失败了,错误: {e}")
        return {"send_sms3 task status": "error", "msg": str(e)}
python
"""
在视图函数中,或者在中间件等需要异步任务的地方,写你的调用异步任务代码
"""
import random
import string
from django.shortcuts import HttpResponse
from .tasks import send_sms1, send_sms2

def index(request):
    code = ''.join(random.sample(string.digits, 4))
    mobile = '182'.join(random.sample(string.digits, 8))
    # 立即触发异步的执行,带参数的直接传参
    send_sms1.delay(mobile, code)

    # 立即触发异步的执行,不带参数的,直接delay()就完了
    send_sms2.delay()
    
    # 延时十秒在执行异步任务
    send_sms2.apply_async(countdown=10)
    return HttpResponse("index ok")
python
"""
快速的配个路由
"""
from django.contrib import admin
from django.urls import path
from api.views import index

urlpatterns = [
    path('admin/', admin.site.urls),
    path('index/', index, name='index'),
]
bash
# 注意,必须是在根目录下执行下面的命令
# 启动worker
# 前台启动,日志输出到终端中,关闭终端worker也关闭了
celery -A demo worker -l info

# 前台启动,日志输出到日志中,日志路径填写绝对路径,关闭终端worker也关闭了
celery -A demo worker -l info --logfile="/home/moluo/Desktop/demo/logs/celery.log"

# 以守护进程且是多进程的形式启动,日志输出到日志文件中,终端关闭worker不会关闭
# 但是这种运行模型,一旦停止,需要手动启动。
celery multi start worker -A demo -E --pidfile="/home/moluo/Desktop/demo/logs/worker1.pid" --logfile="/home/moluo/Desktop/demo/logs/celery.log" -l info -n worker1

# 关闭的话就执行下面的命令
celery multi stop worker -A demo --pidfile="/home/moluo/Desktop/demo/logs/worker1.pid" --logfile="/home/moluo/Desktop/demo/logs/celery.log"
bash
# 执行异步任务,按照上面的示例,你可以把django项目跑起来,然后把celery的worker跑起来
# 就可以浏览器访问了http://127.0.0.1:8000/index/,后观察日志文件,就能看到日志文件了

[2024-03-22 13:09:41,386: INFO/MainProcess] Task send_sms1[c430a14f-b867-4a20-a8ce-c2d5b9503b09] received
[2024-03-22 13:09:41,389: WARNING/ForkPoolWorker-7] send_sms1:已向41828182118231820182618251827手机发送验证码5426,请注意查收!
[2024-03-22 13:09:41,391: INFO/ForkPoolWorker-7] Task send_sms1[c430a14f-b867-4a20-a8ce-c2d5b9503b09] succeeded in 0.0020210590009810403s: {'send_sms1 task status': 'ok'}
[2024-03-22 13:09:41,391: INFO/MainProcess] Task send_sms2[b08c6c34-5572-4e7f-b4cf-3dfea9812015] received
[2024-03-22 13:09:41,393: WARNING/ForkPoolWorker-8] send_sms2执行了
[2024-03-22 13:09:41,395: INFO/ForkPoolWorker-8] Task send_sms2[b08c6c34-5572-4e7f-b4cf-3dfea9812015] succeeded in 0.0026413249997858657s: {'send_sms2 task status': 'ok'}


# ---------------------------------------------------------
# 你也可以django的shell中调用异步任务,终端切换到项目根目录下执行,也都会有对应的日志输出
(celerypy) moluo@ubuntu:~/Desktop/demo$ python manage.py shell
Python 3.11.4 | packaged by conda-forge | (main, Jun 10 2023, 18:08:17) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from api.tasks import send_sms2  # 导入异步任务函数
>>> res = send_sms2.delay()          # 立即执行
>>> res.id
'5c301e23-8449-4b5e-937c-4e25b732c868'
>>> res.state
'SUCCESS'
>>> res.result
{'send_sms2 task status': 'ok'}
>>> res = send_sms2.apply_async(countdown=10)  # 延时十秒执行
>>> res.state
'PENDING'
>>> res.state
'PENDING'
>>> res.state
'PENDING'
>>> res.state
'SUCCESS'
>>> res.id
'246cb42e-4863-4c56-afee-d60793772060'
>>> res.result
{'send_sms2 task status': 'ok'}

定时任务

定时任务需要结合beat进程来完成,然后结果由worker进程来处理,也就是说想要使用定时任务,你必须保证beat和worker这两个进程同时运行才能完成。

定时任务的配置跟上面异步任务基本一致,主要修改的点就是settings中定时任务部分的字典。

我这里仍然把所有的步骤都列举一下。

前提是已经安装好了Redis,并且可用。

python
"""
在项目根目录下创建celery_main.py
"""
import os
from celery import Celery

# 必须在实例化celery应用对象之前执行
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')

# 实例化celery应用对象,名字我一般选择celery_开头再加上项目名,方便区分
app = Celery('celery_demo')

# 指定任务的队列名称,名称也是celery_开头再加上项目名再加上队列名
app.conf.task_default_queue = 'celery_demo_queue'

# 既然是在Django项目中使用,那么配置写在django的settings中了
# 两个参数介绍:
#   'django.conf:settings' 算是固定写法,不用动
#   'namespace='CELERY'  也算是固定写法,目的是在内部会读取Django配置文件中以"CELERY_"开头为celery的配置信息
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动根据配置查找django的所有子应用(INSTALLED_APPS列表)下的tasks任务文件
app.autodiscover_tasks()
python
"""
django的配置文件中,务必保证Redis安装好了。
"""
# --------------- 确保app已经注册到了INSTALLED_APPS列表中 ---------------
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'api',
]

# --------------- 时区也很重要 ---------------
# LANGUAGE_CODE = 'en-us'
LANGUAGE_CODE = 'zh-hans'

# TIME_ZONE = 'UTC'
# datetime.datetime.now() - 东八区时间 / datetime.datetime.utcnow() => utc时间
TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True

# 影响自动生成数据库时间字段
# USE_TZ = True,创建UTC时间写入到数据库。
# USE_TZ = False,根据TIME_ZONE设置的时区进行创建时间并写入数据库
# USE_TZ = True
USE_TZ = False

# --------------- celery相关的配置 ---------------
# Celery异步任务队列框架的配置项[注意:django的配置项必须大写,所以这里的所有配置项必须全部大写]
# 任务队列
# CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/14'  # redis有密码
CELERY_BROKER_URL = 'redis://:@127.0.0.1:6379/14'          # redis无密码
# 结果队列
CELERY_RESULT_BACKEND = 'redis://:@127.0.0.1:6379/15'
# 时区,与django的时区同步
CELERY_TIMEZONE = TIME_ZONE
# 防止死锁
CELERY_FORCE_EXECV = True
# 设置并发的worker数量,根据硬件性能自行调整,不太懂就默认200
CELERYD_CONCURRENCY = 200
# 设置失败允许重试[这个慎用,如果失败任务无法再次执行成功,会产生指数级别的失败记录]
CELERY_ACKS_LATE = True
# 每个worker工作进程最多执行500个任务被销毁,可以防止内存泄漏,500是举例,根据自己的服务器的性能可以调整数值
CELERYD_MAX_TASKS_PER_CHILD = 500
# 单个任务的最大运行时间,超时会被杀死[慎用,有大文件操作、长时间上传、下载任务时,需要关闭这个选项,或者设置更长时间]
CELERYD_TIME_LIMIT = 10 * 60
# 任务发出后,经过一段时间还未收到acknowledge, 就将任务重新交给其他worker执行
CELERY_DISABLE_RATE_LIMITS = True
# celery的任务结果内容格式
CELERY_ACCEPT_CONTENT = ['json',]

############################### 上面配置不变,重点就是下面这部分 ###############################
# 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
# 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:celery -A mycelery.main beat,当然worker还是要启动的
# https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    "api-tasks": {  # 定时任务的注册标记符[必须唯一的]
        "task": "send_sms3",   # 定时任务的任务名称
        "schedule": 5,  # 定时任务的调用时间,5表示每隔5秒调用一次add任务
        # "schedule": crontab(hour=7, minute=30, day_of_week=1),,  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
    }
}
python
"""
定时任务的执行函数仍然写在tasks.py文件中
"""
# -*- coding = utf-8 -*-
from celery import shared_task


@shared_task(name='send_sms1')
def send_sms1(mobile, code):
    """ 带参数的异步任务,返回值的话,根据需要决定 """
    try:
        # 这里可以写你的具体要干的事
        print(f'send_sms1:已向{mobile}手机发送验证码{code},请注意查收!')
        return {"send_sms1 task status": "ok"}
    except Exception as e:  # 失败的话,写日志还是怎么着都随你
        print(f"手机号:{mobile},发送短信失败错误: {e}")

@shared_task(name='send_sms2')
def send_sms2():
    """ 无参数的异步任务,返回值的话,根据需要决定 """
    try:
        # 这里可以写你的具体要干的事
        print(f'send_sms2执行了')
        return {"send_sms2 task status": "ok"}
    except Exception as e:  # 失败的话,写日志还是怎么着都随你
        print(f"send_sms2执行失败了,错误: {e}")

@shared_task(name='send_sms3')
def send_sms3():
    """
    结合beat实现的定时任务
    一般这类定时任务基本上不带参数的,都是自己写所有逻辑
    返回值的话,根据需要决定
    """
    try:
        # 这里可以写你的具体要干的事
        print(f'send_sms3执行了')
        return {"send_sms3 task status": "ok"}
    except Exception as e:  # 失败的话,写日志还是怎么着都随你
        print(f"send_sms3执行失败了,错误: {e}")
        return {"send_sms3 task status": "error", "msg": str(e)}
python
"""
在视图函数中,或者在中间件等需要异步任务的地方,写你的调用异步任务代码
定时任务只要worker进程和beat进程启动,就会自动执行,所以这个文件不是重点
"""
import random
import string
from django.shortcuts import HttpResponse
from .tasks import send_sms1, send_sms2

def index(request):
    code = ''.join(random.sample(string.digits, 4))
    mobile = '182'.join(random.sample(string.digits, 8))
    # 立即触发异步的执行,带参数的直接传参
    send_sms1.delay(mobile, code)

    # 立即触发异步的执行,不带参数的,直接delay()就完了
    send_sms2.delay()
    
    # 延时十秒在执行异步任务
    send_sms2.apply_async(countdown=10)
    return HttpResponse("index ok")
python
"""
快速的配个路由
定时任务只要worker进程和beat进程启动,就会自动执行,所以这个文件不是重点
"""
from django.contrib import admin
from django.urls import path
from api.views import index

urlpatterns = [
    path('admin/', admin.site.urls),
    path('index/', index, name='index'),
]
bash
# 注意,定时任务的需要worker和beat进程同时运行才可以
# 注意,必须是在根目录下执行下面的命令
#  -------------- 启动worker --------------
# 前台启动,日志输出到终端中,关闭终端worker也关闭了
celery -A demo worker -l info

# 前台启动,日志输出到日志中,日志路径填写绝对路径,关闭终端worker也关闭了
celery -A demo worker -l info --logfile="/home/moluo/Desktop/demo/logs/celery.log"

# 以守护进程且是多进程的形式启动,日志输出到日志文件中,终端关闭worker不会关闭
# 但是这种运行模型,一旦停止,需要手动启动。
celery multi start worker -A demo -E --pidfile="/home/moluo/Desktop/demo/logs/worker1.pid" --logfile="/home/moluo/Desktop/demo/logs/celery.log" -l info -n worker1

# 关闭的话就执行下面的命令
celery multi stop worker -A demo --pidfile="/home/moluo/Desktop/demo/logs/worker1.pid" --logfile="/home/moluo/Desktop/demo/logs/celery.log"


#  -------------- 启动beat --------------
# 注意,必须是在根目录下执行下面的命令
# 再打开一个终端,cd到根目录
# 前台启动,日志输出到终端中,关闭终端beat进程也关闭了
celery -A demo beat -l info

# 以守护进程且是多进程的形式启动,日志输出到日志文件中,终端关闭beat进程不会关闭
# 但是这种运行模型,一旦停止,需要手动启动。
nohup celery -A demo beat >/dev/null 2>&1 &

# 关闭的话就执行下面的命令,过滤出进程ID,再kill掉就完了
ps -ef|grep beat  
kill 15454
 
 # 补充,同时杀死worker和beat进程
 pkill -9 celery
bash
# 执行异步任务,按照上面的示例,你可以把django项目跑起来,然后把celery的worker跑起来
# 就可以浏览器访问了http://127.0.0.1:8000/index/,后观察日志文件,就能看到日志文件了

[2024-03-22 13:09:41,386: INFO/MainProcess] Task send_sms1[c430a14f-b867-4a20-a8ce-c2d5b9503b09] received
[2024-03-22 13:09:41,389: WARNING/ForkPoolWorker-7] send_sms1:已向41828182118231820182618251827手机发送验证码5426,请注意查收!
[2024-03-22 13:09:41,391: INFO/ForkPoolWorker-7] Task send_sms1[c430a14f-b867-4a20-a8ce-c2d5b9503b09] succeeded in 0.0020210590009810403s: {'send_sms1 task status': 'ok'}
[2024-03-22 13:09:41,391: INFO/MainProcess] Task send_sms2[b08c6c34-5572-4e7f-b4cf-3dfea9812015] received
[2024-03-22 13:09:41,393: WARNING/ForkPoolWorker-8] send_sms2执行了
[2024-03-22 13:09:41,395: INFO/ForkPoolWorker-8] Task send_sms2[b08c6c34-5572-4e7f-b4cf-3dfea9812015] succeeded in 0.0026413249997858657s: {'send_sms2 task status': 'ok'}


# ---------------------------------------------------------
# 你也可以django的shell中调用异步任务,终端切换到项目根目录下执行,也都会有对应的日志输出
(celerypy) moluo@ubuntu:~/Desktop/demo$ python manage.py shell
Python 3.11.4 | packaged by conda-forge | (main, Jun 10 2023, 18:08:17) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from api.tasks import send_sms2  # 导入异步任务函数
>>> res = send_sms2.delay()          # 立即执行
>>> res.id
'5c301e23-8449-4b5e-937c-4e25b732c868'
>>> res.state
'SUCCESS'
>>> res.result
{'send_sms2 task status': 'ok'}
>>> res = send_sms2.apply_async(countdown=10)  # 延时十秒执行
>>> res.state
'PENDING'
>>> res.state
'PENDING'
>>> res.state
'PENDING'
>>> res.state
'SUCCESS'
>>> res.id
'246cb42e-4863-4c56-afee-d60793772060'
>>> res.result
{'send_sms2 task status': 'ok'}
bash
celery -A demo beat -l info
celery -A demo beat -l info --logfile="/home/moluo/Desktop/demo/logs/celery.log"

连接Redis密码配置

首先Redis的配置文件应该加上:

bash
bind 127.0.0.1 192.168.235.128
requirepass 1234a

其中:

  • bind控制谁能访问,多个IP以空格分割。注意,这里如果要支持远程访问的话,不要bind 0.0.0.0应该将0.0.0.0替换为实际的IP地址192.168.235.128,不然celery运行时会有提示,比较烦人。
  • requirepass控制访问密码。

然后让Redis服务基于这个配置文件运行。

然后celery中就可以连接了:

from celery import Celery

app = Celery('tasks', broker='redis://mast:{}@192.168.235.128:6379'.format('1234a'), backend='redis://mast:{}@192.168.235.128:6379'.format('1234a'))
# mast冒号后面的花括号填入你的Redis密码

flower

参考:https://cloud.tencent.com/developer/article/1758821

bash
pip install flower

终端打开,在根目录下执行:

bash
celery -A main flower

# 手动指定端口
celery -A main flower --port=5555

后续,你正常的启动celery进程,正常的调用任务,那么都可以在flower提供的web终端中观察到这些任务执行信息:

1832669301601992704.png