🚀 Celery 完整学习指南
Part 1: 什么是 Celery?(简介与核心概念)
Celery总体简要概括:
celery是一个独立于你FastAPI的web服务的一个服务,其中celery是采用生产者和消费者的方式来实现将web服务端提交任务作为生产者传输给消息中间件(通常是Redis/RabbitMQ-消息队列),再由celery的worker来作为消费者将消息中间件中的任务进行异步执行。celery也可以执行具体的定时任务,例如定时清理日志、调整数据库存储等等。
1. 简介
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息。它主要专注两个领域:
- 实时操作 (Real-time processing):比如用户注册后立即发送邮件,但不想让用户在网页上转圈等待。
- 任务调度 (Task scheduling):比如每天凌晨 2 点生成数据报表。
一句话总结:Celery 帮你把“耗时”的任务从主程序(如 Django/FastAPI)中剥离出来,扔给后台默默执行。
2. celery在现实中的类比
为了方便理解,我们可以把 Celery 架构比作一个繁忙的餐厅:
- Producer (你的 Web 代码):就像服务员。
- 职责:接收客人的需求,记在小票上,扔到后厨,然后立刻去接待下一位客人(异步,不阻塞)。
- Broker (消息中间件):就像后厨的订单栏/挂票钩。
- 职责:暂存所有的任务单。如果不存下来(持久化),断电了订单就丢了。
- 常用组件:Redis(速度快,推荐) 或RabbitMQ(稳健,企业级)。
- Worker (消费者):就像厨师。
- 职责:盯着订单栏,一旦有单子就拿走去做。厨师可以有一个,也可以有100个(分布式扩展)。
- Result Backend (结果存储):就像传菜台/出餐口。
- 职责:厨师做完菜,把结果放在这里,等待服务员来查验或端走。
- 常用组件:Redis, MySQL。
Part 2: 分阶段实战学习计划
📅 第一阶段:环境搭建与 Hello World
目标:跑通第一个异步任务,解决环境报错。
1. 安装你需要安装 Celery 和 Redis 的驱动(假设你已经安装了 Redis 数据库)。
pip install "celery[redis]"2. 编写 Worker 代码 (tasks.py)这是“厨师”的手册,定义了能做什么菜。
# tasks.py from celery import Celery import time # 初始化 Celery 应用 # broker: 任务存到 Redis 的 0 号数据库 # backend: 结果存到 Redis 的 1 号数据库 app = Celery('demo_project', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1') # @app.task 装饰器将普通函数转换为 Celery 任务 @app.task def add(x, y): print(f"开始计算 {x} + {y} ...") time.sleep(3) # 模拟耗时 3 秒 return x + y3. 启动 Worker打开终端(Terminal),运行以下命令启动“厨师”:
# Linux / Mac celery -A tasks worker --loglevel=info # Windows (注意:Windows 下必须加 -P eventlet 或 -P solo,否则可能卡死) # pip install eventlet celery -A tasks worker --loglevel=info -P eventlet4. 编写 Producer 代码 (main.py)这是“服务员”的操作,负责派单。
# main.py from tasks import add import time print("1. 接收到用户请求...") # 使用 .delay() 发送任务,这是最简单的调用方式 # 这行代码瞬间执行完毕,不会等待 3 秒 task = add.delay(4, 6) print(f"2. 任务已发送,任务ID: {task.id}") print("3. 主程序继续做其他事情,不被阻塞...") # 如果你需要结果,可以去后端查询 (实际业务中很少在这里死等) # while not task.ready(): # time.sleep(0.5) # print("等待结果中...") # print(f"4. 拿到结果: {task.get()}")📅 第二阶段:常用配置与容错处理
目标:掌握任务重试、延时执行。这是生产环境必备技能。
代码更新 (tasks.py)
@app.task(bind=True, max_retries=3) # bind=True 允许访问 self def send_email(self, email): try: print(f"尝试给 {email} 发送邮件...") # 模拟一个随机网络错误 import random if random.choice([True, False]): raise Exception("SMTP 连接断开") return "发送成功" except Exception as exc: print("发送失败,3秒后重试...") # countdown=3: 等待3秒再重试 # exc=exc: 记录原本的错误信息 raise self.retry(exc=exc, countdown=3)调用方式 (main.py)
from tasks import send_email # 场景1:立即重试演示 send_email.delay("user@example.com") # 场景2:延时任务 (比如:用户注册10分钟后发送关怀邮件) # countdown=600 (秒) send_email.apply_async(args=["user@example.com"], countdown=600)📅 第三阶段:工作流编排 (Workflow)
目标:处理复杂的依赖关系。
代码示例在main.py中尝试以下高级编排:
from celery import chain, group from tasks import add # 1. 链式调用 (Chain) # 逻辑:(4 + 4) -> 结果再 + 10 # add(4,4) 的返回值会自动传给下一个 add 作为第一个参数 print("执行链式任务...") chain_task = chain(add.s(4, 4) | add.s(10)) res = chain_task.apply_async() print(res.get()) # 输出 18 # 2. 并行调用 (Group) # 逻辑:同时计算 1+1, 2+2, 3+3 print("执行并行任务...") group_task = group(add.s(i, i) for i in range(1, 4)) res = group_task.apply_async() print(res.get()) # 输出 [2, 4, 6]📅 第四阶段:定时任务 (Celery Beat)
目标:取代系统的 Crontab。
你需要配置beat_schedule并启动一个单独的 Beat 进程。
配置 (tasks.py增加配置)
from celery.schedules import crontab app.conf.beat_schedule = { 'every-30-seconds': { 'task': 'tasks.add', 'schedule': 30.0, # 每30秒执行一次 'args': (16, 16) }, 'every-morning-8am': { 'task': 'tasks.send_email', 'schedule': crontab(hour=8, minute=0), # 每天早上8点 'args': ('admin@boss.com',) }, }运行你需要开两个终端窗口:
- 启动 Worker:
celery -A tasks worker ... - 启动 Beat:
celery -A tasks beat
Part 3: 巩固与思考题
为了确保你真正掌握,请尝试回答以下问题:
- 关于 Broker 的持久化:
- 如果在 Worker 关闭的情况下,Producer 发送了 10 个任务。当你 1 小时后启动 Worker,这 10 个任务会执行吗?为什么?
- 提示:这取决于你用 Redis 还是 RabbitMQ 以及它们的配置。
- 关于代码变更:
- 如果你修改了
tasks.py里的代码,但是没有重启 Worker 进程,再次发送任务时,执行的是新代码还是旧代码? - 提示:Python 内存加载机制。
- 如果你修改了
- 关于参数传递:
- 我有一个 50MB 的大字典数据需要处理,我应该直接
task.delay(big_dict)传进去,还是应该把数据存数据库,只传task.delay(data_id)?为什么? - 提示:序列化开销与 Broker 压力。
- 我有一个 50MB 的大字典数据需要处理,我应该直接