一次使用 SQLAlchemy 实现分类以及计数的业务过程

在编写业务逻辑代码的时候, 我不幸遇到下面的表结构(已经将主要逻辑抽离出来了):python

class Category(Model):
    __tablename__ = 'category'
    # 分类ID
    id = Column(Integer, primary_key=True, autoincrement=True)
    # 分类名称
    name = Column(String(length=255))
    
class Product(Model):
    __tablename__ = 'product'
    # 产品 ID
    id = Column(Integer, primary_key=True, autoincrement=True)
    # 产品名称
    name = Column(String(length=255))
    # 分类 ID
    category_id = Column(Integer)

如今须要实现的业务是返回分类的列表结果:sql

[
    {
        "id": 1,
        "name": "分类1",
        "product_count": 1
    },
    ...
]

这是一个一对多的模型.
通常的笨拙思路就是:数据库

data = []
categorys = Category.query.all()
for category in categorys:
    product_count = len(Product.query.filter(Product.category_id == category.id).all())
    data.append({
        'id': category.id,
        'name': category.name,
        'product_count': product_count
    })

明眼人一看就知道能够把len(Product.query.filter(Product.category_id == category.id).all())换成:json

product_count = Product.query.filter(Product.category_id == category.id).count()

可是, 根据这篇文章:[Why is SQLAlchemy count() much slower than the raw query?
](https://stackoverflow.com/que... 彷佛这样写会有更好的性能:网络

from sqlalchemy import func
session.query(func.count(Product.id)).filter(Product.category_id == category.id).scalar()

可是, 稍微有点经验的人就会对上面的写法嗤之以鼻, 由于product_count是放在for category in categorys:里面的, 这意味着若是categorys有成千上万个, 就要发出成千上万个session.query(), 而数据库请求是在网络上的消耗, 请求时间相对较长, 有的数据库没有处理好链接池, 创建链接和断开链接又是一笔巨大的开销, 因此 query 的请求应该越少越好. 像上面这样把 query 放到 for 循环中显然是不明智的选择.
因而有了下面一个请求的版本:session

result = db.session.query(Product, Category) \
    .filter(Product.category_id == Category.id)\
    .order_by(Category.id).all()
id_list = []
data = []
for product, category in result:
    if category and product:
        if category.id not in id_list:
            id_list.append(category.id)
            data.append({
                'id': category.id,
                'name': category.name,
                'product_count': 0
            })
        idx = id_list.index(category.id)
        data[idx]['product_count'] += 1

这样的写法十分难看, 并且一样没有合理利用 SQLAlchemy 的 count 函数. 因而改为:app

product_count = func.count(Product.id).label('count')
results = session.query(Category, product_count) \
    .join(Product, Product.category_id == Category.id) \
    .group_by(Category).all()
data = [
    {
        'id': category.id,
        'name': category.name,
        'product_count': porduct_count
    } for category, product_count in results]

不过这里还有一个问题, 就是若是先添加一个Category, 而属于这个Category下没有Product, 那么这个Category就不会出如今data里面, 因此join必须改为outerjoin. 即:函数

results = session.query(Category, product_count) \
    .outerjoin(Product, Product.category_id == Category.id) \
    .group_by(Category).all()

需求又来了!!!
如今考虑设计Product为伪删除模式, 即添加一个is_deleted属性判断Product是否被删除.
那么count函数就不能简单地count(Product.id), 而是要同时判断Product.is_deleted是否为真和Product是否为None, 通过悉心研究, 发现使用func.nullif能够实现这个需求,即用下面的写法:性能

product_count = func.count(func.nullif(Product.is_deleted.is_(False), False)).label('count')
results = session.query(Category, product_count) \
    .join(Product, Product.category_id == Category.id) \
    .group_by(Category).all()
data = [
    {
        'id': category.id,
        'name': category.name,
        'product_count': porduct_count
    } for category, product_count in results]

可见使用 ORM 有的时候仍是须要考虑不少东西.scala

相关文章
相关标签/搜索