Flask-SQLAlchemy是一个Flask扩展,可以支持多种数据库后台,咱们能够不须要关心SQL的处理细节,操做数据库,一个基本关系对应一个类,而一个实体对应类的实例对象。Flask是一个轻量级的web框架,而SQLAlchemy 是转为Flask定制的ORM框架。mysql
安装flask-sqlalchemyweb
pip install flask-sqlalchemy
初始化sqlalchemysql
from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy app = Flask(__name__) # 配置 sqlalchemy 数据库驱动://数据库用户名:密码@主机地址:端口/数据库?编码 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://password:@127.0.0.1:3306/db?charset=utf8' # 初始化 db = SQLAlchemy(app)
注意:数据库
SQLALCHEMY_DATABASE_URI 是关键字,默认数据源配置变量。而咱们在实际操做中,不免要同时链接多个数据库,这个时候就要配置多个数据源,而SQLAlchemy提供了很方便的多数据源配置。
只需在配置文件中这样写就能够:
SQLALCHEMY_DATABASE_URI = 'postgres://localhost/main' SQLALCHEMY_BINDS = { 'users': 'mysqldb://localhost/users', 'appmeta': 'sqlite:////path/to/appmeta.db' }
SQLALCHEMY_DATABASE_URI 是默认数据源,users和appmeta是其余数据源,用的时候也很是方便,在model中加入 __bind_key__ = 'users' 便可。
数据库表映射到模型,继承db.model类便可flask
class RoadReportErrorCheck(db.Model): __bind_key__ = 'mdb_plat' __tablename__ = 'road_report_error_check' order_id = db.Column(db.Integer, nullable=False, default=0, primary_key=True) #订单ID p_pid = db.Column(db.String(32), nullable=False, default='') #包ID check_state = db.Column(db.Integer, nullable=False, default=0) #0未审核,1初审,2复审 first_checker = db.Column(db.String(32), nullable=False, default='') #轨迹点初审人 first_check_time = db.Column(db.TIMESTAMP, nullable=False, default=0) #轨迹点初审时间 checker = db.Column(db.String(32), nullable=False, default='') #轨迹点复审人 check_time = db.Column(db.TIMESTAMP, nullable=False, default=0) #轨迹点复审时间 description = db.Column(db.String(200), nullable=False, default='') #轨迹点审核描述 receive_flag = db.Column(db.Integer, nullable=False, default=0) #外包领取,0未领,1已领 create_time = db.Column(db.TIMESTAMP, nullable=False, default=0) update_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.now)
只需初始化一个变量,添加到session中便可session
group_info = Group_Check(id=id,package_id=package_id,user=user,upload_time=upload_time,orderid=orderid,group_track_url=group_track_url,xf_links=xf_links,create_time=datetime.datetime.now())
db.session.add(group_info)
db.session。commit()
SQLAlchemy 提供了不少种方便的过滤方法。写法以下:app
#查询单个 Group_Check.query.fliter(Group_Check.id == id).first() #查询全部 Group_Check.query.fliter(Group_Check.id == id).all() #条件查询
Group_Check.query.fliter(db.and_(Group_Check.id == id, Group_Check.state==1))#求和Group_Check.query.fliter(db.and_(Group_Check.id == id, Group_Check.state==1))