本文将手把手带你搭建FastAPI(后端 API)+ Celery(异步任务队列)+ Redis(消息中间件 / 结果存储)+ Vue(前端)的全栈异步项目,实现异步任务提交、任务状态查询、前端实时查看进度的完整功能,适合处理耗时操作(文件导出、数据处理、邮件发送、批量数据计算、AI 模型推理等大中型业务场景)。在现代 Web 应用开发中,同步处理耗时任务会直接导致接口超时、前端页面卡死、服务器并发能力急剧下降,而基于 Celery 的异步架构能完美解决这些问题。本教程从环境搭建、代码编写、服务启动到前后端联调全流程讲解,代码可直接复制运行,零基础也能快速掌握全栈异步开发核心技能。
一、项目架构总览
本项目采用前后端分离架构,整体分为四层结构,每层职责清晰、低耦合高内聚,符合企业级项目开发规范。
1.1 技术栈分工
- 前端:Vue3 + Axios + Element Plus,负责任务提交界面渲染、异步请求发送、任务状态轮询、进度条实时展示,为用户提供流畅的交互体验
- 后端:FastAPI,高性能 Python Web 框架,基于 Starlette 和 Pydantic 构建,提供 RESTful API 接口,处理前端请求、响应数据、跨域配置,支持异步请求处理
- 异步任务核心:Celery,分布式任务队列,专门处理耗时、非实时的后台任务,支持任务进度更新、异常捕获、结果返回
- 中间件存储:Redis,高性能内存数据库,同时承担两个核心角色:
- Broker:消息代理,存储 Celery 的待执行任务队列
- Backend:结果存储,保存 Celery 任务的执行状态、进度数据和最终返回结果
1.2 完整项目执行流程
- 前端 Vue 页面用户输入任务信息,点击提交按钮,通过 Axios 发送请求到 FastAPI 后端接口
- FastAPI 接收请求后,不执行耗时操作,直接将任务投递到 Celery 任务队列,立即返回唯一任务 ID 给前端
- 前端获取任务 ID 后,启动定时器轮询 FastAPI 的任务状态查询接口
- Celery Worker 监听 Redis 任务队列,自动拉取任务并在后台异步执行
- 任务执行过程中,Celery 实时更新进度信息到 Redis
- 前端通过轮询获取最新任务状态和进度,实时渲染进度条和状态文字
- 任务执行完成 / 失败后,Celery 将最终结果存入 Redis,前端获取结果后停止轮询并展示
整个流程实现了请求与执行分离,后端接口毫秒级响应,前端无阻塞,系统并发能力大幅提升。
二、环境准备
2.1 安装依赖软件
本项目依赖三大基础环境,必须提前安装配置完成:
Python 3.9 及以上版本推荐使用 Python3.9/3.10,兼容性最佳,官网下载:https://www.python.org/downloads/安装时勾选
Add Python to PATH,配置环境变量。Redis 6.0+(核心中间件)Redis 是本项目的核心依赖,Celery 必须依赖 Broker 才能运行,本项目统一使用 Redis 作为 Broker 和 Backend。
- Windows 系统:推荐使用第三方维护版本,下载地址:https://github.com/tporadowski/redis/releases,下载后解压直接运行
redis-server.exe即可启动 - Linux 系统:
sudo apt update && sudo apt install redis-server - Mac 系统:
brew install redis启动后默认端口:6379,无密码(本地开发环境)。
- Windows 系统:推荐使用第三方维护版本,下载地址:https://github.com/tporadowski/redis/releases,下载后解压直接运行
Node.js 16 及以上版本用于运行 Vue3 前端项目,官网下载:https://nodejs.org/安装完成后,打开命令行执行
node -v和npm -v验证是否安装成功。
2.2 创建标准项目目录结构
遵循企业级项目规范,创建清晰的目录结构,方便后续维护和扩展:
plaintext
async_project/ ├── backend/ # 后端服务目录(FastAPI + Celery) │ ├── main.py # FastAPI主入口文件,接口定义 │ ├── celery_app.py # Celery核心配置文件 │ ├── tasks.py # 异步任务定义文件 │ └── requirements.txt # Python依赖清单 ├── frontend/ # 前端目录(Vue3项目) │ └── ...(vue自动生成的项目文件) └── start.sh # 可选:Linux/Mac一键启动所有服务脚本2.3 后端 Python 依赖安装
进入backend目录,创建requirements.txt文件,写入项目所需的所有 Python 依赖:
txt
fastapi==0.104.1 uvicorn==0.24.0 celery==5.3.6 redis==5.0.1 python-multipart==0.0.6 cors-middleware==0.0.1 eventlet==0.33.3安装命令:打开命令行,执行以下指令一键安装所有依赖:
bash
运行
cd backend pip install -r requirements.txtfastapi:Web 框架核心uvicorn:ASGI 服务器,用于运行 FastAPIcelery:异步任务队列redis:Redis 客户端,连接 Redis 服务python-multipart:处理文件上传等表单数据cors-middleware:解决前后端跨域问题eventlet:Windows 系统运行 Celery 的必备协程库
三、后端核心代码实现
后端是整个项目的核心,分为Celery 配置、异步任务定义、FastAPI 接口开发三部分,代码逻辑清晰,模块化设计。
3.1 Celery 配置(celery_app.py)
该文件用于初始化 Celery 实例,配置 Redis 连接、时区、序列化方式等核心参数,是 Celery 的入口文件。
python
运行
from celery import Celery # 1. 配置Redis连接地址(本地默认配置,无密码) # 格式:redis://:密码@主机地址:端口/数据库号 REDIS_URL = "redis://localhost:6379/0" # 2. 创建Celery应用实例 celery = Celery( "async_tasks", # 任务模块名称,自定义即可 broker=REDIS_URL, # 指定Broker:Redis存储任务队列 backend=REDIS_URL, # 指定Backend:Redis存储任务结果 # 时区配置(解决时间异常问题) timezone="Asia/Shanghai", enable_utc=False, # 序列化配置:统一使用json格式,保证前后端数据兼容 task_serializer="json", result_serializer="json", accept_content=["json"], # 任务过期时间:任务结果保存1小时,自动清理 result_expires=3600, )核心配置说明:
broker:必须配置,Celery 的任务队列存储位置backend:可选,用于存储任务执行结果和进度,本项目必须配置timezone:统一时区,避免任务时间异常result_expires:自动清理过期任务结果,节省 Redis 内存
3.2 异步任务定义(tasks.py)
该文件用于编写所有耗时异步任务,支持进度实时更新、异常捕获处理,是 Celery 的核心功能实现。
python
运行
from celery_app import celery import time from celery.exceptions import Ignore # 定义耗时异步任务,bind=True表示绑定任务实例,可使用self更新状态 @celery.task(bind=True, name="tasks.long_time_task") def long_time_task(self, task_name: str): """ 耗时异步任务(模拟企业级耗时操作) self:Celery任务上下文对象,用于更新任务进度和状态 task_name:前端传入的任务名称,用于个性化展示 """ # 定义总进度步数,模拟10秒的耗时操作 total_steps = 10 try: # 循环模拟任务执行过程 for i in range(1, total_steps + 1): # 模拟耗时:文件处理、数据计算、接口调用等 time.sleep(1) # 核心代码:更新任务进度,前端可实时获取 self.update_state( state="PROGRESS", # 状态:执行中 meta={ "current": i, # 当前进度 "total": total_steps, # 总进度 "status": f"正在处理:{task_name},进度:{i*10}%" } ) # 任务执行完成,返回标准化结果 return { "code": 200, "state": "执行完成", "status": "SUCCESS", "result": f"任务【{task_name}】执行完成!", "progress": 100 } # 异常处理:任务执行失败时捕获错误信息 except Exception as e: self.update_state( state="FAILURE", meta={ "exc_type": str(type(e).__name__), # 异常类型 "exc_message": str(e) # 异常信息 } ) # 忽略默认异常处理,返回自定义错误信息 raise Ignore()任务功能说明:
- 支持实时进度更新,每执行一步更新一次进度信息
- 标准化异常处理,错误信息清晰返回给前端
- 支持自定义任务参数,适配不同业务场景
- 可扩展:添加任务重试、超时控制、日志记录等功能
3.3 FastAPI 接口开发(main.py)
该文件是后端 API 入口,提供任务提交、任务状态查询、健康检查三大接口,并解决前后端跨域问题。
python
运行
from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from celery_app import celery from tasks import long_time_task from celery.result import AsyncResult # 1. 创建FastAPI应用实例 app = FastAPI( title="FastAPI+Celery异步任务项目", description="全栈异步任务实战接口文档", version="1.0.0" ) # 2. 核心配置:跨域中间件(解决Vue前端访问后端的跨域问题) app.add_middleware( CORSMiddleware, allow_origins=["*"], # 开发环境允许所有域名,生产环境替换为前端地址 allow_credentials=True, allow_methods=["*"], # 允许所有请求方式 allow_headers=["*"], # 允许所有请求头 ) # 3. 接口1:提交异步任务(前端调用) @app.post("/api/submit_task", summary="提交耗时异步任务") def submit_task(task_name: str): """ 接收前端请求,投递任务到Celery队列 :param task_name: 任务名称(前端传入参数) :return: 标准化响应 + 唯一任务ID """ # delay():核心方法,将任务异步投递到队列,非阻塞执行 task = long_time_task.delay(task_name) # 返回任务ID,前端通过该ID查询状态 return { "code": 200, "msg": "任务已成功提交至异步队列,后台处理中", "task_id": task.id } # 4. 接口2:根据任务ID查询任务状态和进度 @app.get("/api/task_status/{task_id}", summary="查询异步任务状态") def get_task_status(task_id: str): """ 前端轮询调用,获取任务实时状态 :param task_id: 提交任务时返回的唯一ID :return: 任务状态、进度、结果/错误信息 """ # 根据任务ID获取任务执行结果 task_result = AsyncResult(task_id, app=celery) # 分状态处理任务结果 if task_result.state == "PENDING": # 状态1:任务排队中,未开始执行 response = { "state": "等待执行", "status": "任务排队中...", "progress": 0 } elif task_result.state == "PROGRESS": # 状态2:任务执行中,返回实时进度 response = { "state": "执行中", "progress": int(task_result.info["current"] / task_result.info["total"] * 100), "status": task_result.info["status"] } elif task_result.state == "SUCCESS": # 状态3:任务执行完成,返回结果 response = task_result.result elif task_result.state == "FAILURE": # 状态4:任务执行失败,返回错误信息 response = { "state": "执行失败", "error": task_result.info["exc_message"], "progress": 0 } else: # 其他未知状态 response = { "state": task_result.state, "status": "未知状态,请检查任务是否存在", "progress": 0 } return response # 5. 接口3:健康检查(测试服务是否正常运行) @app.get("/", summary="服务健康检查") def health_check(): return { "code": 200, "message": "FastAPI+Celery异步服务运行正常!", "docs": "接口文档地址:http://localhost:8000/docs" }接口说明:
- 所有接口返回标准化 JSON 数据,前端易于解析
- 自动生成接口文档:访问
http://localhost:8000/docs可在线测试接口 - 完整的状态判断,覆盖任务所有生命周期
- 跨域配置保证 Vue 前端可以正常访问
四、启动后端服务
后端需要同时启动 3 个服务,启动顺序不能颠倒,缺一不可:Redis → FastAPI → Celery Worker。
4.1 启动 Redis 服务
Redis 是基础服务,必须第一个启动:
bash
运行
# Windows系统(进入Redis安装目录) redis-server.exe # Linux/Mac系统 redis-server启动成功后,命令行会显示Ready to accept connections,表示 Redis 正常运行。
4.2 启动 FastAPI 服务
FastAPI 提供 API 接口,使用 uvicorn 运行:
bash
运行
cd backend uvicorn main:app --host 0.0.0.0 --port 8000 --reload参数说明:
--host 0.0.0.0:允许所有 IP 访问--port 8000:服务端口号--reload:开发模式,代码修改后自动重启
启动成功后,访问:
- 服务地址:http://localhost:8000
- 接口文档:http://localhost:8000/docs
4.3 启动 Celery Worker 服务
Celery Worker 是任务执行者,监听 Redis 队列,必须单独启动:
bash
运行
cd backend # Windows系统(必须加-P eventlet,否则无法运行) celery -A celery_app worker --loglevel=info -P eventlet # Linux/Mac系统 celery -A celery_app worker --loglevel=info启动成功标志:命令行出现ready字样,表示 Celery 已就绪,等待接收任务。
五、Vue 前端实现
前端采用 Vue3 + Vite 构建,使用 Element Plus 组件库快速搭建界面,Axios 发送请求,实现任务提交和进度实时展示。
5.1 创建 Vue3 项目
bash
运行
# 进入项目根目录,创建Vue项目 npm create vite@latest frontend -- --template vue cd frontend # 安装核心依赖 npm install axios element-plusaxios:发送 HTTP 请求,与后端交互element-plus:Vue3 UI 组件库,提供进度条、按钮、输入框等组件
5.2 前端核心代码(src/App.vue)
替换App.vue文件全部内容,实现完整功能:
vue
<template> <div class="async-task-container"> <h2>FastAPI+Celery 全栈异步任务演示</h2> <!-- 任务提交表单区域 --> <div class="task-form"> <el-input v-model="taskName" placeholder="请输入任务名称" style="width: 300px; margin-right: 10px" /> <el-button type="primary" @click="submitTask" :loading="submitting" size="default" > 提交异步任务 </el-button> </div> <!-- 任务进度展示区域(有任务ID时显示) --> <div v-if="taskId" class="task-progress-card"> <h3>任务状态:{{ taskInfo.state }}</h3> <!-- 进度条组件 --> <el-progress :percentage="taskInfo.progress" status="primary" style="margin: 20px 0" /> <!-- 状态文字展示 --> <div class="status-text"> {{ taskInfo.status || taskInfo.result || taskInfo.error }} </div> </div> </div> </template> <script setup> import { ref, onMounted, onUnmounted } from 'vue' import axios from 'axios' import { ElMessage } from 'element-plus' import ElementPlus from 'element-plus' import 'element-plus/dist/index.css' // 配置Axios请求基础地址 const request = axios.create({ baseURL: 'http://localhost:8000/api', timeout: 10000 // 请求超时时间 }) // 响应式数据定义 const taskName = ref('测试异步任务') const submitting = ref(false) // 提交加载状态 const taskId = ref('') // 任务ID const taskInfo = ref({ state: '', progress: 0, status: '' }) // 任务信息 let timer = null // 轮询定时器 /** * 1. 提交异步任务到后端 */ const submitTask = async () => { // 表单验证 if (!taskName.value.trim()) { ElMessage.warning('请输入任务名称!') return } submitting.value = true try { // 发送POST请求提交任务 const res = await request.post('/submit_task', null, { params: { task_name: taskName.value } }) // 保存任务ID taskId.value = res.data.task_id ElMessage.success('任务提交成功!正在后台处理...') // 启动轮询查询任务状态 startQueryTaskStatus() } catch (err) { ElMessage.error('任务提交失败,请检查后端服务是否启动!') console.error('提交任务错误:', err) } finally { submitting.value = false } } /** * 2. 查询任务状态(轮询执行) */ const queryTaskStatus = async () => { if (!taskId.value) return try { const res = await request.get(`/task_status/${taskId.value}`) taskInfo.value = res.data // 任务完成或失败时,停止轮询 if (taskInfo.value.state === '执行完成' || taskInfo.value.state === '执行失败') { clearInterval(timer) timer = null if (taskInfo.value.state === '执行完成') { ElMessage.success(taskInfo.value.result) } else { ElMessage.error('任务执行失败!') } } } catch (err) { ElMessage.error('查询任务状态失败') clearInterval(timer) timer = null } } /** * 3. 启动轮询(每1秒查询一次状态) */ const startQueryTaskStatus = () => { // 清除已有定时器,避免重复轮询 if (timer) clearInterval(timer) timer = setInterval(queryTaskStatus, 1000) } // 页面销毁时清除定时器,防止内存泄漏 onUnmounted(() => { if (timer) clearInterval(timer) }) </script> <style scoped> .async-task-container { max-width: 800px; margin: 80px auto; padding: 0 20px; text-align: center; font-family: "Microsoft YaHei", sans-serif; } .task-form { margin: 40px 0; display: flex; justify-content: center; align-items: center; } .task-progress-card { margin-top: 40px; padding: 30px; border: 1px solid #e4e7ed; border-radius: 12px; background: #fafafa; box-shadow: 0 2px 12px rgba(0, 0, 0, 0.08); } .status-text { font-size: 16px; color: #303133; min-height: 30px; } </style>前端功能说明:
- 简洁美观的界面,支持任务名称输入、提交按钮
- 加载状态提示,提升用户体验
- 实时进度条展示,直观显示任务执行进度
- 自动轮询状态,任务完成 / 失败后自动停止轮询
- 完善的错误提示和表单验证
5.3 启动 Vue 前端服务
bash
运行
cd frontend npm run dev启动成功后,默认访问地址:http://localhost:5173
六、项目运行完整演示
按照以下步骤操作,即可体验全栈异步任务流程:
- 启动顺序:启动 Redis → 启动 FastAPI → 启动 Celery Worker → 启动 Vue 前端
- 打开前端页面:http://localhost:5173,输入任务名称(如:数据导出任务)
- 点击提交异步任务按钮,前端立即提示提交成功
- 页面实时显示任务状态和进度条,从 0% 逐步增长到 100%
- 10 秒后任务执行完成,进度条显示 100%,弹出成功提示
- 若任务执行异常,页面会显示错误信息,轮询自动停止
整个过程中,前端页面无任何卡顿,后端接口无阻塞,完美实现异步处理效果。
七、核心知识点总结
7.1 Celery 三大核心角色
- Broker:任务队列存储,本项目使用 Redis,负责接收、存储待执行任务
- Worker:任务执行者,独立进程,持续监听 Broker,拉取任务执行
- Backend:结果存储,本项目使用 Redis,保存任务进度、状态、结果
7.2 核心技术点
- 任务进度更新:通过
self.update_state()实现任务执行过程中的实时进度同步 - 异步解耦:FastAPI 只负责接收请求和返回结果,耗时任务交给 Celery 执行
- 前后端交互:前端轮询查询状态,替代长连接,降低服务器压力
- 跨域处理:FastAPI 配置 CORSMiddleware,解决前后端分离跨域问题
- 任务唯一标识:每个任务分配唯一 ID,精准查询任务状态
7.3 异步架构优势
- 接口响应极快:耗时任务后台执行,接口毫秒级响应
- 前端无阻塞:用户提交任务后可继续操作页面,无需等待
- 高并发支持:支持多 Worker 并行处理任务,提升系统处理能力
- 异常可控:任务异常不会影响主接口,单独捕获处理
- 适用场景广:文件导出、数据计算、邮件发送、AI 推理、批量处理等
八、生产环境优化建议
本项目为开发环境版本,部署到生产环境需要进行以下优化,提升系统稳定性、安全性和性能:
8.1 安全优化
- Redis 设置密码:修改 Redis 配置文件,添加密码,Celery 连接时携带密码
- 限制跨域域名:FastAPI 跨域配置不使用
*,指定前端正式域名 - 接口鉴权:添加 Token/JWT 认证,仅授权用户可提交任务
8.2 性能优化
- Celery 多 Worker:启动多个 Worker 进程,
celery -A celery_app worker -c 8(8 个并发) - FastAPI 托管:使用 Gunicorn+Uvicorn 托管 FastAPI,支持多进程
- 前端优化:使用 WebSocket 替代轮询,实现服务端主动推送,减少无效请求
- Redis 持久化:开启 Redis 持久化,防止服务重启丢失任务数据
8.3 运维优化
- 日志记录:添加任务执行日志,方便问题排查
- 任务监控:集成 Flower 监控 Celery 任务,实时查看任务状态
- 自动重启:使用 Supervisor 托管 Celery 和 FastAPI,服务崩溃自动重启
- 超时控制:为异步任务设置最大执行时间,避免死循环任务占用资源
九、常见问题排查
9.1 Celery 启动失败
- Windows 系统必须加
-P eventlet参数,否则无法启动 - 检查 Redis 是否启动,连接地址是否正确
- 检查 Python 版本是否兼容
9.2 任务状态一直为 PENDING
- Celery Worker 未启动
- 任务名称不匹配,检查
@celery.task的 name 参数 - Redis 连接失败,无法获取任务
9.3 前端无法访问后端接口
- 检查 FastAPI 是否启动
- 检查跨域配置是否正确
- 检查请求地址是否正确
9.4 任务进度不更新
- 检查任务中
self.update_state()代码是否正确 - 检查 Redis 是否正常存储进度数据
总结
本项目完整实现了FastAPI+Celery+Vue+Redis全栈异步任务开发流程,从环境搭建、代码编写、服务启动到前后端联调全覆盖,是企业级异步开发的标准实践方案。
- 开箱即用:所有代码可直接复制运行,无需额外修改,适合新手快速上手
- 功能完整:实现任务提交、实时进度、状态查询、异常处理全流程
- 架构标准:前后端分离,模块化设计,易于扩展和维护
- 场景通用:可直接改造用于文件导出、数据处理、邮件发送等实际业务
- 核心价值:彻底解决耗时任务阻塞前端、接口超时、并发能力低等痛点
项目启动顺序必须牢记:Redis → FastAPI → Celery Worker → Vue 前端,四个服务缺一不可。掌握本套技术栈,将大幅提升你的全栈开发能力,轻松应对大中型 Web 项目的异步需求。