Flask插件

Flask插件

flask-session

  • 下载

    pip install Flask-session
  • 导入

    from flask_session import Session
  • 实例化

    在__init__文件中

    def create_app():
        """Create the Flask application and attach Flask-Session to it."""
        flask_app = Flask(__name__)
        # Handing the app to Session lets Flask-Session take over session storage.
        Session(flask_app)
        return flask_app
  • 配置文件

    SESSION_TYPE = 'redis'
  • 用法和内置session一样

  • 实现原理

    • 内置的session调用 session_interface = xxxx
      • xxxx.open_session 解密cookie转化成字典给session
      • xxxx.save_session 将session加密给cookie
    • Flask-session 修改session_interface 指定的类
      • 来改变session存储的位置

SQLAlchemy

建立链接

from sqlalchemy import create_engine
conn = create_engine(
    # URL format: 'mysql+pymysql://<user>:<password>@<host>:<port>/<database>?charset=<encoding>'
    'mysql+pymysql://root:123@127.0.0.1:3306/day103?charset=utf8',
    max_overflow=0,   # max connections that may be opened beyond pool_size
    pool_size=5,      # size of the connection pool
    pool_timeout=30,  # max seconds to wait for a free pooled connection before an error is raised
    pool_recycle=-1,  # seconds after which pooled connections are recycled (reset); -1 = never
)

如何建立表

  • 导入

    from sqlalchemy.ext.declarative import declarative_base
    Base = declarative_base()
  • 单表建立

    class Book(Base):
        """Single-table model mapped to the ``book`` table."""

        __tablename__ = "book"

        id = Column(Integer, primary_key=True)
        title = Column(String(32), nullable=False, index=True)

        def __repr__(self):
            return self.title

        __table_args__ = (
            # Composite unique constraint over (id, title)
            UniqueConstraint("id", "title", name="uni_id_title"),
            # Composite index over (id, title). Index takes the index NAME as its
            # first positional argument, so the name is spelled out before the columns.
            Index("ix_id_title", "id", "title"),
        )
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, DateTime
    from sqlalchemy import Index, UniqueConstraint
    import datetime
    # 链接数据库
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)
    # 实例化
    Base = declarative_base()
    
    # 建立单表,继承Base
    class UserInfo(Base):
        """Single-table model mapped to the ``user_info`` table."""

        __tablename__ = "user_info"

        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        email = Column(String(32), unique=True)
        # The callable itself is passed as the default, not the result of calling it.
        create_time = Column(DateTime, default=datetime.datetime.now)

        # Table-level options for this table, comparable to Django's ``class Meta``
        __table_args__ = (
            # Composite unique constraint over (id, name)
            UniqueConstraint("id", "name", name="uni_id_name"),
            # Composite index over (name, email). Index takes the index NAME as its
            # first positional argument, so the name is spelled out before the columns.
            Index("ix_name_email", "name", "email"),
        )
    
    # 函数,方便建立表
    def create_db():
        """Create every table registered on ``Base`` using ``ENGINE``."""
        metadata = Base.metadata
        metadata.create_all(ENGINE)
    
    # 方便删除
    def drop_db():
        """Drop every table registered on ``Base`` using ``ENGINE``."""
        metadata = Base.metadata
        metadata.drop_all(ENGINE)
    
    
    
    if __name__ == '__main__':
        # 执行
        create_db()
  • 一对多表建立

    class Book(Base):
       """Book model for the one-to-many example; each book references one publisher."""
       __tablename__ = "book"

       id = Column(Integer, primary_key=True)
       title = Column(String(32), nullable=False)
       # Foreign-key column referencing publisher.id
       # NOTE(review): the Publisher model is not shown in this snippet.
       publisher_id = Column(Integer, ForeignKey("publisher.id"))
       # Relationship only: it adds no column and exists to make navigation convenient.
       # One-to-many: the backref exposes the related books under the name "books".
       publisher = relationship("Publisher", backref="books")
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, DateTime
    from sqlalchemy import Index, UniqueConstraint, ForeignKey
    from sqlalchemy.orm import relationship
    import datetime
    
    
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)
    
    Base = declarative_base()
    
    
    # ======一对多示例=======
    class UserInfo(Base):
        """User model for the one-to-many example; each user references one Hobby."""

        __tablename__ = "user_info"

        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        email = Column(String(32), unique=True)
        create_time = Column(DateTime, default=datetime.datetime.now)
        # Foreign-key column referencing hobby.id
        hobby_id = Column(Integer, ForeignKey("hobby.id"))
        # Relationship only (adds no column): makes querying across the FK convenient.
        # The backref exposes the related users on Hobby under the name "user".
        hobby = relationship("Hobby", backref="user")

        __table_args__ = (
            UniqueConstraint("id", "name", name="uni_id_name"),
            # Index takes the index NAME as its first positional argument,
            # so the name is spelled out before the indexed columns.
            Index("ix_name_email", "name", "email"),
        )
    
    
    class Hobby(Base):
        """Hobby model mapped to the ``hobby`` table; UserInfo.hobby_id references its id."""
        __tablename__ = "hobby"

        id = Column(Integer, primary_key=True)
        # Column default used when no title is supplied
        title = Column(String(32), default="码代码")
    
    
    
    
    def create_db():
        """Create every table registered on ``Base`` using ``ENGINE``."""
        metadata = Base.metadata
        metadata.create_all(ENGINE)
    
    
    def drop_db():
        """Drop every table registered on ``Base`` using ``ENGINE``."""
        metadata = Base.metadata
        metadata.drop_all(ENGINE)
    
    
    
    if __name__ == '__main__':
        create_db()
        # drop_db()
  • 多对多建立

    • 第三张表自己生成

      class Book(Base):
         """Book model with both a one-to-many (publisher) and a many-to-many (tags) relation."""
         __tablename__ = "book"

         id = Column(Integer, primary_key=True)
         title = Column(String(32), nullable=False)
         # Foreign-key column referencing publisher.id
         publisher_id = Column(Integer, ForeignKey("publisher.id"))
         # Relationship only: it adds no column and exists to make navigation convenient.
         # One-to-many
         publisher = relationship("Publisher", backref="books")
         # Many-to-many, routed through the "book2tag" association table
         tags = relationship("Tag", secondary="book2tag", backref="books")
      class Book2Tag(Base):
         """Association table linking book rows to tag rows for the many-to-many relation."""
         __tablename__ = "book2tag"

         id = Column(Integer, primary_key=True)
         # Foreign keys to the two sides of the relation
         book_id = Column(Integer, ForeignKey("book.id"))
         tag_id = Column(Integer, ForeignKey("tag.id"))
      from sqlalchemy import create_engine
      from sqlalchemy.ext.declarative import declarative_base
      from sqlalchemy import Column, Integer, String, DateTime
      from sqlalchemy import Index, UniqueConstraint, ForeignKey
      from sqlalchemy.orm import relationship
      import datetime
      
      
      ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)
      
      Base = declarative_base()
      
      
      # ======多对多示例=======
      class Book(Base):
          """Book model for the many-to-many example."""
          __tablename__ = "book"

          id = Column(Integer, primary_key=True)
          title = Column(String(32))
          # Adds no table column; only there to make querying convenient.
          # Goes through the "book2tag" association table; Tag gets a "books" backref.
          tags = relationship("Tag", secondary="book2tag", backref="books")
      
      
      class Tag(Base):
          """Tag model mapped to the ``tag`` table."""
          __tablename__ = "tag"

          id = Column(Integer, primary_key=True)
          title = Column(String(32))
      
      
      class Book2Tag(Base):
          """Association table linking book rows to tag rows."""
          __tablename__ = "book2tag"

          id = Column(Integer, primary_key=True)
          # Foreign keys to the two sides of the many-to-many relation
          book_id = Column(Integer, ForeignKey("book.id"))
          tag_id = Column(Integer, ForeignKey("tag.id"))
      
      
      def create_db():
          """Create every table registered on ``Base`` using ``ENGINE``."""
          metadata = Base.metadata
          metadata.create_all(ENGINE)
      
      def drop_db():
          """Drop every table registered on ``Base`` using ``ENGINE``."""
          metadata = Base.metadata
          metadata.drop_all(ENGINE)
      
      if __name__ == '__main__':
          create_db()
          # drop_db()
    • 建立表命令

      Base.metadata.create_all(conn) # conn是链接池对象

对数据库表的操作(增删改查)

  • 建立管理器

    from sqlalchemy.orm import sessionmaker, scoped_session
    Session = sessionmaker(bind=conn)
    # Thread-safe: based on thread-local storage, so each thread works with its own single session
    session = scoped_session(Session)
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, scoped_session
    from models_demo import Tag
    
    
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)
    
    Session = sessionmaker(bind=ENGINE)

    # A session has to be created whenever database operations are performed.

    # Thread-safe: based on thread-local storage, every thread keeps using the same session of its own.


    session = scoped_session(Session)

    # ======= Run ORM operations ==========
    tag_obj = Tag(title="SQLAlchemy")
    # Stage the new object
    session.add(tag_obj)
    # Commit the transaction
    session.commit()
    # Close the session
    session.close()
  • 基本增删改查

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, scoped_session
    from models_demo import Tag, UserInfo
    import threading
    
    
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)
    
    Session = sessionmaker(bind=ENGINE)
    
    # A session is needed for every database operation. Only the scoped_session is
    # kept: a plain Session() instance created here would be rebound (and never
    # closed) by the very next assignment to the same name.
    session = scoped_session(Session)
    
    # ============添加============
    # tag_obj = Tag(title="SQLAlchemy")
    # # 添加
    # session.add(tag_obj)
    # session.add_all([
    #     Tag(title="Python"),
    #     Tag(title="Django"),
    # ])
    # # 提交
    # session.commit()
    # # 关闭session
    # session.close()
    
    # ============基础查询============
    # ret1 = session.query(Tag).all()
    # ret2 = session.query(Tag).filter(Tag.title == "Python").all()
    # ret3 = session.query(Tag).filter_by(title="Python").all()
    # ret4 = session.query(Tag).filter_by(title="Python").first()
    # print(ret1, ret2, ret3, ret4)
    
    # ============删除===========
    # session.query(Tag).filter_by(id=1).delete()
    # session.commit()
    
    # =========== Update ===========
    session.query(Tag).filter_by(id=22).update({Tag.title: "LOL"})
    session.query(Tag).filter_by(id=23).update({"title": "王者毒药"})
    # New value is an expression built from the current column value
    session.query(Tag).filter_by(id=24).update({"title": Tag.title + "~"}, synchronize_session=False)
    # NOTE(review): the original note says synchronize_session="evaluate" is the default and is
    # the mode used for numeric add/subtract updates — confirm against the installed SQLAlchemy version.
    session.commit()
  • 常用操作

    # Conditional queries
    ret1 = session.query(Tag).filter_by(id=22).first()
    # Several criteria passed to filter() are combined
    ret2 = session.query(Tag).filter(Tag.id > 1, Tag.title == "LOL").all()
    ret3 = session.query(Tag).filter(Tag.id.between(22, 24)).all()
    # ~ negates the condition (NOT IN)
    ret4 = session.query(Tag).filter(~Tag.id.in_([22, 24])).first()
    from sqlalchemy import and_, or_
    ret5 = session.query(Tag).filter(and_(Tag.id > 1, Tag.title == "LOL")).first()
    ret6 = session.query(Tag).filter(or_(Tag.id > 1, Tag.title == "LOL")).first()
    # and_/or_ can be nested
    ret7 = session.query(Tag).filter(or_(
        Tag.id>1,
        and_(Tag.id>3, Tag.title=="LOL")
    )).all()
    # Wildcards (LIKE / NOT LIKE)
    ret8 = session.query(Tag).filter(Tag.title.like("L%")).all()
    ret9 = session.query(Tag).filter(~Tag.title.like("L%")).all()
    # Limit: slicing the query itself makes the database apply LIMIT/OFFSET,
    # instead of loading every row with .all() and slicing the list in Python.
    ret10 = session.query(Tag).filter(~Tag.title.like("L%"))[1:2]
    # Ordering
    ret11 = session.query(Tag).order_by(Tag.id.desc()).all()  # descending
    ret12 = session.query(Tag).order_by(Tag.id.asc()).all()  # ascending
    # Grouping
    # NOTE(review): the Tag model shown in these notes has no `test` column; presumably
    # models_demo.Tag defines one — confirm.
    ret13 = session.query(Tag.test).group_by(Tag.test).all()
    # 聚合函数
    from sqlalchemy.sql import func
    # Aggregates per title group; HAVING keeps only the groups whose max id exceeds 22.
    # The comparison must sit outside func.max(): func.max(Tag.id > 22) would
    # aggregate the boolean expression instead of comparing the aggregate.
    ret14 = session.query(
        func.max(Tag.id),
        func.sum(Tag.test),
        func.min(Tag.id)
    ).group_by(Tag.title).having(func.max(Tag.id) > 22).all()
    # Joining tables
    # NOTE(review): Hobby is used below but the import shown earlier only brings in
    # Tag and UserInfo — confirm Hobby is imported from models_demo as well.
    ret15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all()
    # print(ret15) gives a list of tuples; each tuple holds the two objects
    ret16 = session.query(UserInfo).join(Hobby).all()
    # print(ret16) gives a list holding only objects of the first (queried) class
    # Equivalent to an inner join
    # for i in ret16:
    #     # print(i[0].name, i[1].title)
    #     print(i.hobby.title)
    ret17 = session.query(Hobby).join(UserInfo, isouter=True).all()
    ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all()
    ret18 = session.query(Hobby).outerjoin(UserInfo).all()
    ret18_1 = session.query(UserInfo).outerjoin(Hobby).all()
    # Equivalent to a left join
    print(ret17)
    print(ret17_1)
    print(ret18)
    print(ret18_1)
  • 基于relationship的Fk

    # Foreign-key operations through relationship
    # Add: create the user together with its hobby through the relationship attribute
    user_obj = UserInfo(name="提莫", hobby=Hobby(title="种蘑菇"))
    session.add(user_obj)

    # Add from the other side: assign a list of users to the hobby's "user" backref
    hobby = Hobby(title="弹奏一曲")
    hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹纸")]
    session.add(hobby)
    session.commit()

    # Forward lookup through relationship (user -> hobby)
    user_obj_1 = session.query(UserInfo).first()
    print(user_obj_1.name)
    print(user_obj_1.hobby.title)

    # Reverse lookup through relationship (hobby -> users)
    hb = session.query(Hobby).first()
    print(hb.title)
    for i in hb.user:
        print(i.name)

    session.close()
  • 基于relationship的M2M

    # Add
    book_obj = Book(title="Python源码剖析")
    tag_obj = Tag(title="Python")
    # Pitfall: book_obj.id and tag_obj.id are read here before either object has been
    # added or committed; presumably both are still None at this point, so the link
    # row would not actually connect the book and the tag — verify.
    b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id)
    session.add_all([
        book_obj,
        tag_obj,
        b2t,
    ])
    session.commit()

    # The block above contains a pitfall (see the note on b2t). The relationship-based
    # forms below assign related objects and build no Book2Tag row by hand.
    book = Book(title="测试")
    book.tags = [Tag(title="测试标签1"), Tag(title="测试标签2")]
    session.add(book)
    session.commit()

    # Same idea from the Tag side, through the "books" backref
    tag = Tag(title="LOL")
    tag.books = [Book(title="大龙刷新时间"), Book(title="小龙刷新时间")]
    session.add(tag)
    session.commit()

    # Forward lookup through relationship (book -> tags)
    book_obj = session.query(Book).filter_by(id=4).first()
    print(book_obj.title)
    print(book_obj.tags)
    # Reverse lookup through relationship (tag -> books)
    tag_obj = session.query(Tag).first()
    print(tag_obj.title)
    print(tag_obj.books)

Flask_Script

  • 下载

    pip install flask-script
  • 导入

    from flask_demo import create_app
    from flask_script import Manager
  • 实例化

    app = create_app()
    # Instantiate the Flask-Script manager around the app
    manager = Manager(app)

    if __name__ == '__main__':
       # app.run()
       # Let the manager run instead of calling app.run() directly
       manager.run()
  • 自定义的命令

    1. 位置传参

      @manager.command
      def mycommad(arg):
          """Print the single positional argument given on the command line."""
          print(arg)
      # Usage: python manager.py mycommad 123
    2. 关键字

      @manager.option('-n', '--name', dest='name')
      @manager.option('-u', '--url', dest='url')
      def cmd(name, url):
          """Print the values received through the -n/--name and -u/--url options."""
          print(name, url)
      # Usage: python manager.py cmd -n xiatian -u www.xxx

      详细用法:https://www.cnblogs.com/buyisan/p/8270283.html

Flask-Migrate

  • !!! 依赖flask-script

  • 下载

    pip install flask-migrate
  • 导入

    # The importable module name is all lowercase.
    from flask_migrate import Migrate, MigrateCommand
  • 实例化

    # NOTE(review): Flask-Migrate's documented usage is Migrate(app, db) with the
    # Flask-SQLAlchemy instance; no db object is visible in this snippet — confirm.
    Migrate(app)

    # Expose the migration commands under the "db" sub-command of the manager.
    manager.add_command('db', MigrateCommand)
    """
      python manager.py db init
      python manager.py db migrate # 类似于makemigrations
      python manager.py db upgrade  # migrate
    """
师博客:

https://www.cnblogs.com/GGGG-XXXX/articles/9447619.html