#相似于django wsgiref from werkzeug.wrappers import Request, Response from werkzeug.serving import run_simple #实例一 def run(environ,start_response): return [b"asdfasdf"] if __name__ == '__main__': run_simple('localhost', 4000, run) #请求进来了会加括号执行第三个参数 若是是对象的话 会触发__call__
示例二: from werkzeug.wrappers import Request, Response @Request.application def hello(request): return Response('Hello World!') if __name__ == '__main__': from werkzeug.serving import run_simple run_simple('localhost', 4000, hello)
from flask import Flask app = Flask(__name__) @app.route('/index') def index(): return "Hello World" if __name__ == '__main__': #防止被导入时候也被运行 app.run()
from flask import Flask,render_template app = Flask(__name__,template_folder='temlates') #在这改的模板文件 @app.route('/index') def index(): return render_template("login.html") #默认在temlates下找 ,能够在上面修改template默认文件夹 if __name__ == '__main__': #防止被导入时候也被运行 app.run()
from flask import Flask,render_template,request,redirect,session app = Flask(__name__,template_folder="templates") @app.route('/login',methods=["GET","POST"]) #容许提交的method 默认是Get def login(): if request.method == 'GET': return render_template('login.html') user = request.form.get('user') #这个request必须导才能使用(上下文管理)form 是post 数据 pwd = request.form.get('pwd') #get 数据在request.args if user == 'oldboy' and pwd == '666': session['user'] = user return redirect('/index') #重跳转 return render_template('login.html',error='用户名或密码错误') #后端能够拿到这个error {{error}} # return render_template('login.html',**{"error":'用户名或密码错误'}) @app.route('/index') def index(): user = session.get('user') if not user: return redirect('/login') return render_template('index.html') if __name__ == '__main__': app.run()
{{ error}} 传递的时候return error='sss' **{error:'ss'} #django 会自动加()
<img src='ssss/tupia.png'> #就算有这个sss 也找不到 sss是前缀 static_url_path='/ssss' #能够修改
app=Flask(static_folder='static') #默认是static下找图片 而图片要的是前缀是ssss的
from flask import Flask,session
session["user"]=user session["pwd"]=pwd 往cookie里存 session实际上加密的cookie里 app.secret_key='盐' #加盐
给你一个路径 字符串的 能够找到类和类的静态字段
实例 略#html
cookie知道Key 吗? 咱们不知道 实际上是内部作了 因此咱们能session["key"]=vhtml5
app.config #在这里
怎么修改呢?
app.config.from_object('配置文件路径') #好比到线上的配置就是Propython
class Dev(): #表示开发环境 kEY=VALUES #小写的没用
DEVUG=Flase
class Pro(): #表示线上环境 kEY=VALUES #小写的没用
DEBUG=True
app.config.from_object('setting.Dev') #好比到DEV
@app.route('/index',method=['GET',"POST"]) def index(): return 'index' 起别名 from filask import url_from @app.route('/index',method=['GET',"POST"],endpoint='n1') #重命名 不写默认是函数名 def index(): url_from("n1") #能够经过url_form 拿到真正的url return 'index'
from filask import url_from @app.route('/index/<int:nid>',method=['GET',"POST"],endpoint='n1') #不写默认是函数名 def index(nid): url_from("n1") #能够经过n1 拿到真正的url return 'index' #支持的有 str any path int float uuid #默认 不支持正则 @app.route('/index/<int:nid>',method=['GET',"POST"],endpoint='n1') def index(nid): url_from("n1",nid=nid) #匹配完整url return 'index'
request 的参数 return的参数 #返回相应体 字符串 render_template() redicrect jsonify #内部自动序列化 #返回字符串怎么返回响应体 obj=make_response(''str') obj.headers['xxxx']='123' #头里加东西 obj.set_cookie(key,value) #设置cookie return obj
from flask import Flask,render_template,request,redirect,session app = Flask(__name__,template_folder="templates") app.config.from_object('配置文件路径') STUDENT_DICT={ 1:{'name':'xzq'}, 2:{'ss':'ss'} } @app.route('/index/<int:nid>',method=['GET',"POST"],endpoint='n1') def index(nid): url_from("n1",nid=nid) #匹配完整url return render_template('dd.html',student_dic=STUDENT_DICT)
tr #一行 th 表题 td 普通内容 tbody 内容体 theader {% for k,v in stu_dic.items() %} <tr> <td>{{ v['name'] }}</td> # v.name 也能拿到 v.get('name',默认值) </tr> <a href='del/{{k}}'> #删除 {% endfor %}
装饰器的坑 若是使用装饰器 若是再用反向解析 就不知道哪一个是哪一个了 由于全部的view都是一个inner 装饰器导入顺序 从下到下
装饰器不适合批量mysql
应用场景:比较少的
第三版本:before_request #用于执行真正视图函数以前 进行一个拦截 进行判断
@app.before_require
def xxxx():
xxx
@app.before_require
def xxxx():
xxx
#return xxx #若是返回当即返回不走后面的视图
页面看到的就是 xxx
{{ list.0 }} #索引取值方式一 {{ list[0] }} #索引取值方式二 {{ text }} #若是传递过来的是html 仍是文本 {{ text|safe }} #若是传递过来的是html 仍是文本 #Python 用Markup 至关于makesafe {{ func}} #函数地址 {{ func(7)}} #返回值 默认返回全部值 @app.template_global() def sb(a1,a2): return a1+a2 @app.template_filter(): def db(a1,a2,a3): return a1+a2+a3 {{1|db(2,3)}} #这个返回值能够对其if判断 模板继承 {% extens 'layout.html' %} {% end%} 定义宏 至关于定义函数 {% macro ccc(name,type='text',value='')%} <h1>宏</h1> <input type="{{type}}" name="{{name}}" value="{{value}}"> <input type="submit" value="提交"> {% endmacro %} {{ ccc('n1') }} {{ ccc('n2') }} 基本数据类型 能够执行Python语法
请求刚进来就建立一个空的字典 每次处理 都会内存拿那个字典 当要返回的时候 加密返回 返回前段的session的键 配置文件能够修改 SESSION_REFERSH_EACH_REQUEST #每次请求超时时间日后推
在session中存储一个数据,读取时经过pop将数据移除。web
flash('临时数据存储','error') # 值,分类 get_flashed_messages() # [ '存储'] get_flashed_messages() #[] print(get_flashed_messages(category_filter=['error'])) #经过category_filter 取error的分类里找
请求进来了 会触发app下的__call__ 方法 怎么在请求以前作一个操做,call方法执行以后执行一个操做 重改 app.wsgi_app class Middleware(object): def __init__(self, old): self.old = old def __call__(self, *args, **kwargs): print('前') ret = self.old(*args, **kwargs) print('后') return ret if __name__ == '__main__': app.wsgi_app = Middleware(app.wsgi_app) app.run() # app.__call__ # 请求来了才执行__call__方法
@app.befer_request def x1(): print("after") @app.after_request def x2(response): return response before #先定义限制性 after #后往上执行
@app.before_first_request #flask 启动执行第一次请求 @app。errorhandle(404) def not_found(arg): return '没找到'
def index(): return "index" app.add_url_rule("/xxx",None,index)
- 不用让endpoint重名
- 若是重名函数也必定要相同。正则表达式
b. 参数 rule, URL规则 view_func, 视图函数名称 endpoint=None, 名称,用于反向生成URL,即: url_for('名称') methods=None, 容许的请求方式,如:["GET","POST"] strict_slashes=None, 对URL最后的 / 符号是否严格要求, False 便可baidu.com/1 也能够 baidu.com/1/ redirect_to=None, 重定向到指定地址,例如 开发完新系统后 应该重定向到新地址,而不是使用新的url redirect_to="xxx/" defaults=None, 默认值,当URL中无参数,函数须要参数时,使用defaults={'k':'v'}为函数提供参数 subdomain=None, 子域名访问
from flask import Flask, views, url_for app = Flask(import_name=__name__) app.config['SERVER_NAME'] = 'wupeiqi.com:5000' """ 127.0.0.1 wupeiqi.com 127.0.0.1 web.wupeiqi.com 127.0.0.1 admin.wupeiqi.com """ # http://admin.wupeiqi.com:5000/ 才能访问 @app.route("/", subdomain="admin") def admin_index(): return "admin.your-domain.tld" # http://web.wupeiqi.com:5000/ @app.route("/", subdomain="web") def web_index(): return "web.your-domain.tld" # http://sdsdf.wupeiqi.com:5000/ # http://sdfsdf.wupeiqi.com:5000/ # http://asdf.wupeiqi.com:5000/ @app.route("/dynamic", subdomain="<username>") def username_index(username): """Dynamic subdomains are also supported Try going to user1.your-domain.tld/dynamic""" return username + ".your-domain.tld" if __name__ == '__main__': app.run()
import functools from flask import Flask,views app = Flask(__name__) def wrapper(func): @functools.wraps(func) def inner(*args,**kwargs): return func(*args,**kwargs) return inner class UserView(views.MethodView): methods = ['GET'] #只支持GET 请求 decorators = [wrapper,] #给每一个函数加装饰器 def get(self,*args,**kwargs): return 'GET' def post(self,*args,**kwargs): return 'POST' app.add_url_rule('/user',None,UserView.as_view('uuuu')) #uuu 就是endpoint if __name__ == '__main__': app.run()
from flask import Flask,url_for app = Flask(__name__) # 步骤一:定制类 from werkzeug.routing import BaseConverter class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数通过该方法处理,返回的值用于生成URL中的参数 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 步骤二:添加到转换器 app.url_map.converters['reg'] = RegexConverter """ 1. 用户发送请求 2. flask内部进行正则匹配 3. 调用to_python(正则匹配的结果)方法 4. to_python方法的返回值会交给视图函数的参数 """ # 步骤三:使用自定义正则 @app.route('/index/<reg("\d+"):nid>') def index(nid): print(nid,type(nid)) print(url_for('index',nid=987)) return "index" if __name__ == '__main__': app.run()
#counter 视图
from flask import Blueprint,render_template
ac = Blueprint('ac',__name__)
@ac.before_request
def x1():
print('app.before_request')
@ac.route('/login')
def login():
return render_template('login.html')
@ac.route('/logout')
def logout():
return 'Logout'
#user 视图
from flask import Blueprint
uc = Blueprint('uc',__name__)
@uc.route('/list')
def list():
return 'List'
@uc.route('/detail')
def detail():
return 'detail'
#__init__ 注册蓝图 from flask import Flask from .views.account import ac from .views.user import uc def create_app(): app = Flask(__name__) app.register_blueprint(ac) app.register_blueprint(uc,url_prefix='/api') return app
manage.pyredis
from crm import create_app app = create_app() if __name__ == '__main__': app.run()
ac = Blueprint('ac',__name__,template_folder ='',static_url_path='')
app.register_blueprint(uc,url_prefix='/uc') #user/login
from flask import Blueprint
uc = Blueprint('uc',__name__)
@uc.before_request #只在局部加装饰器
def x1():
xxxx
@uc.route('/login')
def login():
xxx
箱子 :{ 线程id:{ctx:ctx 对象}, 线程id2:{ctx:ctx 对象}, } 视图函数: from flask import request 请求结束后 根据线程惟一标识 移除空调上的数据上
- 面向对象中特殊方法 setattr/getattr注意事项:sql
class Foo(object):
def __init__(self):
# self.storage = {}
object.__setattr__(self,'storage',{}) #不能再init 的时候经过.的方式赋值
def __setattr__(self, key, value):
print(key,value,self.storage)
obj = Foo()
obj.xx = 123
class Stack(object): def __init__(self): self.data = [] def push(self,val): self.data.append(val) def pop(self): return self.data.pop() def top(self): return self.data[-1] _stack = Stack() _stack.push('佳俊') _stack.push('咸鱼') print(_stack.pop()) print(_stack.pop())
try: from greenlet import getcurrent as get_ident except: from threading import get_ident """ class Local(object): def __init__(self): object.__setattr__(self,'storage',{}) #建立一个线程 定义一个空字典 def __setattr__(self, key, value): ident = get_ident() #获取表示 if ident not in self.storage: self.storage[ident] = {key:value} #若是这个表示不存在的话 就建立这个 else: self.storage[ident][key] = value #若是这个表示存在的话 修改 def __getattr__(self, item): ident = get_ident() if ident in self.storage: #有的话 返回这个 return self.storage[ident].get(item) """
人家的local数据库
__slots__ #规定了只能点哪些属性 local 作了数据隔离、 class Local(object): __slots__ = ('__storage__', '__ident_func__') def __init__(self): # __storage__ = {1231:{'stack':[]}} object.__setattr__(self, '__storage__', {}) object.__setattr__(self, '__ident_func__', get_ident) def __getattr__(self, name): try: return self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name) def __setattr__(self, name, value): ident = self.__ident_func__() storage = self.__storage__ try: storage[ident][name] = value except KeyError: storage[ident] = {name: value} def __delattr__(self, name): try: del self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name)
pip3 install flask-sessiondjango
app.config["session_type"]='redis'
- 使用
# by luffycity.com
import redis
from flask import Flask,request,session
from flask.sessions import SecureCookieSessionInterface
from flask_session import Session
app = Flask(__name__)
# app.session_interface = SecureCookieSessionInterface()
# app.session_interface = RedisSessionInterface()
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234')
Session(app)
@app.route('/login')
def login():
session['user'] = 'alex'
return 'asdfasfd'
@app.route('/home')
def index():
print(session.get('user'))
return '...'
if __name__ == '__main__':
app.run()
- 原理:
- session数据保存到redis
session:随机字符串1:q23asifaksdfkajsdfasdf
session:随机字符串2:q23asifaksdfkajsdfasdf
session:随机字符串3:q23asifaksdfkajsdfasdf
session:随机字符串4:q23asifaksdfkajsdfasdf
session:随机字符串5:q23asifaksdfkajsdfasdf
- 随机字符串返回给用户。
随机字符串和
pip3 install DBUtils setting from DBUtils.PooledDB import PooledDB, SharedDBConnection import pymysql class Config(object): SALT = b"sdf1123df" SECRET_KEY = 'asdf123sdfsdfsdf' MAX_CONTENT_LENGTH = 1024 * 1024 * 7 POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 链接池容许的最大链接数,0和None表示不限制链接数 mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 maxshared=3, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s9day118', charset='utf8' )
使用
conn=POOL.connect()
import pymysql from settings import Config def connect(): conn = Config.POOL.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn,cursor def connect_close(conn,cursor): cursor.close() conn.close() def fetch_all(sql,args): conn,cursor = connect() cursor.execute(sql, args) record_list = cursor.fetchall() connect_close(conn,cursor) return record_list def fetch_one(sql, args): conn, cursor = connect() cursor.execute(sql, args) result = cursor.fetchone() connect_close(conn, cursor) return result def insert(sql, args): conn, cursor = connect() row = cursor.execute(sql, args) conn.commit() connect_close(conn, cursor) return row
user1
mysql > begin; mysql > select * from ss for update ; #这样才算加锁 有结果
user 2
mysql > begin; mysql > select * from ss for update ; #阻塞 锁还没释放
释放锁的操做
commit; 支持行锁
pip3 install wtforms from flask import Flask,request,render_template,session,current_app,g,redirect from wtforms import Form from wtforms.fields import simple from wtforms.fields import html5 from wtforms.fields import core from wtforms import widgets from wtforms import validators app = Flask(__name__) class LoginForm(Form): name = simple.StringField( #定义字段类型 validators=[ # validators.DataRequired(message='用户名不能为空.'), #制定规则 # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), #制定input类型 render_kw={'placeholder':'请输入用户名'} #增长属性 透明提示 ) pwd = simple.PasswordField( validators=[ validators.DataRequired(message='密码不能为空.'), # validators.Length(min=8, message='用户名长度必须大于%(min)d'), # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", # message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], render_kw={'placeholder':'请输入密码'} ) @app.route('/login',methods=['GET','POST']) def login(): if request.method == "GET": form = LoginForm() # print(form.name,type(form.name)) # form.name是StringField()对象, StringField().__str__ # print(form.pwd,type(form.pwd)) # form.pwd是PasswordField()对象,PasswordField().__str__ return render_template('login.html',form=form) #返回表单 form = LoginForm(formdata=request.form) #生成有数据表单 if form.validate(): #校验 print(form.data) return redirect('https://www.luffycity.com/home') else: # print(form.errors) return render_template('login.html', form=form) class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, #添加属性 default='alex' #默认值 ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # int("1") ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜爱', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, ] ) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm() return render_template('register.html', form=form) form = RegisterForm(formdata=request.form) if form.validate(): print(form.data) return redirect('https://www.luffycity.com/home') return render_template('register.html', form=form) import helper class UserForm(Form): city = core.SelectField( label='城市', choices=(), coerce=int ) name = simple.StringField(label='姓名') def __init__(self,*args,**kwargs): super(UserForm,self).__init__(*args,**kwargs) self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None) @app.route('/user') def user(): if request.method == "GET": #form = UserForm(data={'name':'alex','city':3}) form = UserForm() return render_template('user.html',form=form) if __name__ == '__main__': app.run()
{{% for field in from %}}
{{ filed }}
{{ field.lable }}
{% endfor %}
默认选中
favor = core.SelectMultipleField( label='喜爱', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, ] )
显示错误信息
{{filed.error[0] }}