原文:
towardsdatascience.com/how-to-use-sqlalchemy-to-make-database-requests-asynchronously-e90a4c8c11b1
数据库请求是一个典型的 I/O 密集型任务,因为它大部分时间都在等待数据库服务器的响应。因此,如果你的应用程序进行了大量的数据库请求,那么通过并发执行它们,性能可以得到显著提升,这是 SQLAlchemy(一个多功能的 Python SQL 工具包和对象关系映射器)所支持的。
此外,异步编程在 Python 中越来越受欢迎,尤其是在使用 FastAPI 进行 Web 开发时,我们经常需要在协程中执行数据库请求,即在用async def语句定义的函数中。不幸的是,我们不能使用经典的同步版本的 SQLAlchemy,而需要创建引擎、连接和会话的异步版本。
在本文中,我们将介绍如何在不同的场景下使用 SQLAlchemy 异步,即使用简单的 SQL 查询、Core 和 ORM。重要的是,我们将介绍如何在多个异步任务中并发使用它,如果使用得当,可以显著提高 I/O 密集型应用程序的效率。
准备工作
我们将使用 Docker 在本地启动一个 MySQL 服务器,在其中我们将创建用于演示的数据库和表:
# Create a volume to persist the data.$ docker volume create mysql8-data# Create the container for MySQL.$ docker run--name mysql8-d-e MYSQL_ROOT_PASSWORD=root-p13306:3306-v mysql8-data:/var/lib/mysql mysql:8# Connect to the local MySQL server in Docker.$ dockerexec-it mysql8 mysql-u root-proot mysql>SELECT VERSION();+-----------+|VERSION()|+-----------+|8.3.0|+-----------+1rowinset(0.00sec)CREATE DATABASE sales;CREATE TABLE `sales`.`customers`(`id` SMALLINT NOT NULL AUTO_INCREMENT,`name` VARCHAR(50)NOT NULL,`job` VARCHAR(50)DEFAULT'',PRIMARY KEY(`id`),UNIQUE `UQ_name`(`name`));INSERT INTO sales.customers(name,job)VALUES('Lynn','Backend Developer');然后让我们创建一个 虚拟环境,这样我们就可以尝试最新的 Python 和库版本:
conda create-n sql python=3.12conda activate sql pip install-U"sqlalchemy[asyncio]>=2.0,<2.1"pip install-U"aiomysql>=0.2,<0.3"pip install-U"cryptography>=42.0,<42.1"sqlalchemy[asyncio]– SQLAlchemy 与greenlet依赖项一起安装,这是一个 SQLAlchemy 用于异步工作的库。
aiomysql– 一个从 asyncio 框架访问 MySQL 数据库的驱动程序,它背后使用 PyMySQL。
cryptography– 由 SQLAlchemy 用于身份验证。
异步执行简单的 SQL 查询
要使用 SQLAlchemy 异步运行 SQL 查询,我们首先需要使用create_async_engine()创建一个异步引擎。然后,在创建连接、执行查询和处置引擎时,我们需要使用await:
importasynciofromsqlalchemyimporttextfromsqlalchemy.ext.asyncioimportcreate_async_engineasyncdefmain():# Create an asynchronous engine.async_engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Insert new data with a transation.asyncwithasync_engine.begin()asconn:insert_query=text(""" INSERT INTO sales.customers (name, job) VALUES (:name, :job) """)awaitconn.execute(insert_query,{"name":"Hans","job":"Data Engineer"})# Check the data afer it's inserted.asyncwithasync_engine.connect()asconn:select_query=text(""" SELECT * FROM sales.customers WHERE name = :name """)result=awaitconn.execute(select_query,{"name":"Hans"})print(result.fetchall())# Close and clean-up pooled connections.awaitasync_engine.dispose()asyncio.run(main())注意,当异步执行如上所示的简单 SQL 查询时,我们需要使用字典传递变量,而不是像同步版本那样使用关键字参数。
当运行上面的代码时,你将看到以下结果打印出来:
[(2,'Hans','Data Engineer'))]当你想快速开始使用 SQLAlchemy 而又不了解 Core 和 ORM 功能时,使用简单的 SQL 查询是一个不错的选择。然而,如你所见,它并不太符合 Python 风格,因为它使用了自由风格的简单 SQL 查询。当你对 SQLAlchemy 有更多经验时,你可能想使用 Core 或 ORM 功能。
使用 SQLAlchemy Core 异步执行
在 SQLAlchemy 2.0 中,核心功能,通常意味着直接与Table对象交互,现在非常强大。它实际上与 ORM 功能混合到了非常高的程度。例如,select操作符可以用于核心和 ORM。
对于核心使用,我们还需要创建一个异步引擎,然后使用它来异步创建连接。基本工作流程与普通查询相同,不同之处在于语句是通过核心操作符如insert和select构建的。
importasynciofromsqlalchemyimportColumn,Integer,insertfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_data=MetaData()table=Table("customers",meta_data,Column("id",Integer,primary_key=True),Column("name",String(50),nullable=False),Column("job",String(50),default=""),)asyncdefmain():# Create an asynchronous engine.async_engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Insert new data with a transation.asyncwithengine.begin()asconn:stmt=insert(table).values(name="Jack",job="Frontend Developer")awaitconn.execute(stmt)# Check the data afer it's inserted.asyncwithengine.connect()asconn:result=awaitconn.execute(select(table).where(table.c.name=="Jack"))print(result.fetchall())# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时,将显示以下结果:
[(3,'Jack','Frontend Developer')]使用 SQLAlchemy ORM 异步
使用 SQLAlchemy ORM 的 ORM 功能要复杂一些,尤其是在 2.0 版本中,ORM 类的创建语法发生了显著变化。特别是,Mapped[]用于指定类型,mapped_column()构造其他列属性。
fromsqlalchemyimportStringfromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")要异步处理 ORM,我们需要使用async_sessionmaker()创建一个异步会话工厂,然后使用with来创建异步会话实例:
# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)# Create an async session instance.asyncwithasync_session()assession:...处理 ORM 的完整异步代码如下:
importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)# Create an async session instance.asyncwithasync_session()assession:# Insert new data with a transation.asyncwithsession.begin():session.add(Customer(name="Stephen",job="Manager"))# Check the data afer it's inserted.asyncwithasync_session()assession:result=awaitsession.execute(select(Customer).where(Customer.name=="Stephen"))customer=result.scalars().one()print(f"name ={customer.name}, job ={customer.job}")# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时,将显示以下结果:
name=Stephen,job=Manager使用 SQLAlchemy Core 在多个异步任务中
在多个异步任务中并发使用 SQLAlchemy Core 简单,因为连接对象可以直接在多个异步任务中传递和使用:
importasynciofrompprintimportpprintfromsqlalchemyimportColumn,IntegerfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_data=MetaData()table=Table("customers",meta_data,Column("id",Integer,primary_key=True),Column("name",String(50),nullable=False),Column("job",String(50),default=""),)asyncdefget_customer(name,conn):result=awaitconn.execute(select(table).where(table.c.name==name))returnresult.fetchone()asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.asyncwithengine.connect()asconn:fornameinnames:tasks.append(get_customer(name,conn))results=awaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时,你会看到以下结果打印出来:
[(1,'Lynn','Backend Developer'),(2,'Hans','Data Engineer'),(3,'Jack','Frontend Developer'),(4,'Stephen','Manager')]在多个异步任务中使用 SQLAlchemy ORM
另一方面,在多个异步任务中使用 SQLAlchemy ORM 要复杂一些,因为不能直接在并发任务中使用相同的AsyncSession实例。
让我们直接尝试使用它并看看会发生什么:
importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefget_customer(name,session):result=awaitsession.execute(select(Customer).where(Customer.name==name))customer=result.scalars().one()return{"name":customer.name,"job":customer.job}asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.asyncwithasync_session()assession:fornameinnames:tasks.append(get_customer(name,session))results=awaitasyncio.gather(*tasks)print(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时,你会看到以下错误:
sqlalchemy.exc.InvalidRequestError:This sessionisprovisioning a new connection;concurrent operations arenotpermitted这个错误意味着单个AsyncSession实例不能在多个并发任务(例如使用asyncio.gather()之类的函数)之间共享。如果你想深入了解这个话题,可以查看这个参考。
解决这个问题的简单可行方案是在每个任务中创建一个AsyncSession实例。我们将重构代码以全局创建engine和async_session_factory,然后在每个任务中调用async_session_factory()来创建一个独立的会话:
importasynciofrompprintimportpprintfromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_column# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session_factory=async_sessionmaker(engine,expire_on_commit=False)classBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefget_customer(name):# Create an async session instance.asyncwithasync_session_factory()assession:result=awaitsession.execute(select(Customer).where(Customer.name==name))customer=result.scalars().one()return{"name":customer.name,"job":customer.job}asyncdefmain():names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.fornameinnames:tasks.append(get_customer(name))results=awaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当代码运行时,你会看到以下结果打印出来:
[{'job':'Backend Developer','name':'Lynn'},{'job':'Data Engineer','name':'Hans'},{'job':'Frontend Developer','name':'Jack'},{'job':'Manager','name':'Stephen'}]就像 HTTP 请求一样,数据库请求也是 I/O 密集型任务,因为它们大部分时间都在等待数据库服务器的响应。因此,我们可以通过并发而不是顺序地执行数据库请求来显著提高应用程序的效率。
另一方面,异步地执行数据库请求也越来越重要,因为异步编程在 Python 中变得越来越流行,尤其是在使用 FastAPI 进行 Web 开发时,这也突出了学习这个主题的必要性。
在这篇文章中,我们介绍了如何在不同的场景下使用 SQLAlchemy 进行异步操作,即使用纯 SQL 查询、Core 和 ORM。你可以简单地调整代码以适应你的特定使用。我们特别介绍了如何在多个异步任务中并发使用 SQLAlchemy,如果应用程序需要并发执行大量数据库请求,这将提高应用程序的效率。
相关文章
学习基础知识并开始使用 SQLAlchemy ORM
如何在 Python 中使用 SQLAlchemy 执行纯 SQL 查询