news 2026/5/7 1:04:27

psycopg2-binary 全面教程:常用 API 串联与实战指南

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
psycopg2-binary 全面教程:常用 API 串联与实战指南

大家好,我是jobleap.cn的小九。
psycopg2-binary 是 Python 连接 PostgreSQL 数据库的核心库(psycopg2的预编译二进制版本,无需编译依赖,开箱即用),本文将从环境准备、核心 API 讲解到实战案例,全面串联其常用用法,帮助你掌握 PostgreSQL 数据库的 Python 操作全流程。

一、环境准备

1. 安装 psycopg2-binary

使用 pip 快速安装(推荐 Python 3.6+ 版本):

# 安装最新版pip install psycopg2-binary# 安装指定版本(如适配特定 PostgreSQL 版本)pip install psycopg2-binary==2.9.9

2. 前置条件

  • 已安装并启动 PostgreSQL 服务(本地/远程);
  • 拥有可访问的 PostgreSQL 数据库、用户名和密码;
  • 确保目标数据库端口(默认 5432)未被防火墙拦截。

二、核心概念与基础 API

psycopg2 的核心操作围绕连接(Connection)游标(Cursor)展开:

  • Connection:负责与 PostgreSQL 数据库建立网络连接,管理事务;
  • Cursor:基于连接创建的操作句柄,用于执行 SQL 语句、获取查询结果。

1. 数据库连接(connect())

psycopg2.connect()是创建数据库连接的核心函数,支持通过参数或 DSN 字符串传参,常用参数如下:

参数说明默认值
host数据库服务器地址localhost
port数据库端口5432
dbname/database目标数据库名-
user数据库用户名当前系统用户
password数据库密码-
sslmodeSSL 连接模式(如 require)disable

基础连接示例

importpsycopg2frompsycopg2importOperationalErrordefcreate_connection(db_name,db_user,db_password,db_host,db_port):"""创建数据库连接并返回 Connection 对象"""connection=Nonetry:connection=psycopg2.connect(database=db_name,user=db_user,password=db_password,host=db_host,port=db_port,)print("PostgreSQL 连接成功 ✅")exceptOperationalErrorase:print(f"连接失败 ❌:{e}")returnconnection# 替换为你的数据库信息conn=create_connection(db_name="test_db",db_user="postgres",db_password="123456",db_host="localhost",db_port="5432")

2. 游标创建与 SQL 执行(cursor()/execute())

创建连接后,需通过conn.cursor()创建游标,再用游标执行 SQL 语句:

  • cursor.execute(sql, params):执行单条 SQL 语句(支持参数化查询);
  • cursor.executemany(sql, params_list):批量执行相同结构的 SQL 语句;
  • psycopg2.extras.execute_batch(cursor, sql, params_list, page_size=100):高性能批量执行(推荐替代executemany)。
(1)创建数据表示例
defcreate_table(connection):"""创建用户表(users)"""create_table_query=""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT, email VARCHAR(100) UNIQUE, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """try:# 创建游标cursor=connection.cursor()# 执行 SQLcursor.execute(create_table_query)# 提交事务(psycopg2 默认关闭自动提交,必须手动提交)connection.commit()print("数据表创建成功 ✅")exceptExceptionase:print(f"创建表失败 ❌:{e}")# 异常时回滚事务connection.rollback()finally:# 关闭游标(避免资源泄漏)cursor.close()# 调用创建表函数ifconn:create_table(conn)
(2)参数化查询(防 SQL 注入)

关键:psycopg2 使用%s作为占位符(而非 Python 的{}%),参数需以元组/列表传入

definsert_single_user(connection,name,age,email):"""插入单条用户数据(参数化查询)"""insert_query=""" INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING; # 避免重复插入 """try:cursor=connection.cursor()# 传入参数(元组形式)cursor.execute(insert_query,(name,age,email))connection.commit()print(f"插入用户{name}成功 ✅")exceptExceptionase:print(f"插入失败 ❌:{e}")connection.rollback()finally:cursor.close()# 插入单条数据ifconn:insert_single_user(conn,"张三",25,"zhangsan@example.com")
(3)批量插入数据
frompsycopg2importextrasdefbatch_insert_users(connection,users_list):"""批量插入用户数据(高性能版)"""insert_query=""" INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING; """try:cursor=connection.cursor()# 高性能批量执行(page_size 控制每次批量提交的条数)extras.execute_batch(cursor,insert_query,users_list,page_size=100)connection.commit()print(f"批量插入{len(users_list)}条数据成功 ✅")exceptExceptionase:print(f"批量插入失败 ❌:{e}")connection.rollback()finally:cursor.close()# 批量插入示例数据ifconn:users_data=[("李四",28,"lisi@example.com"),("王五",30,"wangwu@example.com"),("赵六",22,"zhaoliu@example.com")]batch_insert_users(conn,users_data)

3. 数据查询(fetchone()/fetchmany()/fetchall())

执行查询类 SQL(SELECT)后,需通过游标获取结果:

  • cursor.fetchone():获取下一条结果(返回元组,无数据时返回 None);
  • cursor.fetchmany(size):获取指定条数的结果(返回列表,元素为元组);
  • cursor.fetchall():获取所有剩余结果(返回列表,元素为元组);
  • cursor.rowcount:返回受上一条 SQL 影响的行数(查询时为匹配的行数)。

查询示例

defquery_users(connection,age_min=0):"""查询年龄大于等于 age_min 的用户"""query=""" SELECT id, name, age, email, create_time FROM users WHERE age >= %s; """try:cursor=connection.cursor()cursor.execute(query,(age_min,))# 方式1:获取单条数据# single_user = cursor.fetchone()# if single_user:# print("单条结果:", single_user)# 方式2:获取指定条数(如2条)# partial_users = cursor.fetchmany(2)# print("部分结果:", partial_users)# 方式3:获取所有结果all_users=cursor.fetchall()print(f"\n查询到{cursor.rowcount}条符合条件的用户:")foruserinall_users:# 解析元组(id, name, age, email, create_time)print(f"ID:{user[0]}, 姓名:{user[1]}, 年龄:{user[2]}, 邮箱:{user[3]}, 创建时间:{user[4]}")exceptExceptionase:print(f"查询失败 ❌:{e}")finally:cursor.close()# 查询年龄≥25的用户ifconn:query_users(conn,age_min=25)

4. 数据更新与删除

更新/删除操作与插入逻辑一致,需注意事务提交和参数化:

defupdate_user_age(connection,email,new_age):"""根据邮箱更新用户年龄"""update_query=""" UPDATE users SET age = %s WHERE email = %s; """try:cursor=connection.cursor()cursor.execute(update_query,(new_age,email))connection.commit()ifcursor.rowcount>0:print(f"更新{email}的年龄为{new_age}成功 ✅")else:print(f"未找到邮箱为{email}的用户 ❌")exceptExceptionase:print(f"更新失败 ❌:{e}")connection.rollback()finally:cursor.close()defdelete_user(connection,user_id):"""根据ID删除用户"""delete_query="DELETE FROM users WHERE id = %s;"try:cursor=connection.cursor()cursor.execute(delete_query,(user_id,))connection.commit()ifcursor.rowcount>0:print(f"删除ID为{user_id}的用户成功 ✅")else:print(f"未找到ID为{user_id}的用户 ❌")exceptExceptionase:print(f"删除失败 ❌:{e}")connection.rollback()finally:cursor.close()# 执行更新和删除ifconn:update_user_age(conn,"zhangsan@example.com",26)delete_user(conn,3)# 删除ID为3的用户query_users(conn)# 重新查询验证结果

5. 事务管理(commit()/rollback())

psycopg2 默认关闭「自动提交」模式,所有修改类操作(INSERT/UPDATE/DELETE/CREATE)都需要手动调用conn.commit()确认;若执行过程中出现异常,需调用conn.rollback()回滚事务,避免数据不一致。

事务回滚示例

deftest_transaction(connection):"""测试事务回滚"""try:cursor=connection.cursor()# 第一步:插入数据cursor.execute("INSERT INTO users (name, age, email) VALUES (%s, %s, %s)",("测试用户",99,"test@example.com"))# 第二步:故意触发错误(比如插入重复邮箱)cursor.execute("INSERT INTO users (name, age, email) VALUES (%s, %s, %s)",("重复用户",88,"zhangsan@example.com"))# 无异常则提交connection.commit()exceptExceptionase:print(f"事务执行失败,触发回滚 ❌:{e}")connection.rollback()# 回滚所有未提交的操作finally:cursor.close()# 测试事务回滚(最终 "测试用户" 不会被插入)ifconn:test_transaction(conn)query_users(conn)

6. 类型转换(PostgreSQL ↔ Python)

psycopg2 会自动完成 PostgreSQL 类型与 Python 类型的转换,常用映射关系如下:

PostgreSQL 类型Python 类型
INT/SERIALint
VARCHAR/TEXTstr
TIMESTAMP/DATEdatetime.datetime/date
BOOLEANbool
ARRAYlist
JSON/JSONBdict/list(需导入 extras)

JSON 类型操作示例

frompsycopg2.extrasimportJsondeftest_json_type(connection):"""测试 JSON 类型字段操作"""# 1. 先添加 JSON 字段alter_query="ALTER TABLE users ADD COLUMN IF NOT EXISTS info JSONB;"# 2. 更新 JSON 数据update_query="UPDATE users SET info = %s WHERE email = %s;"try:cursor=connection.cursor()cursor.execute(alter_query)# 传入 Python 字典(通过 Json 封装)user_info={"hobby":["篮球","编程"],"address":"北京市"}cursor.execute(update_query,(Json(user_info),"zhangsan@example.com"))connection.commit()# 3. 查询 JSON 字段cursor.execute("SELECT name, info FROM users WHERE email = %s;",("zhangsan@example.com",))result=cursor.fetchone()print(f"\nJSON 字段查询结果:")print(f"姓名:{result[0]}, 信息:{result[1]}")print(f"提取 hobby:{result[1]['hobby']}")# 直接按字典访问exceptExceptionase:print(f"JSON 操作失败 ❌:{e}")connection.rollback()finally:cursor.close()ifconn:test_json_type(conn)

7. 连接池(生产环境必备)

频繁创建/关闭连接会消耗大量资源,生产环境建议使用连接池(psycopg2.pool)复用连接:

frompsycopg2importpool# 创建连接池(最小1个,最大5个连接)connection_pool=pool.SimpleConnectionPool(minconn=1,maxconn=5,database="test_db",user="postgres",password="123456",host="localhost",port="5432")defuse_pooled_connection():"""使用连接池获取连接"""# 从池获取连接conn=connection_pool.getconn()ifconn:print("\n从连接池获取连接成功 ✅")query_users(conn)# 归还连接到池(不是关闭)connection_pool.putconn(conn)# 测试连接池use_pooled_connection()# 关闭连接池(程序退出时)connection_pool.closeall()

8. 资源释放

使用完连接和游标后,必须关闭以释放资源,推荐通过finally块确保执行:

# 最终关闭连接(非连接池场景)ifconn:try:conn.close()print("\n数据库连接已关闭 ✅")exceptExceptionase:print(f"关闭连接失败 ❌:{e}")

三、完整实战脚本(串联所有 API)

以下脚本整合了上述所有常用操作,可直接运行(需替换数据库信息):

importpsycopg2frompsycopg2importOperationalError,extrasfrompsycopg2.extrasimportJsonfrompsycopg2importpool# 1. 创建数据库连接(或连接池)defcreate_connection(db_name,db_user,db_password,db_host,db_port):connection=Nonetry:connection=psycopg2.connect(database=db_name,user=db_user,password=db_password,host=db_host,port=db_port,)print("PostgreSQL 连接成功 ✅")exceptOperationalErrorase:print(f"连接失败 ❌:{e}")returnconnection# 2. 创建数据表defcreate_table(connection):create_table_query=""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT, email VARCHAR(100) UNIQUE, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, info JSONB ); """try:cursor=connection.cursor()cursor.execute(create_table_query)connection.commit()print("数据表创建成功 ✅")exceptExceptionase:print(f"创建表失败 ❌:{e}")connection.rollback()finally:cursor.close()# 3. 插入/更新/删除/查询definsert_user(connection,user_data):insert_query="INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING;"try:cursor=connection.cursor()extras.execute_batch(cursor,insert_query,user_data,page_size=100)connection.commit()print(f"批量插入{len(user_data)}条数据成功 ✅")exceptExceptionase:print(f"插入失败 ❌:{e}")connection.rollback()finally:cursor.close()defupdate_user_info(connection,email,info):update_query="UPDATE users SET info = %s WHERE email = %s;"try:cursor=connection.cursor()cursor.execute(update_query,(Json(info),email))connection.commit()print(f"更新{email}的扩展信息成功 ✅")exceptExceptionase:print(f"更新失败 ❌:{e}")connection.rollback()finally:cursor.close()defquery_users(connection,age_min=0):query="SELECT id, name, age, email, info FROM users WHERE age >= %s;"try:cursor=connection.cursor()cursor.execute(query,(age_min,))all_users=cursor.fetchall()print(f"\n查询到{cursor.rowcount}条用户数据:")foruserinall_users:print(f"ID:{user[0]}, 姓名:{user[1]}, 年龄:{user[2]}, 邮箱:{user[3]}, 扩展信息:{user[4]}")exceptExceptionase:print(f"查询失败 ❌:{e}")finally:cursor.close()defdelete_user(connection,user_id):delete_query="DELETE FROM users WHERE id = %s;"try:cursor=connection.cursor()cursor.execute(delete_query,(user_id,))connection.commit()print(f"删除ID为{user_id}的用户{'成功'ifcursor.rowcount>0else'失败'}✅")exceptExceptionase:print(f"删除失败 ❌:{e}")connection.rollback()finally:cursor.close()# 主流程if__name__=="__main__":# 替换为你的数据库信息DB_CONFIG={"db_name":"test_db","db_user":"postgres","db_password":"123456","db_host":"localhost","db_port":"5432"}# 创建连接conn=create_connection(**DB_CONFIG)ifnotconn:exit(1)# 执行核心操作create_table(conn)insert_user(conn,[("张三",25,"zhangsan@example.com"),("李四",28,"lisi@example.com"),("王五",30,"wangwu@example.com")])update_user_info(conn,"zhangsan@example.com",{"hobby":["篮球","编程"],"address":"北京市"})query_users(conn,age_min=25)delete_user(conn,3)query_users(conn,age_min=25)# 关闭连接ifconn:conn.close()print("\n数据库连接已关闭 ✅")

四、常见问题与注意事项

  1. SQL 注入风险:严禁拼接 SQL 字符串,必须使用%s占位符传参;
  2. 编码问题:PostgreSQL 默认编码为 UTF8,Python 脚本需确保编码一致;
  3. 连接超时:远程连接时需设置connect_timeout参数(如connect(..., connect_timeout=10));
  4. 大结果集处理:避免使用fetchall(),改用fetchone()fetchmany()分批读取,防止内存溢出;
  5. 版本兼容:psycopg2-binary 版本需与 PostgreSQL 服务版本适配(如 2.9.x 适配 PostgreSQL 12+)。

五、总结

psycopg2-binary 的核心流程可总结为:
创建连接 → 创建游标 → 执行 SQL → 处理结果 → 提交/回滚事务 → 释放资源

掌握connect()cursor()execute()commit()fetch*()等核心 API,结合参数化查询、事务管理和连接池,即可安全、高效地实现 PostgreSQL 数据库的增删改查。生产环境中还需注意异常捕获、资源释放和性能优化(如批量操作、索引设计),确保系统稳定运行。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/7 1:03:33

Go + 边缘计算:工业质检 AI 模型部署实践指南

在工业生产领域,产品质检是保障出厂质量的关键环节。传统人工质检方式受限于人眼疲劳、主观判断差异等问题,存在效率低、漏检率高、成本攀升等痛点。随着AI计算机视觉技术的发展,基于图像识别的工业质检模型应运而生,但模型部署环…

作者头像 李华
网站建设 2026/5/7 1:03:32

基于springboot + vue宠物天然粮食商城系统

宠物天然粮食商城系统 目录 基于springboot vue宠物天然粮食商城系统 一、前言 二、系统功能演示 详细视频演示 三、技术选型 四、其他项目参考 五、代码参考 六、测试参考 七、最新计算机毕设选题推荐 八、源码获取: 基于springboot vue宠物天然粮食商…

作者头像 李华
网站建设 2026/5/6 12:41:41

R语言绘制复杂加权数据(nhanes数据)多模型生存分析决策曲线

决策曲线分析(Decision Curve Analysis, DCA)是一种用于评估、比较和优化诊断试验、预测模型或分子标志物临床实用性的统计方法。它由Andrew J. Vickers和Eugene B. Elkin于2006年提出,旨在弥补传统统计指标(如灵敏度、特异度、RO…

作者头像 李华
网站建设 2026/5/3 10:15:52

EmotiVoice社区生态崛起:插件、工具与二次开发案例汇总

EmotiVoice社区生态崛起:插件、工具与二次开发案例汇总 在虚拟主播深夜直播带货、AI老师为学生定制朗读课文、游戏NPC因剧情转折突然语气颤抖的今天,语音合成早已不再满足于“把字念出来”。用户期待的是能传递情绪、具备个性、甚至带有“人格”的声音。…

作者头像 李华
网站建设 2026/5/1 15:46:01

方才的系分架构训练营升级啦!别错过!

Hello,我是方才。先做个简单的自我介绍,认识下:【城市】重庆【职业|经验】在职15人研发leader 7年【架构经验】4年架构经验,负责过多个大型项目(单表超10亿,整体超100亿的海量业务数据)的架构设…

作者头像 李华
网站建设 2026/5/1 15:46:49

2026最新版Ubuntu安装Docker(20.10+)和Docker Compose

一、安装Docker 准备环境#安装前先卸载操作系统默认安装的docker, sudo apt-get remove docker docker-engine docker.io containerd runc#安装必要支持 sudo apt install apt-transport-https ca-certificates curl software-properties-common gnupg lsb-releas…

作者头像 李华