DBUtils是Python的一个用于实现数据库链接池的模块。
详情参考官方文档html
安装: pip install flask-sessionhtml5
使用方法:先导入 from flask_session import Sessionmysql
建立一个flask的app,app=Flask(__name__) 将app传入Session便可 Session(app) web
或者先实例化一个session对象 sess = Session() sess.init_app(app)redis
实例化时,将app对象,传入,而且覆盖了本来flask中的session_interface sql
而后在这个_get_interface中又作了分类。咱们只须要在配置文件中配置上链接的类型,便可使用对应的session数据库
好比:app.config["SESSION_TYPE"] = "redis" 就会将本来的session覆盖成redis的session。django
而后,还要有redis的链接,这个覆盖了session的类为RedisSessionInterfacejson
这个类在实例化的时候,能够传入四个参数,config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],flask
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']
这四个参数分别为redis对象,redis中全部的keys加一个前缀,是否使用签名对session进行加密,默认为False,最后一个参数为是否使用持久化。
因此咱们链接redis就须要设置配置文件 app.config["SESSION_REDIS"] = redis对象 便可
小结:flask中使用flask-session
from redis import Redis app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port=6379) Session(app)
通常状况下,咱们会把配置添加到配置文件settings中
from redis import Redis class Config(object): DEBUG = False SECRET_KEY = "asdfasdfasdf" SESSION_TYPE = 'redis' SESSION_REDIS = Redis(host='127.0.0.1', port=6379) class ProductionConfig(Config): DEBUG = False class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True
flask-session的请求流程:请求进来后,在到达视图函数以前,先执行flask-session中的open_session方法,这个方法中作的事情:先去用户的cookie中获取随机字符串,若是没有,表示请求是第一次进来,就生成一个随机字符串sid(str(uuid.uuid4()))而且生成一个特殊的字典,将这个sid放到特殊字典中,而后返回这个特殊的字典,接着执行视图函数,这是,视图函数能够向这个特殊的字典中写入session,在返回响应时,会先执行flask-session中的save_session方法,这个方法中,将这个特殊的字典,先转化为字典并序列化为一个json字符串 json.dumps(dict(特殊字典)) ,接着将这个值存入redis中 redis.setex(key,val) ,又将这个特殊字典中的sid(随机字符串) 设置为cookie返回给浏览器 response.set_cookie(sid),下次用户在请求进来,先判断有无这个随机字符串,有,就从redis中取出对应的特殊字典,而后就能够从这个特殊字典中获取设置的session。
咱们使用偏移MySQL时,每次对数据库的操做就须要链接数据库,表的增删改查,而后关闭数据库,这样每操做一次,就链接关闭,就会形成性能上的问题。或许会想到使用单例,链接一次,就能够一直使用,这样单线程应用彻底没有问题,但若是涉及到多线程应用那么就须要加锁,一旦加锁那么链接势必就会排队等待,当请求比较多时,性能就会下降了。
import pymysql import threading from threading import RLock LOCK = RLock() CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') def task(arg): with LOCK: cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
备注:不加锁,多线程下会报错。
DBUtils是Python的一个用于实现数据库链接池的模块。
安装: pip install DBUtils
使用:此链接池有两种链接模式:
from DBUtils import PersistentDB POOL = PersistentDB( creator=pymysql, # 使用连接数据库的模块 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 若是为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭连接。若是为True时, conn.close()则关闭连接,那么再次调用pool.connection时就会报错,由于已经真的关闭了链接(pool.steady_connection()能够获取一个新的连接) threadlocal=None, # 本线程独享值得对象,用于保存连接对象,若是连接对象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() func()
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 链接池容许的最大链接数,0和None表示不限制链接数 mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 maxshared=3, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 检测当前正在运行链接数的是否小于最大连接数,若是不小于则:等待或报raise TooManyConnections异常 # 不然 # 则优先去初始化时建立的连接中获取连接 SteadyDBConnection。 # 而后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。 # 若是最开始建立的连接没有连接,则去建立一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。 # 一旦关闭连接后,链接就返回到链接池让后续线程继续使用。 conn = POOL.connection() # print(th, '连接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
PS: 查看链接 show status like 'Threads%';
import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=20, # 链接池容许的最大链接数,0和None表示不限制链接数 mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 maxshared=0, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='zhaopanpan', password='qwe123..', database='code', charset='utf8' ) def on_open(cur=pymysql.cursors.DictCursor): conn = POOL.connection() cursor = conn.cursor(cursor=cur) return conn,cursor def on_close(conn,cursor): cursor.close() conn.close() def fetchone(sql,args,cur=pymysql.cursors.DictCursor): """ 获取单条数据 :param sql: SQL语句 select title from book where id=%s; :param args: SQL中若是有须要传的参数,则 args=[参数,参数2] ,不然 args=[] :param cur: 表示返回的值默认是以字典的形式返回,若是设置为[]或None表示以元组的形式返回 :return: """ conn,cursor = on_open(cur) cursor.execute(sql, args) result = cursor.fetchone() on_close(conn,cursor) return result def fetchall(sql,args,cur=pymysql.cursors.DictCursor): """ 获取多条数据 :param sql: :param args: :param cur: :return: """ conn, cursor = on_open(cur) cursor.execute(sql, args) result = cursor.fetchall() on_close(conn, cursor) return result def exec_sql(sql,args,cur=pymysql.cursors.DictCursor): """ 添加/删除/修改 :param sql: insert into table(%s,%s) values(....) :param args: :param cur: :return: """ conn, cursor = on_open(cur) cursor.execute(sql, args) conn.commit() on_close(conn, cursor)
WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。django中也可使用,可是django自带
安装:pip install wtforms
1. 用户登陆
当用户登陆时候,须要对用户提交的用户名和密码进行多种格式校验。如:
密码不能为空;密码长度必须大于12;密码必须包含 字母、数字、特殊字符等(自定义正则);
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据经过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登陆</h1> <form method="post"> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
2. 用户注册
注册页面须要让用户输入:用户名、密码、密码重复、性别、爱好等。
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜爱',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有全部的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 再也不继续后续验证
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 1})
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据经过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
备注:对于动态的choices,应该重写其构造方法。
class StudentForm(Form): name = simple.StringField( label='学生姓名', widget=widgets.TextInput(), render_kw={'class': 'form-control', 'placeholder': '请输入学生姓名'}, validators=[validators.DataRequired(message='学生姓名不能为空')] ) class_id = core.SelectField( label='请选择班级', choices=(), coerce=int ) def __init__(self, *args, **kwargs): super(StudentForm, self).__init__(*args, **kwargs) self.class_id.choices = operate.fetchall(sql='select id,name from classes', args=[], cur=None)
能够生成csrf标签
from flask import Flask, render_template, request, redirect, session from wtforms import Form from wtforms.csrf.core import CSRF from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets from hashlib import md5 app = Flask(__name__, template_folder='templates') app.debug = True class MyCSRF(CSRF): """ Generate a CSRF token based on the user's IP. I am probably not very secure, so don't use me. """ def setup_form(self, form): self.csrf_context = form.meta.csrf_context() self.csrf_secret = form.meta.csrf_secret return super(MyCSRF, self).setup_form(form) def generate_csrf_token(self, csrf_token): gid = self.csrf_secret + self.csrf_context token = md5(gid.encode('utf-8')).hexdigest() return token def validate_csrf_token(self, form, field): print(field.data, field.current_token) if field.data != field.current_token: raise ValueError('Invalid CSRF') class TestForm(Form): name = html5.EmailField(label='用户名') pwd = simple.StringField(label='密码') class Meta: # -- CSRF # 是否自动生成CSRF标签 csrf = True # 生成CSRF标签name csrf_field_name = 'csrf_token' # 自动生成标签的值,加密用的csrf_secret csrf_secret = 'xxxxxx' # 自动生成标签的值,加密用的csrf_context csrf_context = lambda x: request.url # 生成和比较csrf标签 csrf_class = MyCSRF # -- i18n # 是否支持本地化 # locales = False locales = ('zh', 'en') # 是否对本地化进行缓存 cache_translations = True # 保存本地化缓存信息的字段 translations_cache = {} @app.route('/index/', methods=['GET', 'POST']) def index(): if request.method == 'GET': form = TestForm() else: form = TestForm(formdata=request.form) if form.validate(): print(form) return render_template('index.html', form=form) if __name__ == '__main__': app.run()
实例化流程分析
# 源码流程 1. 执行type的 __call__ 方法,读取字段到静态字段 cls._unbound_fields 中; meta类读取到cls._wtforms_meta中 2. 执行构造方法 a. 循环cls._unbound_fields中的字段,并执行字段的bind方法,而后将返回值添加到 self._fields[name] 中。 即: _fields = { name: wtforms.fields.core.StringField(), } PS:因为字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),建立的是UnboundField(cls, *args, **kwargs),当执行完bind以后,才变成执行 wtforms.fields.core.StringField() b. 循环_fields,为对象设置属性 for name, field in iteritems(self._fields): # Set all the fields to attributes so that they obscure the class # attributes with the same names. setattr(self, name, field) c. 执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs) 优先级:obj,data,formdata; 再循环执行每一个字段的process方法,为每一个字段设置值: for name, field, in iteritems(self._fields): if obj is not None and hasattr(obj, name): field.process(formdata, getattr(obj, name)) elif name in kwargs: field.process(formdata, kwargs[name]) else: field.process(formdata) 执行每一个字段的process方法,为字段的data和字段的raw_data赋值 def process(self, formdata, data=unset_value): self.process_errors = [] if data is unset_value: try: data = self.default() except TypeError: data = self.default self.object_data = data try: self.process_data(data) except ValueError as e: self.process_errors.append(e.args[0]) if formdata: try: if self.name in formdata: self.raw_data = formdata.getlist(self.name) else: self.raw_data = [] self.process_formdata(self.raw_data) except ValueError as e: self.process_errors.append(e.args[0]) try: for filter in self.filters: self.data = filter(self.data) except ValueError as e: self.process_errors.append(e.args[0]) d. 页面上执行print(form.name) 时,打印标签 由于执行了: 字段的 __str__ 方法 字符的 __call__ 方法 self.meta.render_field(self, kwargs) def render_field(self, field, render_kw): other_kw = getattr(field, 'render_kw', None) if other_kw is not None: render_kw = dict(other_kw, **render_kw) return field.widget(field, **render_kw) 执行字段的插件对象的 __call__ 方法,返回标签字符串
钩子函数
校验单个字段,可使用 def validate_字段名(filed) 能够经过 field.data 得到输入的值。
校验全局使用 validate
a. 执行form的validate方法,获取钩子方法 def validate(self): extra = {} for name in self._fields: inline = getattr(self.__class__, 'validate_%s' % name, None) if inline is not None: extra[name] = [inline] return super(Form, self).validate(extra) b. 循环每个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数) def validate(self, extra_validators=None): self._errors = None success = True for name, field in iteritems(self._fields): if extra_validators is not None and name in extra_validators: extra = extra_validators[name] else: extra = tuple() if not field.validate(self, extra): success = False return success c. 每一个字段进行验证时候 字段的pre_validate 【预留的扩展】 字段的_run_validation_chain,对正则和字段的钩子函数进行校验 字段的post_validate【预留的扩展】