Skip to content
🗂️ 文章分类: Python  
🏷️ 文章标签: SqlAlchemy  
📝 文章创建时间: 2025-12-30
🔥 文章最后更新时间:2025-12-30

[toc]

SqlAlchemy2.x笔记1

SqlAlchemy2.x 是指 SqlAlchemy 框架的2.x版本,相比 SqlAlchemy 1.x 版本进行了重大升级和改进。

目前最新的稳定版本为2.0.45

注意:目前SqlAlchemy2.x版本是兼容SqlAlchemy 1.x版本。

介绍

SqlAlchemy官网 https://www.sqlalchemy.org/

官网截图 python_2025-12-30_102505_005.png

SQLAlchemy是一个基于Python实现的SQL工具包和ORM框架。

使用SQLAlchemy可以省去很多手动管理数据库连接、资源、事务等重复工作。让开发者更加高效地使用数据库。

许多大型Python项目都选择使用SQLAlchemy作为ORM框架。

SqlAlchemy的特点

  1. 支持Postgres、MySQL、Oracle等主流数据库。
  2. SQLAlchemy提供丰富的查询方式,如过滤、分组、联结等,可以构建复杂查询。
  3. 异步查询:基于Greenlet等实现异步查询,提高查询效率。
  4. 事务控制: 通过Session管理数据库会话和事务。
  5. 工具集成:如数据迁移工具Alembic,可以实现Schema版本控制和迁移。
  6. 大数据集查询:基于Pagination实现数据分页,避免大量数据查询内存溢出。

安装

安装SqlAlchemy

SqlAlchemy本身需要数据库驱动才能进行数据库的操作。因此安装SqlAlchemy还需要安装相应的数据库驱动。

使用pip安装SqlAlchemy和pymysql数据库驱动

sh
# 安装指定版本的SqlAlchemy
pip3 install sqlalchemy
pip3 install pymysql

在python终端中查询SqlAlchemy的版本

py
# python终端
>>> import sqlalchemy
>>> sqlalchemy.__version__
'2.0.45'
>>>

创建数据库连接

数据库连接是指应用程序与数据库服务器之间的通信渠道。

python
# 导入sqlalchemy的create_engine方法
from sqlalchemy import create_engine

# mysql数据库的连接URL
MYSQL_DATABASE_URL = "mysql+pymysql://root:123456@localhost:3306/my_test"

# 创建数据库引擎myEngine
myEngine = create_engine(MYSQL_DATABASE_URL,
   pool_size=10,            # 连接池大小
   pool_timeout=30,        # 池中没有线程最多等待的时间,否则报错
   echo=False              # 是否在控制台打印相关语句等
)

# 创建数据库连接对象
conn = myEngine.connect()

数据库连接URL的参数说明:

  • 数据库类型:mysql
  • 数据库驱动:pymysql
  • 用户名:root
  • 密码:123456
  • 服务器地址:localhost
  • 端口:3306
  • 数据库名称:my_test

注意:echo=False 用于开启或关闭数据库引擎的日志输出。若开启,会在控制台打印相关SQL语句等。

创建会话工厂和会话对象

  • sessionmaker 是 sqlalchemy.orm 模块中的一个方法,用于创建会话工厂。
  • session 会话对象,是用于与数据库进行交互的对象。

通过会话工厂来创建会话对象并对会话对象进行管理。然后操作会话对象来进行数据库的增删改查操作。

python
# 导入sqlalchemy框架中的各个工具
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# mysql数据库的连接URL
MYSQL_DATABASE_URL = "mysql+pymysql://root:123456@localhost:3306/shuyx_db"

# 创建数据库引擎myEngine
myEngine = create_engine(MYSQL_DATABASE_URL,
    pool_size=10,            # 连接池大小
    pool_timeout=30,        # 池中没有线程最多等待的时间,否则报错
    echo=False              # 是否在控制台打印相关语句等
)

# 创建会话工厂对象mySessionLocal
mySessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=myEngine, expire_on_commit=False)

#通过会话工厂创建新的会话session对象实例 db_session
db_session = mySessionLocal()  

# 该函数每次通过会话工厂创建新的会话session对象。确保每个请求都有独立的会话。从而避免了并发访问同一个会话对象导致的事务冲突
def get_db_session():
    db_session = mySessionLocal()  #每次通过会话工厂创建新的会话session对象
    try:
        yield db_session
    except:
        db_session.rollback()
        raise
    finally:
        db_session.close()

创建数据库基类Base

在1.x版本中,使用declarative_base方法创建一个基类Base,然后从这个基类派生自定义模型类。

python
# 导入 declarative_base
from sqlalchemy.ext.declarative import declarative_base
# 创建基类
Base = declarative_base()

在2.x版本中,declarative_base()方法 被 DeclarativeBase类 取代。通过继承DeclarativeBase类来创建一个基类Base,然后从这个基类派生自定义模型类。

python
from sqlalchemy.orm import DeclarativeBase
# 定义数据库基类Base,继承DeclarativeBase类
class Base(DeclarativeBase):
    pass

表结构

创建表

在SQLAlchemy中,有多种方式来创建表。

  • 方式1:使用MetaData和Table对象直接定义表。使用Column方法来定义表的字段。
  • 方式2:在1.x版本中,使用declarative_base方法创建一个基类Base,然后使用Column()方法定义表字段。
  • 方式3:在2.x版本中,使用DeclarativeBase类来创建基类。然后使用mapped_column()方法定义表字段。

这几种方式都需要在最后调用metadata.create_all(engine)或者Base.metadata.create_all(engine)来创建表。第一种方式更加面向对象,第二种方式更加直接。

注意:如果表已经存在,那么SQLAlchemy不会对已经存在的表进行表结构上的更新。那么需要通过其他方式来更新表结构。

方式1 使用MetaData和Table对象直接定义表

使用MetaData和Table对象直接定义表。使用Column方法来定义表的字段。

py
# 导入sqlalchemy
import sqlalchemy
from sqlalchemy import Table,Column,Integer,String,Date

# 创建数据库引擎
engine = sqlalchemy.create_engine("mysql+pymysql://root:123456@localhost:3306/my_test",echo=True)

# 创建元数据
meta_data = sqlalchemy.MetaData()

# 定义表结构
school = Table(
    "t_school",meta_data,
    Column("id",Integer,primary_key=True),
    Column("name",String(128),unique=True,nullable=True),
    Column("address",String(128)),
    Column("create_time",Date),
    Column("update_time",Date)
)

# 调用元数据对象的create_all方法来创建表.
# 模型映射到数据库之中
meta_data.create_all(engine)

执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。

方式2 在1.x版本中,使用Column()方法定义表字段。

python
# 导入sqlalchemy中的各个方法
from sqlalchemy import create_engine, Column, Integer,String,DateTime,func
# 导入 declarative_base
from sqlalchemy.ext.declarative import declarative_base

# 创建基类
Base = declarative_base()
# 自定义的模型类TClass需要继承基类
class TClass(Base):
    # 定义表名
    __tablename__ = 't_class'
    # 定义表字段
    id = Column(Integer, primary_key=True,autoincrement=True,comment="id")
    name = Column(String(50),unique=True, nullable=False,comment="班级名称")
    address = Column(String(50),comment="班级地址")
    remark = Column(String(50), default="", comment="备注")
    created_time = Column(DateTime, server_default=func.now(), comment="创建时间")
    updated_time = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="修改时间")
 
# 创建表
# 模型映射到数据库之中
Base.metadata.create_all(engine)
  • Base类 是用于定义 ORM 模型类的基类,提供了一些方便的功能。比如提供了一些CRUD的方法等。
  • 自定义的模型类需要继承Base类。
  • primary_key=True 是否是主键。
  • unique=True 是否是唯一值
  • nullable=False是否为null
  • server_default=func.now(), onupdate=func.now() 创建或者修改的时候值为当前时间。

执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。

方式3 在2.x版本中,使用mapped_column()方法来定义表的字段。

python
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import mapped_column
# 定义数据库基类Base,继承DeclarativeBase类
class Base(DeclarativeBase):
    pass

# 定义班级表User模型类,通过继承Base类来定义表结构。使用mapped_column方法来定义表的字段。
class TClass(Base):
    # 定义表名
    __tablename__ = 't_class'
    # 定义表字段

    # 定义id字段,python类型为int 对应的数据库字段类型为int,主键,自增,注释为id
    id: Mapped[int] = mapped_column(Integer, primary_key=True,autoincrement=True,comment="id")
    # 定义name字段,python类型为str 对应的数据库字段类型为varchar(50),唯一值,不能为空,注释为班级名称
    name: Mapped[str] = mapped_column(String(50),unique=True, nullable=False,comment="班级名称")
    address: Mapped[str] = mapped_column(String(50),comment="班级地址")
    remark: Mapped[str] = mapped_column(String(50), default="", comment="备注")
    created_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), comment="创建时间")
    updated_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now(), comment="修改时间")

# 创建表
# 执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。
Base.metadata.create_all(engine)
  • Base类 是用于定义 ORM 模型类的基类,提供了一些方便的功能。比如提供了一些CRUD的方法等。
  • 自定义的模型类需要继承Base类。
  • primary_key=True 是否是主键。
  • unique=True 是否是唯一值
  • nullable=False是否为null
  • server_default=func.now(), onupdate=func.now() 创建或者修改的时候值为当前时间。

执行这段代码后,sqlalchemy会自动连接数据库,并创建表。如果表已经存在,则不会创建。

修改表

SQLAlchemy 不支持通过代码的形式直接修改表结构,但是可以执行原生的 SQL 来进行 ALTER TABLE 的操作。

增删改查 CRUD

SQLAlchemy创建表的方式不同,那么增删改查数据的操作方式也不同。

下面是基于 ORM 模式的方式来进行增删改查操作。

插入数据

1.x版本:直接通过会话对象来直接插入数据

1.x版本新增操作,主要是通过会话对象的add()方法,add_all()方法来添加数据。

py
# 导入之前创建好的模型类和会话对象
from test03 import TClass,db_session

# 1. 单条创建
def create(db_session: Session):
    # 创建一个模型类对象作为插入的数据
    one = TClass(name='jack',address="北京")
    # add方法将对象添加到会话中
    db_session.add(one)
    # 提交会话(写入数据库)
    db_session.commit()
    # 刷新实例(获取自增ID、默认值等)
    db_session.refresh(one)
    # 关闭会话对象
    db_session.close()
    # 返回插入的对象
    return one


# 2. 批量创建
def bulk_create(db_session: Session):
    # 创建多个模型类对象
    obj_list = [
        TClass(name='tom',address="河南"),
        TClass(name='bob',address="河南")
    ]
    # add_all方法将多个对象添加到会话中
    db_session.add_all(c)
    # 提交会话(写入数据库)
    db_session.commit()
    # 刷新所有实例(获取自增ID、默认值等)
    for item in obj_list:
        db_session.refresh(item)
    # 关闭会话
    db_session.close()
    # 返回插入的对象列表
    return obj_list

2.x版本:通过insert函数来创建insert语句

2.x版本新增操作。先通过insert函数来创建insert语句,然后调用会话对象的execute方法执行insert语句。

  • scalar_one()方法用于获取一个模型对象或结果元组,确保返回的结果是唯一的。如果查询结果不唯一,将引发异常。
  • scalars()方法会返回一个ScalarsResult对象。
    • scalars().all() 获取所有实例
py
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import insert

# 1. 单条创建
def create(db_session: Session):
    # 通过insert函数来创建insert语句
    insert_sql = insert(TClass).values(name='xaxx', address='安徽')
    # 打印insert语句
    print(insert_sql)
    # 通过会话对象,调用execute方法执行insert语句
    result = db_session.execute(insert_sql)
    # 提交会话对象
    db_session.commit()
    # 关闭会话对象
    db_session.close()
    # scalar_one方法提取单个实例
    return result.scalar_one()

# 2. 批量创建(2.x高效写法:insert构造器)
def bulk_create(db_session: Session):
    # 先创建insert语句
    batch_insert_sql = insert(TClass).values([
        {"name":'xxxss',"address":'安徽'},
        {"name":'sdsdss',"address":'安徽2'}
    ])
    # 打印insert语句
    print(batch_insert_sql)
    # 通过execute方法执行sql语句。
    result = db_session.execute(batch_insert_sql)
    # 提交会话对象
    db_session.commit()
    # 关闭会话对象
    db_session.close()
    # scalars()提取实例列表
    return result.scalars().all()

更新数据

下面是更新数据的两种方式。

1.x 版本:通过会话对象来直接更新数据

py
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import insert

# 单条更新
def update(db_session: Session):
    # 先query查询出记录,然后update修改这条记录
    affected_rows = db_session.query(TClass).filter(TClass.id == 2).update({'name': 'jack1'})
    # 提交会话
    db_session.commit()
    # 关闭会话
    db_session.close()

    # 返回受影响行数
    return affected_rows

2.x 版本:通过update函数来创建更新语句

py
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import update

# 更新(2.x新写法:update构造器)
def update(db_session: Session):
    # 先构建update语句
    update_sql = update(TClass).where(TClass.id == 2).values(name='xasxx', address='安徽3')
    # 打印更新语句
    print(update_sql)
    # 通过会话对象的execute方法执行语句
    result = db_session.execute(update_sql)
    # 提交会话对象
    db_session.commit()
    # 关闭会话对象
    db_session.close()

    # 返回受影响行数
    return len(result.scalars().all())

删除数据

1.x版本:通过会话对象来直接删除数据

py
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session

# 单条删除
def delete(db_session: Session) -> bool:
    # 先查询出一条记录,然后delete()删除这条记录。返回执行被删除的条数
    affected_rows = db_session.query(TClass).filter(TClass.id == 2).delete()
    # 提交会话
    db_session.commit()
    # 关闭会话
    db_session.close()
    return affected_rows > 0

# 2. 批量删除(条件删除)
def bulk_delete(db_session: Session) -> bool:
    # 返回被删除的条数
    affected_rows = db_session.query(TClass).filter(TClass.id < 10).delete()
    # 提交会话
    db_session.commit()
    # 关闭会话
    db_session.close()
    return affected_rows > 0

2.x版本:先通过delete函数来创建删除语句

py
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import delete

# 删除(2.x新写法:delete构造器)
def delet(db_session: Session) -> int:
    # 先构建sql语句
    delete_sql = delete(TClass).where(TClass.id == 2)
    # 打印语句
    print(delete_sql)
    # 通过会话对象的execute方法执行语句
    result = db_session.execute(delete_sql)
    # 提交会话对象
    db_session.commit()
    # 关闭会话对象
    db_session.close()
    
    # 返回受影响行数
    return len(result.scalars().all())

查询数据

查询操作不涉及到事务,因此会话对象不需要提交。但是最终还是要关闭会话对象。

  • 1.x版本中,查询数据通常使用session.query()搭配filter()方法来查询数据。然后通过first()/all()方法来获取单条记录或多条记录。
  • 2.x版本中,查询数据通常使用select()函数和where()函数来查询数据。然后通过scalar_one_or_none()/scalars().all()方法来获取单条记录或多条记录。

1.x版本:通过session.query()方法搭配filter()方法来查询数据

  • and_()函数 来设置与条件查询
  • or_()函数 来设置或条件查询
py
# 导入之前创建好的数据库模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import or_, and_,joinedload

# 1. 查询所有数据: all()
def test(db_session: Session):
    all_list = db_session.query(TClass).all()
    # 输出查询结果
    for item in all_list:
        print(item.id,item.name,item.address)

# 2. 查询单条记录
def test(db_session: Session):
    result = db_session.query(TClass).first()
    print(result.id,result.name,result.address)

# 3. 带条件的查询
def test(db_session: Session):
    res = db_session.query(TClass).filter(TClass.id >= 2).all()
    for i in res:
        print(i.id,i.name,i.address)

# 4. and 查询 使用and_()函数
def test(db_session: Session):
    res = db_session.query(TClass).filter(and_(TClass.id >= 2, TClass.name == 'jack')).all()
    for i in res:
        print(i.id,i.name,i.address)

# 5. or 查询,使用or_()函数
def test(db_session: Session):
    res = db_session.query(TClass).filter(or_(TClass.id >= 2, TClass.name == 'jack')).all()
    for i in res:
        print(i.id,i.name,i.address)


# 额外补充 ======================

# 1. 查询所有数据: all()
all_list = db_session.query(TClass).all()
# 2. 查询第一条记录
result = db_session.query(TClass).first()
# 3. 带条件的范围查询
res = db_session.query(TClass).filter(TClass.id >= 2).all()
# 4. between查询,对于多个相同字段
res = db_session.query(TClass).filter(TClass.id >= 2, TClass.id <= 5).all()
# 5. and 查询,对于多个不同字段 使用and_()函数
res = db_session.query(TClass).filter(and_(TClass.id == 2, TClass.name == 'jack')).all()
# 6. or查询,使用or_()函数
res = db_session.query(TClass).filter(or_(TClass.id == 1, TClass.id == 3)).all()
# 7. order by 排序,默认asc升序
res = db_session.query(TClass).order_by(TClass.id).all()
# 降序查询 desc()
res = db_session.query(TClass).order_by(TClass.id.desc()).all()
# 8. in 查询。通过in_()函数
res = db_session.query(TClass).filter(TClass.id.in_([1,3,4])).all() 
# 9. not in 查询
res = db_session.query(TClass).filter(~TClass.id.in_([1,3,4])).all() 
# 10. like模糊查询。like()函数
res = db_session.query(TClass).filter(TClass.name.like('a%')).all()
# 11. group by 分组查询
res = db_session.query(TClass).group_by(TClass.id).all()
# 12. 分页查询。limit()函数和offset()函数
res = db_session.query(TClass).limit(2).offset(1).all()
# 13. 统计查询。count()函数
res = db_session.query(TClass).filter(TClass.id >= 2).count()

2.x版本:通过select()函数构建查询语句,where()函数设置查询条件

  • and_()函数 来设置与条件查询
  • or_()函数 来设置或条件查询
  • scalar_one_or_none()方法:返回单条记录或None(无结果返回None,多条抛异常)
  • scalars().all()方法:返回所有记录(列表)
py
# 导入之前创建好的模型类和数据库引擎对象和会话对象
from test03 import TClass,db_session
from sqlalchemy import select

# 单条查询
def test(db_session: Session):
    select_sql = select(TClass).where(TClass.id == 3)
    # 通过会话对象的execute方法执行语句
    result = db_session.execute(select_sql)
    # 提取结果:scalar_one_or_none(无结果返回None,多条抛异常)
    result.scalar_one_or_none()

# 多条查询
def test(db_session: Session):
    select_sql = select(TClass).where(TClass.id >= 3)
    # 通过会话对象的execute方法执行语句
    result = db_session.execute(select_sql)
    # 提取结果:scalar_one_or_none(无结果返回None,多条抛异常)
    result.scalar_one_or_none()
    

# 多条件 and 查询
def test(db_session: Session):
    select_sql = select(TClass).where(and_(TClass.id >= 3, TClass.name == 'jack'))
    # 执行查询
    result = db_session.execute(select_sql)
    # scalars()提取模型实例 → all()转为列表
    res = result.scalars().all()
    for item in res:
        print(item.id,item.name,item.address)

# 多条件 or 查询
def test(db_session: Session):
    select_sql = select(TClass).where(or_(TClass.id >= 3, TClass.name == 'jack'))
    # 执行查询
    result = db_session.execute(select_sql)
    # scalars()提取模型实例 → all()转为列表
    res = result.scalars().all()
    for item in res:
        print(item.id,item.name,item.address)

# 分页查询
def test(db_session: Session):
    # 查询第1页,每页100条数据
    select_sql = select(TClass).offset(0).limit(100)
    # 执行查询
    result = db_session.execute(select_sql)
    # scalars()提取模型实例 → all()转为列表
    return result.scalars().all()

# 计数查询
def test(db_session: Session):
    select_sql = select(func.count(TClass.id))
    # 执行查询
    result = db_session.execute(select_sql)
    # scalar()提取单个计数值
    return result.scalar()

1.x 版本 和 2.x 版本 对比

场景1.x版本2.x版本
单条查询session.query(Model).filter(id==x).first()select(Model).where(id==x) 搭配 scalar_one_or_none()
条件查询filter(Model.a==x, Model.b==y)where(Model.a==x, Model.b==y)
批量查询query(Model).offset().limit().all()select(Model).offset().limit() 搭配 scalars().all()
计数查询query(Model).count()select(func.count(Model.id)) 搭配 scalar()
关联查询query(Model).options(joinedload())select(Model).options(joinedload())
单条创建session.add() + commit() + refresh()兼容 1.x 写法;或insert构造器
批量创建循环add()方法,或bulk_save_objects()方法insert构造器 + execute()(更高效)
单条更新查询对象→赋值→commit兼容 1.x 写法
批量更新query(Model).filter().update()update(Model).where().values() + execute
单条删除查询对象→db.delete ()→commit兼容 1.x 写法
批量删除query(Model).filter().delete()delete(Model).where() + execute

2.x版本中表的关联关系

在sqlalchemy中关联关系的实现依赖两个关键组件:

  • 外键(Foreign Key):用于建立表与表之间的关联关系。外键约束确保引用的记录存在于关联的表中,从而维护数据的完整性。
  • 关联属性(Relationship Attribute):在 ORM 模型中,通过关联属性来表示表之间的关联关系。关联属性可以是简单的属性,也可以是复杂的查询。

一对一关联关系

假设:一个用户(User)有一个用户资料(UserProfile),一个资料属于一个用户。

模型类定义

python
from sqlalchemy import Column, Integer, String, ForeignKey, Text
from sqlalchemy.orm import relationship
from config_2x import Base

# User 类
class User(Base):
    __tablename__ = "users"
    username = Column(String(length=50), unique=True, nullable=False)
    email = Column(String(length=100), unique=True, nullable=False)
    
    # 设置关联关系
    profile = relationship(
        "UserProfile",        
        back_populates="user",
        uselist=False,  # 核心:标记为一对一关系,表示一个用户只有一个用户资料
        lazy="joined"   # 一对一推荐用 joined(立即 JOIN,无性能问题)
    )

# 用户资料类
class UserProfile(Base):
    __tablename__ = "user_profiles"
    user_id = Column(Integer, ForeignKey("users.id"), unique=True, nullable=False)  # unique 保证一对一
    address = Column(Text, nullable=True)
    phone = Column(String(20), unique=True, nullable=True)
    
    # 设置关联关系
    user = relationship(
        "User",
        back_populates="profile"
    )

进行一对一关联查询

python
# 关联查询
def get_user_with_profile(db: Session, user_id: int):
    # 一对一用 lazy="joined",一次 JOIN 查询即可获取所有数据
    stmt = select(User).options(joinedload(User.profile)).where(User.id == user_id)
    # 执行查询,返回 User 实例(包含关联的 UserProfile)
    user = db.execute(stmt).scalar_one_or_none()
    # 输出用户资料的地址
    print(user.profile.address)

一对多关联关系

假设:一个用户有多个订单,一个订单属于一个用户。(一对多关联关系)

关联属性的定义

python
from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from datetime import datetime
from config_2x import Base

class User(Base):
    __tablename__ = "users"
    username = Column(String(length=50), unique=True, nullable=False)
    email = Column(String(length=100), unique=True, nullable=False)
    create_time = Column(DateTime, default=datetime.now)
    
    # 设置关联关系
    # 2.x 默认 lazy="selectin"(批量预加载,解决 N+1)
    orders = relationship(
        "Order",
        back_populates="user"
        # lazy="selectin"  # 2.x 默认,可省略
    )

class Order(Base):
    __tablename__ = "orders"
    order_no = Column(String(length=30), unique=True, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    amount = Column(Float, nullable=False)
    create_time = Column(DateTime, default=datetime.now)
    
    # 设置关联关系
    user = relationship(
        "User",
        back_populates="orders"
    )

进行一对多关联查询

python
# 2.x 查询:获取用户及其所有订单
from sqlalchemy.orm import Session
from sqlalchemy import select
from models_2x import User

def get_user_with_orders_2x(db: Session, user_id: int):
    # 2.x 默认 lazy="selectin",无需手动预加载(自动批量查询)
    stmt = select(User).options(joinedload(User.orders)).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

多对多关联关系

假设:一个用户(User)有多个角色(Role),一个角色属于多个用户。(多对多关联关系)

多对多关系的两个表中需要有一个中间表,用于存储关联关系。

用 relationship 中的secondary 参数,指定中间表。

模型类定义

python
from sqlalchemy import Column, Integer, String, Table
from sqlalchemy.orm import relationship
from config_2x import Base

# User 类
class User(Base):
    __tablename__ = "users"
    username = Column(String(length=50), unique=True, nullable=False)
    email = Column(String(length=100), unique=True, nullable=False)
    
    # 多对多关联角色
    roles = relationship(
        "Role",
        secondary=user_role,  # 核心:指定中间表
        back_populates="users",
        lazy="selectin"       # 多对多推荐用 selectin
    )

# Role 类
class Role(Base):
    __tablename__ = "roles"
    name = Column(String(length=30), unique=True, nullable=False)  # 角色名:admin/editor
    
    # 反向关联用户
    users = relationship(
        "User",
        secondary=user_role,  # 指定中间表
        back_populates="roles"
    )

# 多对多中间表(无模型,仅表结构)
user_role = Table(
    "user_roles",  # 表名
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True)
)

多对多关联查询

python
# 先查询用户,然后查询用户关联的角色
def get_user_with_roles(db: Session, user_id: int):
    # 多对多预加载,避免 N+1
    stmt = select(User).options(joinedload(User.roles)).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()
    print([role.name for role in user.roles])  # 无额外查询

# 先查询角色,然后查询角色关联的用户
def get_role_with_users(db: Session, role_name: str):
    stmt = select(Role).options(joinedload(Role.users)).where(Role.name == role_name)
    role = db.execute(stmt).scalar_one_or_none()
    print([user.username for user in role.users])

无关联关系的多表查询

无关联关系的多表查询,即多个表之间没有外键约束关系,表中也没有强制设置关联关系。表与表之间仅仅通过字段进行逻辑上的关联。

下面示例仅供参考

基础join查询(订单 + 商品,无外键)

python
from sqlalchemy.orm import Session
from sqlalchemy import select, join
from models_2x import Order, Product

# select.join(最常用)
def get_order_product_join_2x(db: Session, order_no: str = None):
    # 基础join查询(订单 + 商品,无外键)
    stmt = select(
        Order.order_no,
        Product.product_name,
        Product.price
    ).join(
        Order,
        Product,
        Order.product_code == Product.product_code
    )
    if order_no:
        stmt = stmt.where(Order.order_no == order_no)
    result = db.execute(stmt)
    return result.all()

多表级联join查询(用户 + 订单 + 商品,无外键)

python
from sqlalchemy.orm import Session
from sqlalchemy import select, join
from models_2x import User, Order, Product

def get_user_order_product_2x(db: Session, user_id: int):
    # 级联join查询(用户 + 订单 + 商品)
    stmt = select(
        User.username,
        Order.order_no,
        Product.product_name,
        Product.price
    ).join(User, Order, User.id == Order.user_id)
    .join(Product, Order.product_code == Product.product_code)

    # 过滤用户ID
    if user_id:
        stmt = stmt.where(User.id == user_id)

    # 执行查询
    result = db.execute(stmt)
    return result.all()

左连接查询(用户 + 订单,无外键,保留左表所有记录)

python
from sqlalchemy.orm import Session
from sqlalchemy import select
from models_2x import Order, Product

def get_order_product_left_join_2x(db: Session):
    stmt = select(
        Order.order_no,
        Product.product_name
    ).join(Product, Order.product_code == Product.product_code, isouter=True)
    result = db.execute(stmt)
    return result.all()

Released under the MIT License.