news 2026/3/13 15:07:09

FastAPI中的异步任务执行-celery

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
FastAPI中的异步任务执行-celery

🚀 Celery 完整学习指南

Part 1: 什么是 Celery?(简介与核心概念)

Celery总体简要概括:

celery是一个独立于你FastAPI的web服务的一个服务,其中celery是采用生产者和消费者的方式来实现将web服务端提交任务作为生产者传输给消息中间件(通常是Redis/RabbitMQ-消息队列),再由celery的worker来作为消费者将消息中间件中的任务进行异步执行。celery也可以执行具体的定时任务,例如定时清理日志、调整数据库存储等等。

1. 简介

Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息。它主要专注两个领域:

  • 实时操作 (Real-time processing):比如用户注册后立即发送邮件,但不想让用户在网页上转圈等待。
  • 任务调度 (Task scheduling):比如每天凌晨 2 点生成数据报表。

一句话总结:Celery 帮你把“耗时”的任务从主程序(如 Django/FastAPI)中剥离出来,扔给后台默默执行。

2. celery在现实中的类比

为了方便理解,我们可以把 Celery 架构比作一个繁忙的餐厅:

  • Producer (你的 Web 代码):就像服务员
    • 职责:接收客人的需求,记在小票上,扔到后厨,然后立刻去接待下一位客人(异步,不阻塞)。
  • Broker (消息中间件):就像后厨的订单栏/挂票钩
    • 职责:暂存所有的任务单。如果不存下来(持久化),断电了订单就丢了。
    • 常用组件Redis(速度快,推荐) 或RabbitMQ(稳健,企业级)。
  • Worker (消费者):就像厨师
    • 职责:盯着订单栏,一旦有单子就拿走去做。厨师可以有一个,也可以有100个(分布式扩展)。
  • Result Backend (结果存储):就像传菜台/出餐口
    • 职责:厨师做完菜,把结果放在这里,等待服务员来查验或端走。
    • 常用组件:Redis, MySQL。

Part 2: 分阶段实战学习计划

📅 第一阶段:环境搭建与 Hello World

目标:跑通第一个异步任务,解决环境报错。

1. 安装你需要安装 Celery 和 Redis 的驱动(假设你已经安装了 Redis 数据库)。

pip install "celery[redis]"

2. 编写 Worker 代码 (tasks.py)这是“厨师”的手册,定义了能做什么菜。

# tasks.py from celery import Celery import time # 初始化 Celery 应用 # broker: 任务存到 Redis 的 0 号数据库 # backend: 结果存到 Redis 的 1 号数据库 app = Celery('demo_project', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1') # @app.task 装饰器将普通函数转换为 Celery 任务 @app.task def add(x, y): print(f"开始计算 {x} + {y} ...") time.sleep(3) # 模拟耗时 3 秒 return x + y

3. 启动 Worker打开终端(Terminal),运行以下命令启动“厨师”:

# Linux / Mac celery -A tasks worker --loglevel=info # Windows (注意:Windows 下必须加 -P eventlet 或 -P solo,否则可能卡死) # pip install eventlet celery -A tasks worker --loglevel=info -P eventlet

4. 编写 Producer 代码 (main.py)这是“服务员”的操作,负责派单。

# main.py from tasks import add import time print("1. 接收到用户请求...") # 使用 .delay() 发送任务,这是最简单的调用方式 # 这行代码瞬间执行完毕,不会等待 3 秒 task = add.delay(4, 6) print(f"2. 任务已发送,任务ID: {task.id}") print("3. 主程序继续做其他事情,不被阻塞...") # 如果你需要结果,可以去后端查询 (实际业务中很少在这里死等) # while not task.ready(): # time.sleep(0.5) # print("等待结果中...") # print(f"4. 拿到结果: {task.get()}")

📅 第二阶段:常用配置与容错处理

目标:掌握任务重试、延时执行。这是生产环境必备技能。

代码更新 (tasks.py)

@app.task(bind=True, max_retries=3) # bind=True 允许访问 self def send_email(self, email): try: print(f"尝试给 {email} 发送邮件...") # 模拟一个随机网络错误 import random if random.choice([True, False]): raise Exception("SMTP 连接断开") return "发送成功" except Exception as exc: print("发送失败,3秒后重试...") # countdown=3: 等待3秒再重试 # exc=exc: 记录原本的错误信息 raise self.retry(exc=exc, countdown=3)

调用方式 (main.py)

from tasks import send_email # 场景1:立即重试演示 send_email.delay("user@example.com") # 场景2:延时任务 (比如:用户注册10分钟后发送关怀邮件) # countdown=600 (秒) send_email.apply_async(args=["user@example.com"], countdown=600)

📅 第三阶段:工作流编排 (Workflow)

目标:处理复杂的依赖关系。

代码示例main.py中尝试以下高级编排:

from celery import chain, group from tasks import add # 1. 链式调用 (Chain) # 逻辑:(4 + 4) -> 结果再 + 10 # add(4,4) 的返回值会自动传给下一个 add 作为第一个参数 print("执行链式任务...") chain_task = chain(add.s(4, 4) | add.s(10)) res = chain_task.apply_async() print(res.get()) # 输出 18 # 2. 并行调用 (Group) # 逻辑:同时计算 1+1, 2+2, 3+3 print("执行并行任务...") group_task = group(add.s(i, i) for i in range(1, 4)) res = group_task.apply_async() print(res.get()) # 输出 [2, 4, 6]

📅 第四阶段:定时任务 (Celery Beat)

目标:取代系统的 Crontab。

你需要配置beat_schedule并启动一个单独的 Beat 进程。

配置 (tasks.py增加配置)

from celery.schedules import crontab app.conf.beat_schedule = { 'every-30-seconds': { 'task': 'tasks.add', 'schedule': 30.0, # 每30秒执行一次 'args': (16, 16) }, 'every-morning-8am': { 'task': 'tasks.send_email', 'schedule': crontab(hour=8, minute=0), # 每天早上8点 'args': ('admin@boss.com',) }, }

运行你需要开两个终端窗口:

  1. 启动 Worker:celery -A tasks worker ...
  2. 启动 Beat:celery -A tasks beat

Part 3: 巩固与思考题

为了确保你真正掌握,请尝试回答以下问题:

  1. 关于 Broker 的持久化
    • 如果在 Worker 关闭的情况下,Producer 发送了 10 个任务。当你 1 小时后启动 Worker,这 10 个任务会执行吗?为什么?
    • 提示:这取决于你用 Redis 还是 RabbitMQ 以及它们的配置。
  1. 关于代码变更
    • 如果你修改了tasks.py里的代码,但是没有重启 Worker 进程,再次发送任务时,执行的是新代码还是旧代码?
    • 提示:Python 内存加载机制。
  1. 关于参数传递
    • 我有一个 50MB 的大字典数据需要处理,我应该直接task.delay(big_dict)传进去,还是应该把数据存数据库,只传task.delay(data_id)?为什么?
    • 提示:序列化开销与 Broker 压力。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/12 16:11:49

英文论文写作工具推荐:8大AI+翻译润色排名+

英文论文写作工具推荐:8大AI翻译润色排名 �� 8款英文论文AI写作工具核心对比 工具名称 核心功能 处理速度 适合场景 独特优势 aibiye 降AIGC率查重 20分钟 学术论文优化 适配知网/维普检测规则 aicheck AIGC检测降重 20分钟 AI…

作者头像 李华
网站建设 2026/2/27 14:22:03

免费论文生成工具排名:8大网站+无水印下载推荐

免费论文生成工具排名:8大网站无水印下载推荐 核心工具对比速览 工具名称 核心功能 处理速度 适用场景 特色优势 aibiye AI论文生成降重 15-30分钟 初稿快速生成 学术术语精准匹配 aicheck AIGC检测降AI率 20分钟 论文合规性优化 知网/维普规则适配 …

作者头像 李华
网站建设 2026/3/8 3:07:49

XSS(跨站脚本攻击)

XSS(跨站脚本攻击) 什么是XSS? XSS(Cross-Site Scripting) 是一种Web安全漏洞,攻击者将恶意脚本注入到其他用户会访问的网页中。 当用户浏览被感染的网页时,恶意脚本会在用户浏览器中执行&…

作者头像 李华
网站建设 2026/2/26 23:04:14

强制式双卧轴搅拌机:型号谱系、核心参数与性能深度剖析!

在混凝土工程领域,强制式双卧轴搅拌机凭借其高效、均匀的搅拌性能,已成为商混站、预制构件厂及大型基建项目的核心设备。小编从行业标准出发,结合工程实践数据,系统梳理双卧轴搅拌机的型号分类、关键参数及性能优化方向&#xff0…

作者头像 李华
网站建设 2026/3/7 16:27:12

信捷XDPLC十轴及以下万能通用程序模板:进制的巧妙运用

信捷XDPLC十轴(包含)及以下万能通用程序模板,用进制在自动化控制领域,信捷XDPLC的应用十分广泛。今天咱就来聊聊基于进制思维打造的信捷XDPLC十轴及以下万能通用程序模板,绝对能给你的PLC编程工作带来不少便利。 一、进制在PLC编程中的重要性…

作者头像 李华