Python asyncio 异步IO踩坑:我为什么把 100 个线程改成了 10 个 Event Loop
说实话,我一直对 asyncio 又爱又恨。
去年接了一个高并发采集项目,每天要拉取 50 万条外部 API 数据。一开始我信心满满:Python 嘛,上 ThreadPoolExecutor 就完事了,100 个线程干它!
结果上线第一天,服务器就给我上了一课。
100 个线程的噩梦
一开始的代码长这样:
fromconcurrent.futuresimportThreadPoolExecutorimportrequestsdeffetch(url):returnrequests.get(url,timeout=30).json()withThreadPoolExecutor(max_workers=100)asexecutor:results=list(executor.map(fetch,urls))看起来没毛病对吧?但跑了半小时我就发现不对劲:
- CPU 没占多少,内存却飙到了 8GB
ps -eLf | grep python | wc -l一看,好家伙,100 多个线程- 每个线程的栈内存 8MB,光是线程栈就吃了 800MB
- 更糟的是 GIL 锁,线程多了反而互相抢锁,API 响应时间从 200ms 涨到了 800ms
我当时就懵了。这不是我想象中的高并发。
换成 asyncio,坑才刚刚开始
行,线程不行,那就上 asyncio。听说这玩意儿号称能跑几十万并发,我一想,10 个 Event Loop 总够了吧?
结果第一个坑就让我踩实了:
importasyncioimportrequests# 这里埋雷了asyncdeffetch(url):returnrequests.get(url,timeout=30).json()# 阻塞!asyncdefmain():tasks=[fetch(url)forurlinurls]returnawaitasyncio.gather(*tasks)asyncio.run(main())跑起来一看,并发数确实上去了,但 CPU 占用还是高。用asyncio.all_tasks()一查,发现任务根本没并行执行,而是一个接一个排队。
requests 是同步库,在 async 函数里调用会阻塞整个 Event Loop。
这是我踩的第一个坑:混用同步和异步代码。
换上 aiohttp,第二个坑来了
好,换成 aiohttp:
importaiohttpasyncdeffetch(session,url):asyncwithsession.get(url,timeout=30)asresp:returnawaitresp.json()asyncdefmain():asyncwithaiohttp.ClientSession()assession:tasks=[fetch(session,url)forurlinurls]returnawaitasyncio.gather(*tasks)这下总算能并发了。但新的问题又来了:
aiohttp 的默认连接池太小。同时发 1000 个请求,大部分时间都在等连接,实际并发数只有几十个。
查了半天文档,发现要手动调limit参数:
connector=aiohttp.TCPConnector(limit=200,limit_per_host=50)asyncwithaiohttp.ClientSession(connector=connector)assession:# ...调完这个参数,吞吐量直接翻了 3 倍。
10 个 Event Loop 的架构
单个 Event Loop 再强,也受限于单核。我用uvloop替换默认的 asyncio 事件循环,再配合进程池,搞了个 10 进程 + 每进程 1 个 Event Loop 的架构:
importasyncioimportuvloopfrommultiprocessingimportPool asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())asyncdefworker(urls_chunk):connector=aiohttp.TCPConnector(limit=200,limit_per_host=50)asyncwithaiohttp.ClientSession(connector=connector)assession:tasks=[fetch(session,url)forurlinurls_chunk]returnawaitasyncio.gather(*tasks,return_exceptions=True)defrun_worker(urls_chunk):returnasyncio.run(worker(urls_chunk))# 把 50 万 URL 分成 10 份,每份 5 万chunks=[urls[i::10]foriinrange(10)]withPool(10)aspool:results=pool.map(run_worker,chunks)内存从 8GB 降到了 1.2GB,采集时间从 6 小时缩到了 45 分钟。
那些没人告诉你的细节
1. 异常处理必须加return_exceptions=True
results=awaitasyncio.gather(*tasks,return_exceptions=True)不然一个任务抛异常,剩下所有任务都会被取消。我因为这个丢了一整批数据。
2. 超时控制要分层
aiohttp 的 timeout 不只是总超时,要拆成连接超时和读取超时:
timeout=aiohttp.ClientTimeout(total=30,# 总超时connect=5,# 连接建立超时sock_read=10# 读取超时)3. 别忘了关闭 Session
asyncwithaiohttp.ClientSession()assession:# ...# 这里会自动关闭,但如果是全局 session,记得程序退出时手动 close我有一次把 session 定义成了全局变量,程序跑完连接池没释放,服务器端口被占满了。
4. DNS 解析也是阻塞的
默认情况下,aiohttp 用系统 DNS 解析,这个操作是阻塞的。高并发下建议装aiodns:
pipinstallaiodns然后在 ClientSession 里启用:
session=aiohttp.ClientSession(connector=aiohttp.TCPConnector(use_dns_cache=True),trust_env=True)写在最后
从 100 个线程到 10 个 Event Loop,这个改造让我明白了一件事:高并发不是堆资源,而是选对工具 + 调对参数。
asyncio 确实能跑几十万并发,但前提是你要知道它的脾气:
- 别混用同步代码
- 连接池要手动调
- 异常处理要到位
- DNS 解析别忽略
如果你也在用 asyncio 做采集或者 API 聚合,建议先检查一下这几处。很可能性能瓶颈不在 Python,而在一个没注意到的默认配置。
附:一键诊断脚本
importasyncioimportpsutilasyncdefdiagnose():proc=psutil.Process()print(f"CPU:{proc.cpu_percent()}%")print(f"Memory:{proc.memory_info().rss/1024/1024:.1f}MB")print(f"Threads:{proc.num_threads()}")print(f"Connections: len(proc.connections())")loop=asyncio.get_event_loop()print(f"Event Loop:{type(loop).__name__}")print(f"Pending tasks:{len(asyncio.all_tasks())}")asyncio.run(diagnose())跑一下这个脚本,看看你当前的 asyncio 程序是不是真的在并发,还是在假装并发。