# Follows the example from the book; uses MySQL as the database backend.
# coding=utf-8
"""SQLAlchemy ORM practice script: cookie-shop models plus a shared session.

Importing or running this module has side effects: it creates an engine,
calls ``Base.metadata.create_all(engine)`` for the mapped tables, and opens
one module-level ``session``.  The query/insert/update/delete experiments
at the bottom are kept inside a string literal so they do not execute.
"""
from datetime import datetime

from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        Boolean, DateTime, ForeignKey, create_engine,
                        CheckConstraint)
from sqlalchemy import (insert, select, update, delete, text, desc, cast,
                        and_, or_, not_)
from sqlalchemy import (Table, ForeignKeyConstraint)
from sqlalchemy.sql import (func, cast, )
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (relationship, backref, sessionmaker)

# Shared declarative base; every model below registers on Base.metadata.
Base = declarative_base()


class Cookie(Base):
    """ORM model mapped to the ``cookies`` table."""

    __tablename__ = 'cookies'

    cookie_id = Column(Integer(), primary_key=True)
    cookie_name = Column(String(50), index=True)
    cookie_recipe_url = Column(String(255))
    cookie_sku = Column(String(55))
    quantity = Column(Integer())
    unit_cost = Column(Numeric(12, 2))

    def __repr__(self):
        """Return a debug string listing every non-key column."""
        return (
            "Cookie(cookie_name='{self.cookie_name}', "
            "cookie_recipe_url='{self.cookie_recipe_url}', "
            "cookie_sku='{self.cookie_sku}', "
            "quantity={self.quantity}, "
            "unit_cost={self.unit_cost})"
        ).format(self=self)


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    user_id = Column(Integer(), primary_key=True)
    username = Column(String(15), nullable=False, unique=True)
    email_address = Column(String(255), nullable=False)
    phone = Column(String(20), nullable=False)
    password = Column(String(25), nullable=False)
    # Both timestamps default to datetime.now on insert; updated_on is also
    # refreshed through ``onupdate`` whenever the row is updated.
    created_on = Column(DateTime(), default=datetime.now)
    updated_on = Column(DateTime(), default=datetime.now,
                        onupdate=datetime.now)

    def __repr__(self):
        """Return a debug string with username, email, phone and password."""
        # NOTE(review): the repr includes the password value; be careful
        # when printing or logging User objects.
        return (
            "User(username='{self.username}', "
            "email_address='{self.email_address}', "
            "phone='{self.phone}', "
            "password='{self.password}')"
        ).format(self=self)


class Order(Base):
    """ORM model mapped to the ``orders`` table; each order references a user."""

    __tablename__ = 'orders'

    order_id = Column(Integer(), primary_key=True)
    user_id = Column(Integer(), ForeignKey('users.user_id'))
    shipped = Column(Boolean(), default=False)

    # The backref adds ``User.orders``, ordered by order_id.
    user = relationship("User", backref=backref('orders', order_by=order_id))

    def __repr__(self):
        """Return a debug string with the user id and shipped flag."""
        return (
            "Order(user_id={self.user_id}, "
            "shipped={self.shipped})"
        ).format(self=self)


class LineItem(Base):
    """ORM model mapped to ``line_items``; links an order to a cookie."""

    __tablename__ = 'line_items'

    line_item_id = Column(Integer(), primary_key=True)
    order_id = Column(Integer(), ForeignKey('orders.order_id'))
    cookie_id = Column(Integer(), ForeignKey('cookies.cookie_id'))
    quantity = Column(Integer())
    extended_cost = Column(Numeric(12, 2))

    # The backref adds ``Order.line_items``, ordered by line_item_id.
    order = relationship("Order",
                         backref=backref('line_items', order_by=line_item_id))
    # uselist=False: ``cookie`` is a single object rather than a list.
    cookie = relationship("Cookie", uselist=False)

    def __repr__(self):
        """Return a debug string with the foreign keys, quantity and cost."""
        # The label matches the class name, like every other repr here.
        return (
            "LineItem(order_id={self.order_id}, "
            "cookie_id={self.cookie_id}, "
            "quantity={self.quantity}, "
            "extended_cost={self.extended_cost})"
        ).format(self=self)


class Employee(Base):
    """Self-referential ORM model mapped to the ``employees`` table."""

    __tablename__ = 'employees'

    id = Column(Integer(), primary_key=True)
    manager_id = Column(Integer(), ForeignKey('employees.id'))
    name = Column(String(255), nullable=False)

    # remote_side=[id] marks ``id`` as the remote column of the self-join;
    # the backref adds ``Employee.reports`` on the manager object.
    manager = relationship("Employee", backref=backref('reports'),
                           remote_side=[id])


# NOTE(review): 'xxx' looks like a redacted placeholder -- supply the real
# connection details before running.
engine = create_engine('mysql+pymysql://xxx')
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# Disabled practice snippets.  They live in a string literal so nothing below
# runs; copy a section out to try it against a populated database.
'''
# --- update through attribute assignment on a loaded object ---
query = session.query(Cookie)
cc_cookie = query.filter(Cookie.cookie_name == "chocolate chip").first()
cc_cookie.quantity = cc_cookie.quantity + 120
session.commit()
print(cc_cookie.quantity)

# --- update through Query.update() ---
query = session.query(Cookie)
query = query.filter(Cookie.cookie_name == "chocolate chip")
query.update({Cookie.quantity: Cookie.quantity - 20})
cc_cookie = query.first()
print(cc_cookie.quantity)

# --- delete a loaded object ---
query = session.query(Cookie)
query = query.filter(Cookie.cookie_name == "oatmeal raisin")
dcc_cookie = query.one()
session.delete(dcc_cookie)
session.commit()
dcc_cookie = query.first()
print(dcc_cookie)

# --- delete through Query.delete() ---
query = session.query(Cookie)
query = query.filter(Cookie.cookie_name == "peanut butter")
query.delete()
pea_cookie = query.first()
print(pea_cookie)

# --- create orders and line items through relationships ---
cookiemon = session.query(User).filter(User.username == "cookiemon").one()
cakeeater = session.query(User).filter(User.username == "cakeeater").one()
pieperson = session.query(User).filter(User.username == "pieperson").one()

o1 = Order()
o1.user = cookiemon
session.add(o1)
cc = session.query(Cookie).filter(
    Cookie.cookie_name == "chocolate chip").first()
line1 = LineItem(cookie=cc, quantity=2, extended_cost=1.00)
pb = session.query(Cookie).filter(Cookie.cookie_name == "molasses").first()
line2 = LineItem(quantity=12, extended_cost=3.00)
line2.cookie = pb
line2.order = o1
o1.line_items.append(line1)
o1.line_items.append(line2)
session.commit()

o2 = Order()
o2.user = cakeeater
cc = session.query(Cookie).filter(
    Cookie.cookie_name == "chocolate chip").first()
line1 = LineItem(cookie=cc, quantity=24, extended_cost=12.00)
oat = session.query(Cookie).filter(
    Cookie.cookie_name == "dark chocolate chip").first()
line2 = LineItem(cookie=oat, quantity=6, extended_cost=6.00)
o2.line_items.append(line1)
o2.line_items.append(line2)
session.add(o2)
session.commit()

# --- join across orders, users, line items and cookies ---
query = session.query(Order.order_id, User.username, User.phone,
                      Cookie.cookie_name, LineItem.quantity,
                      LineItem.extended_cost)
query = query.join(User).join(LineItem).join(Cookie)
results = query.filter(User.username == 'cookiemon').all()
print(results)

# --- outer join with a per-user order count ---
query = session.query(User.username, func.count(Order.order_id))
query = query.outerjoin(Order).group_by(User.username)
for row in query:
    print(row)

# --- self-referential relationship ---
marsha = Employee(name='Marsha')
fred = Employee(name='Fred')
marsha.reports.append(fred)
session.add(marsha)
session.commit()
for report in marsha.reports:
    print(report.name)

query = session.query(User.username, func.count(Order.order_id))
query = query.outerjoin(Order).group_by(User.username)
for row in query:
    print(row)

# --- conditional query building ---
def get_orders_by_customer(cust_name, shipped=None, details=False):
    query = session.query(Order.order_id, User.username, User.phone)
    query = query.join(User)
    if details:
        query = query.add_columns(Cookie.cookie_name, LineItem.quantity,
                                  LineItem.extended_cost)
        query = query.join(LineItem).join(Cookie)
    if shipped is not None:
        # Was a no-op ``pass`` with a commented-out ``.where``; filter()
        # is the Query method used everywhere else in these examples.
        query = query.filter(Order.shipped == shipped)
    results = query.filter(User.username == cust_name).all()
    return results

print(get_orders_by_customer('cakeeater'))
print(get_orders_by_customer('cakeeater', details=True))
print(get_orders_by_customer('cakeeater', shipped=True))
print(get_orders_by_customer('cakeeater', shipped=False))
print(get_orders_by_customer('cakeeater', shipped=False, details=True))

# --- raw SQL fragment through text() ---
query = session.query(User).filter(text("username='cookiemon'"))
print(query.all())
'''