import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db') cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',]) cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'}) obj = cursor.fetchone() conn.commit() cursor.close() conn.close() print(obj)
setting.pyhtml
from datetime import timedelta from redis import Redis import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection class Config(object): DEBUG = True SECRET_KEY = "umsuldfsdflskjdf" PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST= True SESSION_TYPE = "redis" PYMYSQL_POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 链接池容许的最大链接数,0和None表示不限制链接数 mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 maxshared=3, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db', charset='utf8' ) class ProductionConfig(Config): SESSION_REDIS = Redis(host='192.168.0.94', port='6379') class DevelopmentConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class TestingConfig(Config): pass
utils/sql.pyhtml5
import pymysql from settings import Config class SQLHelper(object): @staticmethod def open(cursor): POOL = Config.PYMYSQL_POOL conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod def close(conn,cursor): conn.commit() cursor.close() conn.close() @classmethod def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor): conn,cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor): conn, cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
使用:python
obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data)
安装:pip3 install wtformsmysql
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件 render_kw={'class': 'form-control'} ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据经过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
login.html正则表达式
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登陆</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() #如何自定义过滤类?看源码模仿 ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # “1” “2” ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜爱', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有全部的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 再也不继续后续验证 @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据经过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run() Copy<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>