Python 异步并发:从 asyncio 到结构化并发的实战思考
一、asyncio 的坑:当并发量上来之后
Python 的 asyncio 确实是异步编程的事实标准,但真把它推向生产环境的极限时,你会发现它远没有教程里那么优雅。
之前我们维护的一个向量检索服务,用 asyncio 并发查询 5 个 Milvus Collection。理论上,并发查询的延迟应该只比单次查询高一点点,但实测 P99 延迟反而比串行高了 40%。
问题出在哪?协程泄漏。
某个查询没设超时,卡在 TCP 等待上。其他协程虽然创建了,但都在等事件循环调度,而事件循环被那个阻塞的协程占着。更常见的问题是协程孤儿——你asyncio.create_task()创建了一个任务,没await它,也没存引用。这个任务就在后台默默跑,异常被吞掉,资源占着,你还以为一切正常。
asyncio 的原始 API 是面向底层事件循环设计的,给你create_task、gather、wait,就像给你一堆砖头让你盖房子。你需要的是结构化并发——让协程的生命周期与代码块绑定,就像with语句让资源生命周期与代码块绑定一样。
二、结构化并发:给协程套上“笼子”
非结构化 vs 结构化
非结构化并发就像 Go 的go func()——启动一个 goroutine,它什么时候结束、出不出错、有没有泄漏,你全不知道。结构化并发要求简单粗暴:所有子任务必须在父任务的作用域内完成。
graph TB subgraph 非结构化: 协程散养 A1[主协程] --> B1[子协程1] A1 --> B2[子协程2] B1 -.-> C1[??? 何时结束] B2 -.-> C2[??? 异常去哪了] end subgraph 结构化: 生命周期受控 A2[主协程] --> B3[子协程1] A2 --> B4[子协程2] B3 --> D1[完成/超时/取消] B4 --> D2[完成/超时/取消] D1 --> A2 D2 --> A2 end核心就三条:
- 子任务不逃逸:子协程的引用不能泄露到父作用域外。
- 父等子结束:父协程必须等所有子协程完事(不管成功还是失败)才退出。
- 异常不丢失:子协程的异常必须传给父协程。
Python 的实现路径
Python 3.11 引入了TaskGroup,算是官方的结构化并发方案。但在生产环境,你还需要超时控制、并发限制和优雅取消。
sequenceDiagram participant Main as 主协程 participant TG as TaskGroup participant T1 as 任务1 participant T2 as 任务2 Main->>TG: async with TaskGroup() TG->>T1: create_task(query_1) TG->>T2: create_task(query_2) Note over T2: T2 抛出异常 T2-->>TG: 异常! TG->>T1: 取消 (cancel) TG-->>Main: ExceptionGroup三、生产级工具库实现
下面是一个我在生产环境用的结构化并发工具库,支持超时、并发限制和异常聚合。
""" 结构化并发工具库 - 生产级 asyncio 并发管理 """ import asyncio import time from dataclasses import dataclass, field from typing import Any, Callable, Coroutine, TypeVar from contextlib import asynccontextmanager T = TypeVar("T") @dataclass class TaskResult: """单个任务的执行结果""" task_id: str success: bool value: Any = None error: Exception | None = None duration_ms: float = 0.0 @dataclass class BatchResult: """批量任务的聚合结果""" total: int = 0 succeeded: int = 0 failed: int = 0 cancelled: int = 0 results: list[TaskResult] = field(default_factory=list) total_duration_ms: float = 0.0 @property def success_rate(self) -> float: return self.succeeded / self.total if self.total > 0 else 0.0 class ConcurrencyLimiter: """ 并发限制器:基于 Semaphore,防止下游服务被压垮 额外提供了等待队列深度监控 """ def __init__(self, max_concurrency: int): self._semaphore = asyncio.Semaphore(max_concurrency) self._active_count = 0 self._waiting_count = 0 @property def active_count(self) -> int: return self._active_count @property def waiting_count(self) -> int: return self._waiting_count @asynccontextmanager async def acquire(self): """上下文管理器方式获取并发槽位""" self._waiting_count += 1 try: await self._semaphore.acquire() self._waiting_count -= 1 self._active_count += 1 try: yield finally: self._active_count -= 1 self._semaphore.release() except asyncio.CancelledError: self._waiting_count -= 1 raise class StructuredConcurrencyRunner: """ 结构化并发执行器 核心保证:所有任务在作用域结束时要么完成、要么取消,不存在孤儿任务 """ def __init__( self, max_concurrency: int = 10, task_timeout: float = 30.0, retry_on_timeout: bool = False, ): self._limiter = ConcurrencyLimiter(max_concurrency) self._task_timeout = task_timeout self._retry_on_timeout = retry_on_timeout async def run_single( self, coro_factory: Callable[[], Coroutine], task_id: str, ) -> TaskResult: """执行单个任务,带并发限制和超时控制""" start = time.monotonic() try: async with self._limiter.acquire(): # 超时控制:在并发槽位内才开始计时 result = await asyncio.wait_for( coro_factory(), timeout=self._task_timeout, ) duration = (time.monotonic() - start) * 1000 return TaskResult( task_id=task_id, success=True, value=result, duration_ms=duration, ) except asyncio.TimeoutError: duration = (time.monotonic() - start) * 1000 return TaskResult( task_id=task_id, success=False, error=TimeoutError(f"任务超时 ({self._task_timeout}s)"), duration_ms=duration, ) except asyncio.CancelledError: duration = (time.monotonic() - start) * 1000 return TaskResult( task_id=task_id, success=False, error=CancelledError("任务被取消"), duration_ms=duration, ) except Exception as e: duration = (time.monotonic() - start) * 1000 return TaskResult( task_id=task_id, success=False, error=e, duration_ms=duration, ) async def run_batch( self, coro_factories: list[tuple[str, Callable[[], Coroutine]]], fail_fast: bool = False, ) -> BatchResult: """ 批量执行任务,结构化保证:所有任务在返回前都已结束 fail_fast=True 时,任一任务失败立即取消其余任务 """ batch_start = time.monotonic() batch_result = BatchResult(total=len(coro_factories)) if not coro_factories: return batch_result pending_tasks: dict[asyncio.Task, str] = {} async def _wrapped(task_id: str, factory: Callable[[], Coroutine]): """包装每个任务,确保结果被收集""" result = await self.run_single(factory, task_id) batch_result.results.append(result) if result.success: batch_result.succeeded += 1 else: batch_result.failed += 1 if fail_fast and not isinstance(result.error, asyncio.CancelledError): # 快速失败:取消所有未完成的任务 for task in pending_tasks: task.cancel() return result try: async with asyncio.TaskGroup() as tg: for task_id, factory in coro_factories: task = tg.create_task(_wrapped(task_id, factory)) pending_tasks[task] = task_id except* Exception as eg: # Python 3.11+ ExceptionGroup 语法 for exc in eg.exceptions: if not isinstance(exc, (asyncio.CancelledError,)): batch_result.failed += 1 batch_result.total_duration_ms = (time.monotonic() - batch_start) * 1000 batch_result.cancelled = batch_result.total - batch_result.succeeded - batch_result.failed return batch_result # ===== 使用示例:向量检索服务的并发查询 ===== async def mock_milvus_query(collection: str, vector: list[float]) -> dict: """模拟 Milvus 查询,随机延迟""" import random delay = random.uniform(0.1, 0.5) await asyncio.sleep(delay) return {"collection": collection, "matches": [{"id": 1, "score": 0.95}]} async def main(): """并发查询多个 Collection,结构化保证无泄漏""" runner = StructuredConcurrencyRunner( max_concurrency=5, # 最多 5 个并发查询 task_timeout=2.0, # 单次查询超时 2 秒 ) collections = ["articles", "papers", "docs", "wiki", "code"] query_vector = [0.1] * 128 # 构建任务列表:每个任务是一个 (id, factory) 元组 tasks = [ (f"query_{col}", lambda col=col: mock_milvus_query(col, query_vector)) for col in collections ] result = await runner.run_batch(tasks) print(f"总数: {result.total}, 成功: {result.succeeded}, " f"失败: {result.failed}, 取消: {result.cancelled}") print(f"成功率: {result.success_rate:.1%}") print(f"总耗时: {result.total_duration_ms:.0f}ms") for r in result.results: status = "✓" if r.success else "✗" print(f" {status} {r.task_id}: {r.duration_ms:.0f}ms") if __name__ == "__main__": asyncio.run(main())几个关键设计点
coro_factory而非coro:传入协程工厂函数而非协程对象。因为协程对象一旦创建就开始计时,如果你在队列里等了 5 秒才拿到并发槽位,超时早就过了。工厂函数确保只在获取槽位后才创建协程。
fail_fast模式:当任一任务失败时,立即取消所有未完成的任务。这在“全有或全无”的业务场景中很有用,比如分布式事务中的并行校验。
ExceptionGroup处理:Python 3.11 的TaskGroup会抛出ExceptionGroup,里面可能包含多个异常。用except*语法可以分类处理,避免一个异常掩盖其他异常。
四、结构化并发的代价:不是所有场景都该用
TaskGroup 的异常传播问题
TaskGroup的设计哲学是“一个失败,全部取消”。这在很多场景下过于激进。比如你要并发查询 5 个数据源,其中 1 个超时了,你可能希望拿到其余 4 个的结果,而不是全部取消。
应对策略:在TaskGroup内部用try/except包裹每个任务,把异常转化为TaskResult,不让异常传播到TaskGroup层面。上面的代码就是这么做的——_wrapped函数吞掉了异常,转为结果对象。
并发限制的背压问题
ConcurrencyLimiter用 Semaphore 控制并发数,但 Semaphore 不提供背压机制——当并发槽位满了,新的请求会排队等待,而不是被拒绝。如果上游的请求速率远超下游的处理能力,等待队列会无限增长,最终 OOM。
应对策略:给 Semaphore 加上等待超时。如果等了 5 秒还拿不到槽位,直接返回“服务繁忙”错误,而不是让请求无限等待。
适用边界
- CPU 密集型任务:asyncio 是单线程的,CPU 密集型任务会阻塞事件循环。用
ProcessPoolExecutor+loop.run_in_executor替代。 - 需要精确顺序的场景:结构化并发的子任务完成顺序不确定,如果业务要求严格的执行顺序,应该用串行或链式异步。
- 长时间运行的后台任务:结构化并发要求子任务在父作用域内结束,不适合“启动后不管”的后台任务。这种场景用
asyncio.create_task+ 显式生命周期管理更合适。
五、总结
结构化并发的核心思想是让协程的生命周期与代码作用域绑定,消除协程泄漏和孤儿任务的风险。Python 3.11 的TaskGroup提供了官方支持,但生产环境还需要并发限制、超时控制和异常聚合等增强能力。协程工厂模式确保超时计时从获取并发槽位开始,而非从创建协程开始。结构化并发不适用于 CPU 密集型任务和长时间后台任务,在这些场景下应选择多进程或显式生命周期管理。