[toc]
SqlAlchemy2.x笔记1
SqlAlchemy2.x 是指 SqlAlchemy 框架的2.x版本,相比 SqlAlchemy 1.x 版本进行了重大升级和改进。
目前最新的稳定版本为2.0.45
注意:目前SqlAlchemy2.x版本是兼容SqlAlchemy 1.x版本。介绍
SqlAlchemy官网 https://www.sqlalchemy.org/
官网截图 
SQLAlchemy是一个基于Python实现的SQL工具包和ORM框架。
使用SQLAlchemy可以省去很多手动管理数据库连接、资源、事务等重复工作。让开发者更加高效地使用数据库。
许多大型Python项目都选择使用SQLAlchemy作为ORM框架。
SqlAlchemy的特点
- 支持Postgres、MySQL、Oracle等主流数据库。
- SQLAlchemy提供丰富的查询方式,如过滤、分组、联结等,可以构建复杂查询。
- 异步查询:基于Greenlet等实现异步查询,提高查询效率。
- 事务控制: 通过Session管理数据库会话和事务。
- 工具集成:如数据迁移工具Alembic,可以实现Schema版本控制和迁移。
- 大数据集查询:基于Pagination实现数据分页,避免大量数据查询内存溢出。
安装
安装SqlAlchemy
SqlAlchemy本身需要数据库驱动才能进行数据库的操作。因此安装SqlAlchemy还需要安装相应的数据库驱动。
使用pip安装SqlAlchemy和pymysql数据库驱动
# 安装指定版本的SqlAlchemy
pip3 install sqlalchemy
pip3 install pymysql在python终端中查询SqlAlchemy的版本
# python终端
>>> import sqlalchemy
>>> sqlalchemy.__version__
'2.0.45'
>>>创建数据库连接
数据库连接是指应用程序与数据库服务器之间的通信渠道。
# 导入sqlalchemy的create_engine方法
from sqlalchemy import create_engine
# mysql数据库的连接URL
MYSQL_DATABASE_URL = "mysql+pymysql://root:123456@localhost:3306/my_test"
# 创建数据库引擎myEngine
myEngine = create_engine(MYSQL_DATABASE_URL,
pool_size=10, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
echo=False # 是否在控制台打印相关语句等
)
# 创建数据库连接对象
conn = myEngine.connect()数据库连接URL的参数说明:
- 数据库类型:mysql
- 数据库驱动:pymysql
- 用户名:root
- 密码:123456
- 服务器地址:localhost
- 端口:3306
- 数据库名称:my_test
注意:echo=False 用于开启或关闭数据库引擎的日志输出。若开启,会在控制台打印相关SQL语句等。
创建会话工厂和会话对象
- sessionmaker 是 sqlalchemy.orm 模块中的一个方法,用于创建会话工厂。
- session 会话对象,是用于与数据库进行交互的对象。
通过会话工厂来创建会话对象并对会话对象进行管理。然后操作会话对象来进行数据库的增删改查操作。
# 导入sqlalchemy框架中的各个工具
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# mysql数据库的连接URL
MYSQL_DATABASE_URL = "mysql+pymysql://root:123456@localhost:3306/shuyx_db"
# 创建数据库引擎myEngine
myEngine = create_engine(MYSQL_DATABASE_URL,
pool_size=10, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
echo=False # 是否在控制台打印相关语句等
)
# 创建会话工厂对象mySessionLocal
mySessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=myEngine, expire_on_commit=False)
#通过会话工厂创建新的会话session对象实例 db_session
db_session = mySessionLocal()
# 该函数每次通过会话工厂创建新的会话session对象。确保每个请求都有独立的会话。从而避免了并发访问同一个会话对象导致的事务冲突
def get_db_session():
db_session = mySessionLocal() #每次通过会话工厂创建新的会话session对象
try:
yield db_session
except:
db_session.rollback()
raise
finally:
db_session.close()创建数据库基类Base
在1.x版本中,使用declarative_base方法创建一个基类Base,然后从这个基类派生自定义模型类。
# 导入 declarative_base
from sqlalchemy.ext.declarative import declarative_base
# 创建基类
Base = declarative_base()在2.x版本中,declarative_base()方法 被 DeclarativeBase类 取代。通过继承DeclarativeBase类来创建一个基类Base,然后从这个基类派生自定义模型类。
from sqlalchemy.orm import DeclarativeBase
# 定义数据库基类Base,继承DeclarativeBase类
class Base(DeclarativeBase):
pass表结构
创建表
在SQLAlchemy中,有多种方式来创建表。
- 方式1:使用MetaData和Table对象直接定义表。使用Column方法来定义表的字段。
- 方式2:在1.x版本中,使用declarative_base方法创建一个基类Base,然后使用
Column()方法定义表字段。 - 方式3:在2.x版本中,使用DeclarativeBase类来创建基类。然后使用
mapped_column()方法定义表字段。
这几种方式都需要在最后调用metadata.create_all(engine)或者Base.metadata.create_all(engine)来创建表。第一种方式更加面向对象,第二种方式更加直接。
注意:如果表已经存在,那么SQLAlchemy不会对已经存在的表进行表结构上的更新。那么需要通过其他方式来更新表结构。
方式1 使用MetaData和Table对象直接定义表
使用MetaData和Table对象直接定义表。使用Column方法来定义表的字段。
# 导入sqlalchemy
import sqlalchemy
from sqlalchemy import Table,Column,Integer,String,Date
# 创建数据库引擎
engine = sqlalchemy.create_engine("mysql+pymysql://root:123456@localhost:3306/my_test",echo=True)
# 创建元数据
meta_data = sqlalchemy.MetaData()
# 定义表结构
school = Table(
"t_school",meta_data,
Column("id",Integer,primary_key=True),
Column("name",String(128),unique=True,nullable=True),
Column("address",String(128)),
Column("create_time",Date),
Column("update_time",Date)
)
# 调用元数据对象的create_all方法来创建表.
# 模型映射到数据库之中
meta_data.create_all(engine)执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。
方式2 在1.x版本中,使用Column()方法定义表字段。
# 导入sqlalchemy中的各个方法
from sqlalchemy import create_engine, Column, Integer,String,DateTime,func
# 导入 declarative_base
from sqlalchemy.ext.declarative import declarative_base
# 创建基类
Base = declarative_base()
# 自定义的模型类TClass需要继承基类
class TClass(Base):
# 定义表名
__tablename__ = 't_class'
# 定义表字段
id = Column(Integer, primary_key=True,autoincrement=True,comment="id")
name = Column(String(50),unique=True, nullable=False,comment="班级名称")
address = Column(String(50),comment="班级地址")
remark = Column(String(50), default="", comment="备注")
created_time = Column(DateTime, server_default=func.now(), comment="创建时间")
updated_time = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="修改时间")
# 创建表
# 模型映射到数据库之中
Base.metadata.create_all(engine)- Base类 是用于定义 ORM 模型类的基类,提供了一些方便的功能。比如提供了一些CRUD的方法等。
- 自定义的模型类需要继承Base类。
primary_key=True是否是主键。unique=True是否是唯一值nullable=False是否为nullserver_default=func.now(), onupdate=func.now()创建或者修改的时候值为当前时间。
执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。
方式3 在2.x版本中,使用mapped_column()方法来定义表的字段。
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import mapped_column
# 定义数据库基类Base,继承DeclarativeBase类
class Base(DeclarativeBase):
pass
# 定义班级表User模型类,通过继承Base类来定义表结构。使用mapped_column方法来定义表的字段。
class TClass(Base):
# 定义表名
__tablename__ = 't_class'
# 定义表字段
# 定义id字段,python类型为int 对应的数据库字段类型为int,主键,自增,注释为id
id: Mapped[int] = mapped_column(Integer, primary_key=True,autoincrement=True,comment="id")
# 定义name字段,python类型为str 对应的数据库字段类型为varchar(50),唯一值,不能为空,注释为班级名称
name: Mapped[str] = mapped_column(String(50),unique=True, nullable=False,comment="班级名称")
address: Mapped[str] = mapped_column(String(50),comment="班级地址")
remark: Mapped[str] = mapped_column(String(50), default="", comment="备注")
created_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), comment="创建时间")
updated_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now(), comment="修改时间")
# 创建表
# 执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。
Base.metadata.create_all(engine)- Base类 是用于定义 ORM 模型类的基类,提供了一些方便的功能。比如提供了一些CRUD的方法等。
- 自定义的模型类需要继承Base类。
primary_key=True是否是主键。unique=True是否是唯一值nullable=False是否为nullserver_default=func.now(), onupdate=func.now()创建或者修改的时候值为当前时间。
执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。
修改表
SQLAlchemy 不支持通过代码的形式直接修改表结构,但是可以执行原生的 SQL 来进行 ALTER TABLE 的操作。
增删改查 CRUD
SQLAlchemy创建表的方式不同,那么增删改查数据的操作方式也不同。
下面是基于 ORM 模式的方式来进行增删改查操作。
插入数据
1.x版本:直接通过会话对象来直接插入数据
1.x版本新增操作,主要是通过会话对象的add()方法,add_all()方法来添加数据。
# 导入之前创建好的模型类和会话对象
from test03 import TClass,db_session
# 1. 单条创建
def create(db_session: Session):
# 创建一个模型类对象作为插入的数据
one = TClass(name='jack',address="北京")
# add方法将对象添加到会话中
db_session.add(one)
# 提交会话(写入数据库)
db_session.commit()
# 刷新实例(获取自增ID、默认值等)
db_session.refresh(one)
# 关闭会话对象
db_session.close()
# 返回插入的对象
return one
# 2. 批量创建
def bulk_create(db_session: Session):
# 创建多个模型类对象
obj_list = [
TClass(name='tom',address="河南"),
TClass(name='bob',address="河南")
]
# add_all方法将多个对象添加到会话中
db_session.add_all(c)
# 提交会话(写入数据库)
db_session.commit()
# 刷新所有实例(获取自增ID、默认值等)
for item in obj_list:
db_session.refresh(item)
# 关闭会话
db_session.close()
# 返回插入的对象列表
return obj_list2.x版本:通过insert函数来创建insert语句
2.x版本新增操作。先通过insert函数来创建insert语句,然后调用会话对象的execute方法执行insert语句。
- scalar_one()方法用于获取一个模型对象或结果元组,确保返回的结果是唯一的。如果查询结果不唯一,将引发异常。
- scalars()方法会返回一个ScalarsResult对象。
- scalars().all() 获取所有实例
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import insert
# 1. 单条创建
def create(db_session: Session):
# 通过insert函数来创建insert语句
insert_sql = insert(TClass).values(name='xaxx', address='安徽')
# 打印insert语句
print(insert_sql)
# 通过会话对象,调用execute方法执行insert语句
result = db_session.execute(insert_sql)
# 提交会话对象
db_session.commit()
# 关闭会话对象
db_session.close()
# scalar_one方法提取单个实例
return result.scalar_one()
# 2. 批量创建(2.x高效写法:insert构造器)
def bulk_create(db_session: Session):
# 先创建insert语句
batch_insert_sql = insert(TClass).values([
{"name":'xxxss',"address":'安徽'},
{"name":'sdsdss',"address":'安徽2'}
])
# 打印insert语句
print(batch_insert_sql)
# 通过execute方法执行sql语句。
result = db_session.execute(batch_insert_sql)
# 提交会话对象
db_session.commit()
# 关闭会话对象
db_session.close()
# scalars()提取实例列表
return result.scalars().all()更新数据
下面是更新数据的两种方式。
1.x 版本:通过会话对象来直接更新数据
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import insert
# 单条更新
def update(db_session: Session):
# 先query查询出记录,然后update修改这条记录
affected_rows = db_session.query(TClass).filter(TClass.id == 2).update({'name': 'jack1'})
# 提交会话
db_session.commit()
# 关闭会话
db_session.close()
# 返回受影响行数
return affected_rows2.x 版本:通过update函数来创建更新语句
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import update
# 更新(2.x新写法:update构造器)
def update(db_session: Session):
# 先构建update语句
update_sql = update(TClass).where(TClass.id == 2).values(name='xasxx', address='安徽3')
# 打印更新语句
print(update_sql)
# 通过会话对象的execute方法执行语句
result = db_session.execute(update_sql)
# 提交会话对象
db_session.commit()
# 关闭会话对象
db_session.close()
# 返回受影响行数
return len(result.scalars().all())删除数据
1.x版本:通过会话对象来直接删除数据
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
# 单条删除
def delete(db_session: Session) -> bool:
# 先查询出一条记录,然后delete()删除这条记录。返回执行被删除的条数
affected_rows = db_session.query(TClass).filter(TClass.id == 2).delete()
# 提交会话
db_session.commit()
# 关闭会话
db_session.close()
return affected_rows > 0
# 2. 批量删除(条件删除)
def bulk_delete(db_session: Session) -> bool:
# 返回被删除的条数
affected_rows = db_session.query(TClass).filter(TClass.id < 10).delete()
# 提交会话
db_session.commit()
# 关闭会话
db_session.close()
return affected_rows > 02.x版本:先通过delete函数来创建删除语句
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import delete
# 删除(2.x新写法:delete构造器)
def delet(db_session: Session) -> int:
# 先构建sql语句
delete_sql = delete(TClass).where(TClass.id == 2)
# 打印语句
print(delete_sql)
# 通过会话对象的execute方法执行语句
result = db_session.execute(delete_sql)
# 提交会话对象
db_session.commit()
# 关闭会话对象
db_session.close()
# 返回受影响行数
return len(result.scalars().all())查询数据
查询操作不涉及到事务,因此会话对象不需要提交。但是最终还是要关闭会话对象。
- 1.x版本中,查询数据通常使用
session.query()搭配filter()方法来查询数据。然后通过first()/all()方法来获取单条记录或多条记录。 - 2.x版本中,查询数据通常使用
select()函数和where()函数来查询数据。然后通过scalar_one_or_none()/scalars().all()方法来获取单条记录或多条记录。
1.x版本:通过session.query()方法搭配filter()方法来查询数据
- and_()函数 来设置与条件查询
- or_()函数 来设置或条件查询
# 导入之前创建好的数据库模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import or_, and_,joinedload
# 1. 查询所有数据: all()
def test(db_session: Session):
all_list = db_session.query(TClass).all()
# 输出查询结果
for item in all_list:
print(item.id,item.name,item.address)
# 2. 查询单条记录
def test(db_session: Session):
result = db_session.query(TClass).first()
print(result.id,result.name,result.address)
# 3. 带条件的查询
def test(db_session: Session):
res = db_session.query(TClass).filter(TClass.id >= 2).all()
for i in res:
print(i.id,i.name,i.address)
# 4. and 查询 使用and_()函数
def test(db_session: Session):
res = db_session.query(TClass).filter(and_(TClass.id >= 2, TClass.name == 'jack')).all()
for i in res:
print(i.id,i.name,i.address)
# 5. or 查询,使用or_()函数
def test(db_session: Session):
res = db_session.query(TClass).filter(or_(TClass.id >= 2, TClass.name == 'jack')).all()
for i in res:
print(i.id,i.name,i.address)
# 额外补充 ======================
# 1. 查询所有数据: all()
all_list = db_session.query(TClass).all()
# 2. 查询第一条记录
result = db_session.query(TClass).first()
# 3. 带条件的范围查询
res = db_session.query(TClass).filter(TClass.id >= 2).all()
# 4. between查询,对于多个相同字段
res = db_session.query(TClass).filter(TClass.id >= 2, TClass.id <= 5).all()
# 5. and 查询,对于多个不同字段 使用and_()函数
res = db_session.query(TClass).filter(and_(TClass.id == 2, TClass.name == 'jack')).all()
# 6. or查询,使用or_()函数
res = db_session.query(TClass).filter(or_(TClass.id == 1, TClass.id == 3)).all()
# 7. order by 排序,默认asc升序
res = db_session.query(TClass).order_by(TClass.id).all()
# 降序查询 desc()
res = db_session.query(TClass).order_by(TClass.id.desc()).all()
# 8. in 查询。通过in_()函数
res = db_session.query(TClass).filter(TClass.id.in_([1,3,4])).all()
# 9. not in 查询
res = db_session.query(TClass).filter(~TClass.id.in_([1,3,4])).all()
# 10. like模糊查询。like()函数
res = db_session.query(TClass).filter(TClass.name.like('a%')).all()
# 11. group by 分组查询
res = db_session.query(TClass).group_by(TClass.id).all()
# 12. 分页查询。limit()函数和offset()函数
res = db_session.query(TClass).limit(2).offset(1).all()
# 13. 统计查询。count()函数
res = db_session.query(TClass).filter(TClass.id >= 2).count()2.x版本:通过select()函数构建查询语句,where()函数设置查询条件
- and_()函数 来设置与条件查询
- or_()函数 来设置或条件查询
- scalar_one_or_none()方法:返回单条记录或None(无结果返回None,多条抛异常)
- scalars().all()方法:返回所有记录(列表)
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import select
# 单条查询
def test(db_session: Session):
select_sql = select(TClass).where(TClass.id == 3)
# 通过会话对象的execute方法执行语句
result = db_session.execute(select_sql)
# 提取结果:scalar_one_or_none(无结果返回None,多条抛异常)
result.scalar_one_or_none()
# 多条查询
def test(db_session: Session):
select_sql = select(TClass).where(TClass.id >= 3)
# 通过会话对象的execute方法执行语句
result = db_session.execute(select_sql)
# 提取结果:scalar_one_or_none(无结果返回None,多条抛异常)
result.scalar_one_or_none()
# 多条件 and 查询
def test(db_session: Session):
select_sql = select(TClass).where(and_(TClass.id >= 3, TClass.name == 'jack'))
# 执行查询
result = db_session.execute(select_sql)
# scalars()提取模型实例 → all()转为列表
res = result.scalars().all()
for item in res:
print(item.id,item.name,item.address)
# 多条件 or 查询
def test(db_session: Session):
select_sql = select(TClass).where(or_(TClass.id >= 3, TClass.name == 'jack'))
# 执行查询
result = db_session.execute(select_sql)
# scalars()提取模型实例 → all()转为列表
res = result.scalars().all()
for item in res:
print(item.id,item.name,item.address)
# 分页查询
def test(db_session: Session):
# 查询第1页,每页100条数据
select_sql = select(TClass).offset(0).limit(100)
# 执行查询
result = db_session.execute(select_sql)
# scalars()提取模型实例 → all()转为列表
return result.scalars().all()
# 计数查询
def test(db_session: Session):
select_sql = select(func.count(TClass.id))
# 执行查询
result = db_session.execute(select_sql)
# scalar()提取单个计数值
return result.scalar()1.x 版本 和 2.x 版本 对比
| 场景 | 1.x版本 | 2.x版本 |
|---|---|---|
| 单条查询 | session.query(Model).filter(id==x).first() | select(Model).where(id==x) 搭配 scalar_one_or_none() |
| 条件查询 | filter(Model.a==x, Model.b==y) | where(Model.a==x, Model.b==y) |
| 批量查询 | query(Model).offset().limit().all() | select(Model).offset().limit() 搭配 scalars().all() |
| 计数查询 | query(Model).count() | select(func.count(Model.id)) 搭配 scalar() |
| 关联查询 | query(Model).options(joinedload()) | select(Model).options(joinedload()) |
| 单条创建 | session.add() + commit() + refresh() | 兼容 1.x 写法;或insert构造器 |
| 批量创建 | 循环add()方法,或bulk_save_objects()方法 | insert构造器 + execute()(更高效) |
| 单条更新 | 查询对象→赋值→commit | 兼容 1.x 写法 |
| 批量更新 | query(Model).filter().update() | update(Model).where().values() + execute |
| 单条删除 | 查询对象→db.delete ()→commit | 兼容 1.x 写法 |
| 批量删除 | query(Model).filter().delete() | delete(Model).where() + execute |
2.x版本中表的关联关系
在sqlalchemy中关联关系的实现依赖两个关键组件:
- 外键(Foreign Key):用于建立表与表之间的关联关系。外键约束确保引用的记录存在于关联的表中,从而维护数据的完整性。
- 关联属性(Relationship Attribute):在 ORM 模型中,通过关联属性来表示表之间的关联关系。关联属性可以是简单的属性,也可以是复杂的查询。
一对一关联关系
假设:一个用户(User)有一个用户资料(UserProfile),一个资料属于一个用户。
模型类定义
from sqlalchemy import Column, Integer, String, ForeignKey, Text
from sqlalchemy.orm import relationship
from config_2x import Base
# User 类
class User(Base):
__tablename__ = "users"
username = Column(String(length=50), unique=True, nullable=False)
email = Column(String(length=100), unique=True, nullable=False)
# 设置关联关系
profile = relationship(
"UserProfile",
back_populates="user",
uselist=False, # 核心:标记为一对一关系,表示一个用户只有一个用户资料
lazy="joined" # 一对一推荐用 joined(立即 JOIN,无性能问题)
)
# 用户资料类
class UserProfile(Base):
__tablename__ = "user_profiles"
user_id = Column(Integer, ForeignKey("users.id"), unique=True, nullable=False) # unique 保证一对一
address = Column(Text, nullable=True)
phone = Column(String(20), unique=True, nullable=True)
# 设置关联关系
user = relationship(
"User",
back_populates="profile"
)进行一对一关联查询
# 关联查询
def get_user_with_profile(db: Session, user_id: int):
# 一对一用 lazy="joined",一次 JOIN 查询即可获取所有数据
stmt = select(User).options(joinedload(User.profile)).where(User.id == user_id)
# 执行查询,返回 User 实例(包含关联的 UserProfile)
user = db.execute(stmt).scalar_one_or_none()
# 输出用户资料的地址
print(user.profile.address)一对多关联关系
假设:一个用户有多个订单,一个订单属于一个用户。(一对多关联关系)
关联属性的定义
from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from datetime import datetime
from config_2x import Base
class User(Base):
__tablename__ = "users"
username = Column(String(length=50), unique=True, nullable=False)
email = Column(String(length=100), unique=True, nullable=False)
create_time = Column(DateTime, default=datetime.now)
# 设置关联关系
# 2.x 默认 lazy="selectin"(批量预加载,解决 N+1)
orders = relationship(
"Order",
back_populates="user"
# lazy="selectin" # 2.x 默认,可省略
)
class Order(Base):
__tablename__ = "orders"
order_no = Column(String(length=30), unique=True, nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
amount = Column(Float, nullable=False)
create_time = Column(DateTime, default=datetime.now)
# 设置关联关系
user = relationship(
"User",
back_populates="orders"
)进行一对多关联查询
# 2.x 查询:获取用户及其所有订单
from sqlalchemy.orm import Session
from sqlalchemy import select
from models_2x import User
def get_user_with_orders_2x(db: Session, user_id: int):
# 2.x 默认 lazy="selectin",无需手动预加载(自动批量查询)
stmt = select(User).options(joinedload(User.orders)).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()多对多关联关系
假设:一个用户(User)有多个角色(Role),一个角色属于多个用户。(多对多关联关系)
多对多关系的两个表中需要有一个中间表,用于存储关联关系。
用 relationship 中的secondary 参数,指定中间表。
模型类定义
from sqlalchemy import Column, Integer, String, Table
from sqlalchemy.orm import relationship
from config_2x import Base
# User 类
class User(Base):
__tablename__ = "users"
username = Column(String(length=50), unique=True, nullable=False)
email = Column(String(length=100), unique=True, nullable=False)
# 多对多关联角色
roles = relationship(
"Role",
secondary=user_role, # 核心:指定中间表
back_populates="users",
lazy="selectin" # 多对多推荐用 selectin
)
# Role 类
class Role(Base):
__tablename__ = "roles"
name = Column(String(length=30), unique=True, nullable=False) # 角色名:admin/editor
# 反向关联用户
users = relationship(
"User",
secondary=user_role, # 指定中间表
back_populates="roles"
)
# 多对多中间表(无模型,仅表结构)
user_role = Table(
"user_roles", # 表名
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True)
)多对多关联查询
# 先查询用户,然后查询用户关联的角色
def get_user_with_roles(db: Session, user_id: int):
# 多对多预加载,避免 N+1
stmt = select(User).options(joinedload(User.roles)).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
print([role.name for role in user.roles]) # 无额外查询
# 先查询角色,然后查询角色关联的用户
def get_role_with_users(db: Session, role_name: str):
stmt = select(Role).options(joinedload(Role.users)).where(Role.name == role_name)
role = db.execute(stmt).scalar_one_or_none()
print([user.username for user in role.users])无关联关系的多表查询
无关联关系的多表查询,即多个表之间没有外键约束关系,表中也没有强制设置关联关系。表与表之间仅仅通过字段进行逻辑上的关联。
下面示例仅供参考
基础join查询(订单 + 商品,无外键)
from sqlalchemy.orm import Session
from sqlalchemy import select, join
from models_2x import Order, Product
# select.join(最常用)
def get_order_product_join_2x(db: Session, order_no: str = None):
# 基础join查询(订单 + 商品,无外键)
stmt = select(
Order.order_no,
Product.product_name,
Product.price
).join(
Order,
Product,
Order.product_code == Product.product_code
)
if order_no:
stmt = stmt.where(Order.order_no == order_no)
result = db.execute(stmt)
return result.all()多表级联join查询(用户 + 订单 + 商品,无外键)
from sqlalchemy.orm import Session
from sqlalchemy import select, join
from models_2x import User, Order, Product
def get_user_order_product_2x(db: Session, user_id: int):
# 级联join查询(用户 + 订单 + 商品)
stmt = select(
User.username,
Order.order_no,
Product.product_name,
Product.price
).join(User, Order, User.id == Order.user_id)
.join(Product, Order.product_code == Product.product_code)
# 过滤用户ID
if user_id:
stmt = stmt.where(User.id == user_id)
# 执行查询
result = db.execute(stmt)
return result.all()左连接查询(用户 + 订单,无外键,保留左表所有记录)
from sqlalchemy.orm import Session
from sqlalchemy import select
from models_2x import Order, Product
def get_order_product_left_join_2x(db: Session):
stmt = select(
Order.order_no,
Product.product_name
).join(Product, Order.product_code == Product.product_code, isouter=True)
result = db.execute(stmt)
return result.all()