news 2026/3/11 23:49:22

如何使用 SQLAlchemy 异步进行数据库请求

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
如何使用 SQLAlchemy 异步进行数据库请求

原文:towardsdatascience.com/how-to-use-sqlalchemy-to-make-database-requests-asynchronously-e90a4c8c11b1

数据库请求是一个典型的 I/O 密集型任务,因为它大部分时间都在等待数据库服务器的响应。因此,如果你的应用程序进行了大量的数据库请求,那么通过并发执行它们,性能可以得到显著提升,这是 SQLAlchemy(一个多功能的 Python SQL 工具包和对象关系映射器)所支持的。

此外,异步编程在 Python 中越来越受欢迎,尤其是在使用 FastAPI 进行 Web 开发时,我们经常需要在协程中执行数据库请求,即在用async def语句定义的函数中。不幸的是,我们不能使用经典的同步版本的 SQLAlchemy,而需要创建引擎、连接和会话的异步版本。

在本文中,我们将介绍如何在不同的场景下使用 SQLAlchemy 异步,即使用简单的 SQL 查询、Core 和 ORM。重要的是,我们将介绍如何在多个异步任务中并发使用它,如果使用得当,可以显著提高 I/O 密集型应用程序的效率。


准备工作

我们将使用 Docker 在本地启动一个 MySQL 服务器,在其中我们将创建用于演示的数据库和表:

# Create a volume to persist the data.$ docker volume create mysql8-data# Create the container for MySQL.$ docker run--name mysql8-d-e MYSQL_ROOT_PASSWORD=root-p13306:3306-v mysql8-data:/var/lib/mysql mysql:8# Connect to the local MySQL server in Docker.$ dockerexec-it mysql8 mysql-u root-proot mysql>SELECT VERSION();+-----------+|VERSION()|+-----------+|8.3.0|+-----------+1rowinset(0.00sec)
CREATE DATABASE sales;CREATE TABLE `sales`.`customers`(`id` SMALLINT NOT NULL AUTO_INCREMENT,`name` VARCHAR(50)NOT NULL,`job` VARCHAR(50)DEFAULT'',PRIMARY KEY(`id`),UNIQUE `UQ_name`(`name`));INSERT INTO sales.customers(name,job)VALUES('Lynn','Backend Developer');

然后让我们创建一个 虚拟环境,这样我们就可以尝试最新的 Python 和库版本:

conda create-n sql python=3.12conda activate sql pip install-U"sqlalchemy[asyncio]>=2.0,<2.1"pip install-U"aiomysql>=0.2,<0.3"pip install-U"cryptography>=42.0,<42.1"
  • sqlalchemy[asyncio]– SQLAlchemy 与greenlet依赖项一起安装,这是一个 SQLAlchemy 用于异步工作的库。

  • aiomysql– 一个从 asyncio 框架访问 MySQL 数据库的驱动程序,它背后使用 PyMySQL。

  • cryptography– 由 SQLAlchemy 用于身份验证。


异步执行简单的 SQL 查询

要使用 SQLAlchemy 异步运行 SQL 查询,我们首先需要使用create_async_engine()创建一个异步引擎。然后,在创建连接、执行查询和处置引擎时,我们需要使用await

importasynciofromsqlalchemyimporttextfromsqlalchemy.ext.asyncioimportcreate_async_engineasyncdefmain():# Create an asynchronous engine.async_engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Insert new data with a transation.asyncwithasync_engine.begin()asconn:insert_query=text(""" INSERT INTO sales.customers (name, job) VALUES (:name, :job) """)awaitconn.execute(insert_query,{"name":"Hans","job":"Data Engineer"})# Check the data afer it's inserted.asyncwithasync_engine.connect()asconn:select_query=text(""" SELECT * FROM sales.customers WHERE name = :name """)result=awaitconn.execute(select_query,{"name":"Hans"})print(result.fetchall())# Close and clean-up pooled connections.awaitasync_engine.dispose()asyncio.run(main())

注意,当异步执行如上所示的简单 SQL 查询时,我们需要使用字典传递变量,而不是像同步版本那样使用关键字参数。

当运行上面的代码时,你将看到以下结果打印出来:

[(2,'Hans','Data Engineer'))]

当你想快速开始使用 SQLAlchemy 而又不了解 Core 和 ORM 功能时,使用简单的 SQL 查询是一个不错的选择。然而,如你所见,它并不太符合 Python 风格,因为它使用了自由风格的简单 SQL 查询。当你对 SQLAlchemy 有更多经验时,你可能想使用 Core 或 ORM 功能。


使用 SQLAlchemy Core 异步执行

在 SQLAlchemy 2.0 中,核心功能,通常意味着直接与Table对象交互,现在非常强大。它实际上与 ORM 功能混合到了非常高的程度。例如,select操作符可以用于核心和 ORM。

对于核心使用,我们还需要创建一个异步引擎,然后使用它来异步创建连接。基本工作流程与普通查询相同,不同之处在于语句是通过核心操作符如insertselect构建的。

importasynciofromsqlalchemyimportColumn,Integer,insertfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_data=MetaData()table=Table("customers",meta_data,Column("id",Integer,primary_key=True),Column("name",String(50),nullable=False),Column("job",String(50),default=""),)asyncdefmain():# Create an asynchronous engine.async_engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Insert new data with a transation.asyncwithengine.begin()asconn:stmt=insert(table).values(name="Jack",job="Frontend Developer")awaitconn.execute(stmt)# Check the data afer it's inserted.asyncwithengine.connect()asconn:result=awaitconn.execute(select(table).where(table.c.name=="Jack"))print(result.fetchall())# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,将显示以下结果:

[(3,'Jack','Frontend Developer')]

使用 SQLAlchemy ORM 异步

使用 SQLAlchemy ORM 的 ORM 功能要复杂一些,尤其是在 2.0 版本中,ORM 类的创建语法发生了显著变化。特别是,Mapped[]用于指定类型,mapped_column()构造其他列属性。

fromsqlalchemyimportStringfromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")

要异步处理 ORM,我们需要使用async_sessionmaker()创建一个异步会话工厂,然后使用with来创建异步会话实例:

# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)# Create an async session instance.asyncwithasync_session()assession:...

处理 ORM 的完整异步代码如下:

importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)# Create an async session instance.asyncwithasync_session()assession:# Insert new data with a transation.asyncwithsession.begin():session.add(Customer(name="Stephen",job="Manager"))# Check the data afer it's inserted.asyncwithasync_session()assession:result=awaitsession.execute(select(Customer).where(Customer.name=="Stephen"))customer=result.scalars().one()print(f"name ={customer.name}, job ={customer.job}")# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,将显示以下结果:

name=Stephen,job=Manager

使用 SQLAlchemy Core 在多个异步任务中

在多个异步任务中并发使用 SQLAlchemy Core 简单,因为连接对象可以直接在多个异步任务中传递和使用:

importasynciofrompprintimportpprintfromsqlalchemyimportColumn,IntegerfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_data=MetaData()table=Table("customers",meta_data,Column("id",Integer,primary_key=True),Column("name",String(50),nullable=False),Column("job",String(50),default=""),)asyncdefget_customer(name,conn):result=awaitconn.execute(select(table).where(table.c.name==name))returnresult.fetchone()asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.asyncwithengine.connect()asconn:fornameinnames:tasks.append(get_customer(name,conn))results=awaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,你会看到以下结果打印出来:

[(1,'Lynn','Backend Developer'),(2,'Hans','Data Engineer'),(3,'Jack','Frontend Developer'),(4,'Stephen','Manager')]

在多个异步任务中使用 SQLAlchemy ORM

另一方面,在多个异步任务中使用 SQLAlchemy ORM 要复杂一些,因为不能直接在并发任务中使用相同的AsyncSession实例。

让我们直接尝试使用它并看看会发生什么:

importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefget_customer(name,session):result=awaitsession.execute(select(Customer).where(Customer.name==name))customer=result.scalars().one()return{"name":customer.name,"job":customer.job}asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.asyncwithasync_session()assession:fornameinnames:tasks.append(get_customer(name,session))results=awaitasyncio.gather(*tasks)print(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,你会看到以下错误:

sqlalchemy.exc.InvalidRequestError:This sessionisprovisioning a new connection;concurrent operations arenotpermitted

这个错误意味着单个AsyncSession实例不能在多个并发任务(例如使用asyncio.gather()之类的函数)之间共享。如果你想深入了解这个话题,可以查看这个参考。

解决这个问题的简单可行方案是在每个任务中创建一个AsyncSession实例。我们将重构代码以全局创建engineasync_session_factory,然后在每个任务中调用async_session_factory()来创建一个独立的会话:

importasynciofrompprintimportpprintfromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_column# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session_factory=async_sessionmaker(engine,expire_on_commit=False)classBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefget_customer(name):# Create an async session instance.asyncwithasync_session_factory()assession:result=awaitsession.execute(select(Customer).where(Customer.name==name))customer=result.scalars().one()return{"name":customer.name,"job":customer.job}asyncdefmain():names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.fornameinnames:tasks.append(get_customer(name))results=awaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当代码运行时,你会看到以下结果打印出来:

[{'job':'Backend Developer','name':'Lynn'},{'job':'Data Engineer','name':'Hans'},{'job':'Frontend Developer','name':'Jack'},{'job':'Manager','name':'Stephen'}]

就像 HTTP 请求一样,数据库请求也是 I/O 密集型任务,因为它们大部分时间都在等待数据库服务器的响应。因此,我们可以通过并发而不是顺序地执行数据库请求来显著提高应用程序的效率。

另一方面,异步地执行数据库请求也越来越重要,因为异步编程在 Python 中变得越来越流行,尤其是在使用 FastAPI 进行 Web 开发时,这也突出了学习这个主题的必要性。

在这篇文章中,我们介绍了如何在不同的场景下使用 SQLAlchemy 进行异步操作,即使用纯 SQL 查询、Core 和 ORM。你可以简单地调整代码以适应你的特定使用。我们特别介绍了如何在多个异步任务中并发使用 SQLAlchemy,如果应用程序需要并发执行大量数据库请求,这将提高应用程序的效率。


相关文章

  • 学习基础知识并开始使用 SQLAlchemy ORM

  • 如何在 Python 中使用 SQLAlchemy 执行纯 SQL 查询

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/3 18:39:51

2024年AI原生应用趋势:事件驱动架构深度解析

2024年AI原生应用趋势&#xff1a;事件驱动架构深度解析 关键词&#xff1a;事件驱动架构、AI原生应用、事件流、实时处理、解耦设计、微服务、持续学习 摘要&#xff1a;2024年&#xff0c;AI原生应用&#xff08;AI-Native Applications&#xff09;正从“能用”向“好用”快…

作者头像 李华
网站建设 2026/3/10 8:54:15

大模型推理延迟优化:GPU加速+Token流式输出

大模型推理延迟优化&#xff1a;GPU加速与流式输出的协同实践 在今天的AI应用中&#xff0c;用户已经不再满足于“能不能回答”&#xff0c;而是更关心“多久能答出来”。当你向一个智能助手提问时&#xff0c;哪怕只是多等一两秒&#xff0c;那种轻微的卡顿感也会悄然削弱信任…

作者头像 李华
网站建设 2026/2/24 3:20:22

使用Markdown表格整理PyTorch函数对照清单

使用 Markdown 表格整理 PyTorch 函数对照清单 在深度学习项目中&#xff0c;一个常见的挑战是团队成员之间对函数用法的理解不一致&#xff0c;尤其是在跨版本迁移或协作开发时。PyTorch 虽然以易用著称&#xff0c;但其 API 在不同版本间仍存在细微差异&#xff0c;加上 CUDA…

作者头像 李华
网站建设 2026/2/27 11:02:40

PyTorch反向传播机制深入理解与调试技巧

PyTorch反向传播机制深入理解与调试技巧 在现代深度学习实践中&#xff0c;模型训练的稳定性往往取决于开发者对底层机制的理解程度。即便使用了如PyTorch这样“开箱即用”的框架&#xff0c;一旦遇到梯度爆炸、NaN损失或参数不更新等问题&#xff0c;若仅停留在调用 .backward…

作者头像 李华
网站建设 2026/3/11 19:37:44

PyTorch镜像中实现梯度裁剪(Gradient Clipping)防止爆炸

PyTorch镜像中实现梯度裁剪防止梯度爆炸 在深度学习的实践中&#xff0c;你是否曾遇到训练进行到一半&#xff0c;损失突然变成 NaN&#xff0c;模型彻底“死亡”&#xff1f;尤其是在训练RNN、Transformer这类深层或序列模型时&#xff0c;这种现象尤为常见。问题的根源往往不…

作者头像 李华
网站建设 2026/3/6 20:33:46

D触发器电路图电平触发与边沿触发区别:一文说清

D触发器电路图电平触发与边沿触发区别&#xff1a;一文说清 在数字电路的世界里&#xff0c; D触发器电路图 几乎是每个工程师都绕不开的核心元件。无论你是设计一个简单的计数器&#xff0c;还是构建复杂的CPU流水线&#xff0c;D触发器都是实现数据同步、状态保持和时序控制…

作者头像 李华