用于实现相似于django中 python3 manage.py runserver ...相似的命令html
安装:pip3 install flask-scriptpython
from flask_script import Manager app = Flask(__name__) manager=Manager(app) ... if __name__ == '__main__': manager.run() #之后在执行,直接:python3 manage.py runserver #python3 manage.py runserver --help
@manager.command def custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg) @manager.option('-n', '--name', dest='name') #@manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令(-n也能够写成--name) 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) #有什么用?
SQLAlchemy是一个基于Python实现的ORM框架。该框架创建在 DB API之上,使用关系对象映射进行数据库操做,简言之即是:将类和对象转换成SQL,而后使用数据API执行SQL并获取执行结果。mysql
pip3 install sqlalchemy
组成部分:sql
Engine,框架的引擎 Connection Pooling ,数据库链接池 Dialect,选择链接数据库的DB API种类 Schema/Types,架构和类型 SQL Exprression Language,SQL表达式语言
SQLAlchemy自己没法操做数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不一样调用不一样的数据库API,从而实现对数据库的操做,如:数据库
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
django中如何反向生成modelsdjango
python manage.py inspectdb > app/models.py
修改表:在数据库添加字段,类对应上flask
import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8", max_overflow=0, # 超过链接池大小外最多建立的链接 pool_size=5, # 链接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,不然报错 pool_recycle=-1 # 多久以后对线程池中的线程进行一次链接的回收(重置) ) conn = engine.raw_connection() cursor = conn.cursor() cursor.execute("select * from app01_book") result = cursor.fetchall() print(result) cursor.close() conn.close()
models.py安全
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() #继承的模型 相似于django的Model class Users(Base): __tablename__ = 'users' # 数据库表名称 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空 # email = Column(String(32), unique=True) # ctime = Column(DateTime, default=datetime.datetime.now) # extra = Column(Text, nullable=True) __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), #联合惟一 # Index('ix_id_name', 'name', 'email'), #索引 ) def init_db(): """ 根据类建立数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, # 超过链接池大小外最多建立的链接 pool_size=5, # 链接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,不然报错 pool_recycle=-1 # 多久以后对线程池中的线程进行一次链接的回收(重置) ) Base.metadata.create_all(engine) # Base.metadata.drop_all(engine) 显而易见 是删除数据库 if __name__ == '__main__': # drop_db() init_db()
app.pysession
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users #"mysql+pymysql://root@127.0.0.1:3306/aaa" engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) Connection = sessionmaker(bind=engine) # 每次执行数据库操做时,都须要建立一个Connection con = Connection() # ############# 执行ORM操做 ############# obj1 = Users(name="lqz") con.add(obj1) # 提交事务 con.commit() # 关闭session,实际上是将链接放回链接池 con.close()
Copyclass Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) # hobby指的是tablename而不是类名,uselist=False hobby_id = Column(Integer, ForeignKey("hobby.id")) # 跟数据库无关,不会新增字段,只用于快速链表操做 # 类名,backref用于反向查询 hobby=relationship('Hobby',backref='pers')
class Boy2Girl(Base): __tablename__ = 'boy2girl' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便,放在哪一个单表中均可以 servers = relationship('Girl', secondary='boy2girl', backref='boys')
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操做时,都须要建立一个session session = Session() # ############# 执行ORM操做 ############# obj1 = Users(name="lqz") session.add(obj1) # 提交事务 session.commit() # 关闭session session.close()
Copyfrom sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) """ # 线程安全,基于本地线程实现每一个线程用同一个session # 特殊的:scoped_session中有原来方法的Session中的一下方法: public_methods = ( '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested', 'close', 'commit', 'connection', 'delete', 'execute', 'expire', 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 'is_modified', 'bulk_save_objects', 'bulk_insert_mappings', 'bulk_update_mappings', 'merge', 'query', 'refresh', 'rollback', 'scalar' ) """ #scoped_session类并无继承Session,可是却又它的全部方法 session = scoped_session(Session) # ############# 执行ORM操做 ############# obj1 = Users(name="alex1") session.add(obj1) # 提交事务 session.commit() # 关闭session session.close()
import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from db import Users, Hosts engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() ----------------1.增----------------- obj1 = Users(name="111") session.add(obj1) session.add_all([ Users(name="111"), Users(name="222"), Hosts(name="333"), ]) session.commit() ----------------2.删除----------------- session.query(Users).filter(Users.id > 2).delete() session.commit() ----------------3.修改----------------- #传字典 1. session.query(Users).filter(Users.id > 0).update({"name" : "lqz"}) 2. session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) # 字符串 相似于django的F查询 3. session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") #数字 session.commit() ----------------4.查----------------- 1.r1 = session.query(Users).all() 2. r2 = session.query(Users.name.label('xx'), Users.age).all() #只取age列,把name 重命名为xx 3.#filter传的是表达式,filter_by传的是参数 r3 = session.query(Users).filter(Users.name == "lqz").all() r4 = session.query(Users).filter_by(name='lqz').all() r5 = session.query(Users).filter_by(name='lqz').first() 4.#:value 和:name 至关于占位符,用params传参数 r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all() 5.#自定义查询sql r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all() #增,删,改都要commit() session.close()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import User,Person,Hobby from sqlalchemy.sql import text engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session=Session() # 1 查询名字为lqz的全部user对象 # ret = session.query(User).filter_by(name='ccc099').all() # 2 表达式,and条件链接 # ret = session.query(User).filter(User.id > 1, User.name == 'egon').all() # 查找id在1和10之间,而且name=egon的对象 # ret = session.query(User).filter(User.id.between(1, 10), User.name == 'egon').all() # in条件(class_,由于这是关键字,不能直接用) # ret = session.query(User).filter(User.id.in_([1,3,4])).all() # 取反 ~ ret = session.query(User).filter(~User.id.in_([1,3,4])).all() #二次筛选 # select * # ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all() # # select name,id 。。。。 # ret = session.query(User.id,User.name).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all() ''' SELECT users.id AS users_id, users.name AS users_name FROM users WHERE users.id IN (SELECT users.id AS users_id FROM users WHERE users.name = %(name_1)s) ''' # from sqlalchemy import and_, or_ #or_包裹的都是or条件,and_包裹的都是and条件 #查询id>3而且name=egon的人 # ret = session.query(User).filter(and_(User.id > 3, User.name == 'egon')).all() # 查询id大于2或者name=ccc099的数据 # ret = session.query(User).filter(or_(User.id > 2, User.name == 'ccc099')).all() # ret = session.query(User).filter( # or_( # User.id < 2, # and_(User.name == 'egon', User.id > 3), # User.extra != "" # )).all() # print(ret) ''' select *from user where id<2 or (name=egon and id >3) or extra !='' ''' # 通配符,以e开头,不以e开头 # ret = session.query(User).filter(User.name.like('e%')).all() # ret = session.query(User).filter(~User.name.like('e%')).all() like('S_') 匹配一个 % 多个 # 限制,用于分页,区间 limit # 前闭后开区间,1能取到,3取不到 ret = session.query(User)[1:3] ''' select * from users limit 1,2; ''' # 排序,根据name降序排列(从大到小) # ret = session.query(User).order_by(User.name.desc()).all() # ret = session.query(User).order_by(User.name.asc()).all() #第一个条件降序排序后,再按第二个条件升序排 # ret = session.query(User).order_by(User.id.asc(),User.name.desc()).all() # ret = session.query(User).order_by(User.name.desc(),User.id.asc()).all() # 分组 from sqlalchemy.sql import func # ret = session.query(User).group_by(User.name).all() #分组以后取最大id,id之和,最小id # sql 分组以后,要查询的字段只能有分组字段和聚合函数 # ret = session.query( # func.max(User.id), # func.sum(User.id), # func.min(User.id), # User.name).group_by(User.name).all() # ''' # select max(id),sum(id),min(id) from user group by name; # # ''' # for obj in ret: # print(obj[0],'----',obj[1],'-----',obj[2],'-----',obj[3]) # print(ret) #haviing筛选 # ret = session.query( # func.max(User.id), # func.sum(User.id), # func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all() ''' select max(id),sum(id),min(id) from user group by name having min(id)>2; ''' print(ret) session.commit() session.close()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import User,Person,Hobby,Boy,Girl,Boy2Girl from sqlalchemy.sql import text engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session=Session() ### 1 一对多插入数据 obj=Hobby(caption='足球') session.add(obj) p=Person(name='张三',hobby_id=2) session.add(p) ### 2 方式二(默认状况传对象有问题) ###### Person表中要加 hobby = relationship('Hobby', backref='pers') p=Person(name='李四',hobby=Hobby(caption='美女')) 等同于 p=Person(name='李四2') p.hobby=Hobby(caption='美女2') session.add(p) ## 3 方式三,经过反向操做 hb = Hobby(caption='人妖') hb.pers = [Person(name='文飞'), Person(name='博雅')] session.add(hb) 4 查询(查询:基于连表的查询,基于对象的跨表查询) ### 4.1 基于对象的跨表查询(子查询,两次查询) 正查 p=session.query(Person).filter_by(name='张三').first() print(p) print(p.hobby.caption) # 反查 h=session.query(Hobby).filter_by(caption='人妖').first() print(h.pers) ### 4.2 基于连表的跨表查(查一次) 默认根据外键连表 isouter=True 左外连,表示Person left join Hobby,没有右链接,反过来便可 # 不写 inner join person_list=session.query(Person,Hobby).join(Hobby,isouter=True).all() print(person_list) print(person_list) for row in person_list: print(row[0].name,row[1].caption) ''' select * from person left join hobby on person.hobby_id=hobby.id ''' ret = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id) print(ret) ''' select * from user,hobby where user.id=favor.nid; ''' #join表,默认是inner join # ret = session.query(Person).join(Hobby) # # ret = session.query(Hobby).join(Person,isouter=True) # ''' # SELECT * # FROM person INNER JOIN hobby ON hobby.id = person.hobby_id # ''' # print(ret) # 指定连表字段(历来没用过) ret = session.query(Person).join(Hobby,Person.nid==Hobby.id, isouter=True) ret = session.query(Person).join(Hobby,Person.hobby_id==Hobby.id, isouter=True).all() # print(ret) ''' SELECT * FROM person LEFT OUTER JOIN hobby ON person.nid = hobby.id ''' # print(ret) # 组合(了解)UNION 操做符用于合并两个或多个 SELECT 语句的结果集 union和union all的区别? q1 = session.query(User.name).filter(User.id > 2) # 6条数据 q2 = session.query(User.name).filter(User.id < 8) # 2条数据 q1 = session.query(User.id,User.name).filter(User.id > 2) # 6条数据 q2 = session.query(User.id,User.name).filter(User.id < 8) # 2条数据 ret = q1.union_all(q2).all() ret1 = q1.union(q2).all() print(ret) print(ret1) q1 = session.query(User.name).filter(User.id > 2) q2 = session.query(Hobby.caption).filter(Hobby.nid < 2) ret = q1.union_all(q2).all()
# session.add_all([ # Boy(hostname='霍建华'), # Boy(hostname='胡歌'), # Girl(name='刘亦菲'), # Girl(name='林心如'), # ]) # session.add_all([ # Boy2Girl(girl_id=1, boy_id=1), # Boy2Girl(girl_id=2, boy_id=1) # ]) ##### 要有girls = relationship('Girl', secondary='boy2girl', backref='boys') # girl = Girl(name='张娜拉') # girl.boys = [Boy(hostname='张铁林'),Boy(hostname='费玉清')] # session.add(girl) # boy=Boy(hostname='蔡徐坤') # boy.girls=[Girl(name='谢娜'),Girl(name='巧碧螺')] # session.add(boy) # session.commit() # 基于对象的跨表查 # girl=session.query(Girl).filter_by(id=3).first() # print(girl.boys) #### 基于连表的跨表查询 # 查询蔡徐坤约过的全部妹子 ''' select girl.name from girl,boy,Boy2Girl where boy.id=Boy2Girl.boy_id and girl.id=Boy2Girl.girl_id where boy.name='蔡徐坤' ''' # ret=session.query(Girl.name).filter(Boy.id==Boy2Girl.boy_id,Girl.id==Boy2Girl.girl_id,Boy.hostname=='蔡徐坤').all() ''' select girl.name from girl inner join Boy2Girl on girl.id=Boy2Girl.girl_id inner join boy on boy.id=Boy2Girl.boy_id where boy.hostname='蔡徐坤' ''' # ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter(Boy.hostname=='蔡徐坤').all() ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter_by(hostname='蔡徐坤').all() print(ret) ### 执行原生sql(用的最多的) ### django中orm如何执行原生sql # # cursor = session.execute('insert into users(name) values(:value)',params={"value":'xxx'}) # print(cursor.lastrowid) # session.commit() session.close() res = session.squery(User.name.label('xx')).first() res.xx #label 至关于起别名
import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text, func from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 关联子查询:correlate(Group)表示跟Group表作关联,as_scalar至关于对该sql加括号,用于放在后面当子查询 subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() result = session.query(Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """ select id,name, (select avr(score) from 成绩表 where 成绩表.sid=学生表.id) as x from 学生表 subqry = session.query(func.count(成绩表.scort).label("sc")).filter(学生表.id == 成绩表.sid).correlate(学生表).as_scalar() result = session.query(学生表.name, subqry) session.close()
1 Flask-SQLAlchemy 2 flask-migrate -python3 manage.py db init 初始化:只执行一次 -python3 manage.py db migrate 等同于 makemigartions -python3 manage.py db upgrade 等同于migrate 3 看代码 4 Flask-SQLAlchemy如何使用 1 from flask_sqlalchemy import SQLAlchemy 2 db = SQLAlchemy() 3 db.init_app(app) 4 之后在视图函数中使用 -db.session 就是我们讲的session 5 flask-migrate的使用(表建立,字段修改) 1 from flask_migrate import Migrate,MigrateCommand 2 Migrate(app,db) 3 manager.add_command('db', MigrateCommand) 6 直接使用 -python3 manage.py db init 初始化:只执行一次,建立migrations文件夹 -python3 manage.py db migrate 等同于 makemigartions -python3 manage.py db upgrade 等同于migrate