2.1.1 先后端分离;html
2.1.2 导师后台+管理后台+主站(本人负责)前端
#!/usr/bin/env python # -*- coding:utf-8 -*- import json from django.core.exceptions import ObjectDoesNotExist from django.conf import settings from rest_framework.views import APIView from rest_framework.viewsets import ViewSetMixin from rest_framework.viewsets import ModelViewSet from rest_framework.response import Response from repository import models from api.serializer.payment import ShoppingCarSerializer from api.utils.auth.token_auth import LuffyTokenAuthentication from api.utils.auth.token_permission import LuffyPermission from api.utils import redis_pool from api.utils.exception import PricePolicyDoesNotExist class ShoppingCarView(ViewSetMixin, APIView): """ 购物车接口 """ authentication_classes = [LuffyTokenAuthentication, ] permission_classes = [LuffyPermission, ] def get(self, request, *args, **kwargs): """ 根据用户ID获取购物车全部东西 :param request: :param args: :param kwargs: :return: """ response = {'code': 1000, 'data': None} try: product_dict = redis_pool.conn.hget(settings.REDIS_SHOPPING_CAR_KEY, request.user.id) if product_dict: product_dict = json.loads(product_dict.decode('utf-8')) response['data'] = product_dict except Exception as e: response['code'] = 1001 response['msg'] = "获取购物车列表失败" return Response(response) def post(self, request, *args, **kwargs): """ # 根据课程ID获取课程信息以及相关全部价格策略 chopping_car = { request.user.id:{ course.id:{ title:'xx', img:'xx', choice_policy_id:1, price_policy_dict:{ {id:1,price:'9.9', period:'1个月'}, {id:2,price:'19.9',period:'3个月'}, {id:3,price:'59.9',period:'8个月'}, }, } }, course.id:[ title:'xx', img:'xx', choice_policy_id:1, price_policy_dict:{ {id:1,price:'9.9', period:'1个月'}, {id:2,price:'19.9',period:'3个月'}, {id:3,price:'59.9',period:'8个月'}, }, ] } } } :param request: :param args: :param kwargs: :return: """ response = {'code': 1000, 'msg': None} try: course_id = int(request.data.get('course_id')) policy_id = int(request.data.get('policy_id')) # 获取课程信息 course = models.Course.objects.exclude(course_type=2).filter(status=0).get(id=course_id) # 序列化课程信息,并获取其关联的全部价格策略 ser = ShoppingCarSerializer(instance=course, many=False) product = ser.data # 判断价格策略是否存在 policy_exist = False for policy in product['price_policy_list']: if policy['id'] == policy_id: policy_exist = True break if not policy_exist: raise PricePolicyDoesNotExist() # 设置默认选中的价格策略 product.setdefault('choice_policy_id', policy_id) # 获取当前用户在购物车中已存在的课程,若是存在则更新,不然添加新课程 product_dict = redis_pool.conn.hget(settings.REDIS_SHOPPING_CAR_KEY, request.user.id) if not product_dict: product_dict = {course_id: product} else: product_dict = json.loads(product_dict.decode('utf-8')) product_dict[course_id] = product # 将新课程写入到购物车 redis_pool.conn.hset(settings.REDIS_SHOPPING_CAR_KEY, request.user.id, json.dumps(product_dict)) except ObjectDoesNotExist as e: response['code'] = 1001 response['msg'] = '视频不存在' except PricePolicyDoesNotExist as e: response['code'] = 1002 response['msg'] = '价格策略不存在' except Exception as e: print(e) response['code'] = 1003 response['msg'] = '添加购物车失败' return Response(response) def delete(self, request, *args, **kwargs): """ 删除购物车中的课程 :param request: :param args: :param kwargs: :return: """ response = {'code': 1000} try: course_id = kwargs.get('pk') product_dict = redis_pool.conn.hget(settings.REDIS_SHOPPING_CAR_KEY, request.user.id) if not product_dict: raise Exception('购物车中无课程') product_dict = json.loads(product_dict.decode('utf-8')) if course_id not in product_dict: raise Exception('购物车中无该商品') del product_dict[course_id] redis_pool.conn.hset(settings.REDIS_SHOPPING_CAR_KEY, request.user.id, json.dumps(product_dict)) except Exception as e: response['code'] = 1001 response['msg'] = str(e) return Response(response) def put(self, request, *args, **kwargs): """ 更新购物车中的课程的默认的价格策略 :param request: :param args: :param kwargs: :return: """ response = {'code': 1000} try: course_id = kwargs.get('pk') policy_id = request.data.get('policy_id') product_dict = redis_pool.conn.hget(settings.REDIS_SHOPPING_CAR_KEY, request.user.id) if not product_dict: raise Exception('购物车清单不存在') product_dict = json.loads(product_dict.decode('utf-8')) if course_id not in product_dict: raise Exception('购物车清单中商品不存在') policy_exist = False for policy in product_dict[course_id]['price_policy_list']: if policy['id'] == policy_id: policy_exist = True break if not policy_exist: raise PricePolicyDoesNotExist() product_dict[course_id]['choice_policy_id'] = policy_id redis_pool.conn.hset(settings.REDIS_SHOPPING_CAR_KEY, request.user.id, json.dumps(product_dict)) except PricePolicyDoesNotExist as e: response['code'] = 1001 response['msg'] = '价格策略不存在' except Exception as e: response['code'] = 1002 response['msg'] = str(e) return Response(response)
#!/usr/bin/env python # -*- coding:utf-8 -*- import json import datetime from django.conf import settings from rest_framework.views import APIView from rest_framework.response import Response from api.utils.auth.token_auth import LuffyTokenAuthentication from api.utils.auth.token_permission import LuffyPermission from api.utils import redis_pool from repository import models class PaymentView(APIView): """ 去结算接口 """ authentication_classes = [LuffyTokenAuthentication, ] permission_classes = [LuffyPermission, ] def get(self, request, *args, **kwargs): """ 获取结算列表 :param request: :param args: :param kwargs: :return: """ response = {'code': 1000} try: # 结算商品列表 payment_list = redis_pool.conn.hget(settings.REDIS_PAYMENT_KEY, request.user.id) if not payment_list: raise Exception() response['data'] = { 'payment_list': json.loads(payment_list.decode('utf-8')), # 结算信息(课程、价格和优惠券) "balance": request.user.balance # 我的贝里帐户,可以使用贝里金额 } except Exception as e: response['code'] = 1001 response['msg'] = "结算列表为空" return Response(response) def post(self, request, *args, **kwargs): """ 去结算 方案一(示例):用户提交课程id,去redis购物车中获取其选好的价格策略,再次检测课程和价格策略的合法性。 PS: 直接购买时,须要先加入购物车,再当即去结算 方案二:用户提交课程id和价格策略id,去数据库验证其合法性。 PS: 直接购买时,直接去结算 user.id: { policy_course_dict:{ 课程ID:{ 'course_id': course_id, 'course_name': product['name'], 'course_img': product['course_img'], 'policy_id': product['choice_policy_id'], 'policy_price': policy_price, 'policy_': policy_period, 'coupon_record_list': [ {'id': 0, 'text': '请选择优惠券'}, {'id': 1, 'type':1, 'text': '优惠券1', ..}, {'id': 2, 'type':2, 'text': '优惠券1', ..}, {'id': 3, 'type':3, 'text': '优惠券1', ..}, ], }, 课程ID:{ 'course_id': course_id, 'course_name': product['name'], 'course_img': product['course_img'], 'policy_id': product['choice_policy_id'], 'policy_price': policy_price, 'policy_': policy_period, 'coupon_record_list': [ {'id': 0, 'text': '请选择优惠券'}, {'id': 1, 'type':1, 'text': '优惠券1', ..}, {'id': 2, 'type':2, 'text': '优惠券1', ..}, {'id': 3, 'type':3, 'text': '优惠券1', ..}, ], } }, global_coupon_dict:{ 1:{'type': 0, 'text': "通用优惠券", 'id': 1, ..}, 2:{'type': 0, 'text': "通用优惠券", 'id': 2, ..}, 3:{'type': 0, 'text': "通用优惠券", 'id': 3, ...}, 4:{'type': 0, 'text': "通用优惠券", 'id': 4, ...}, } } :param request: :param args: :param kwargs: :return: """ response = {'code': 1001} try: """ 1. 获取要支付的课程ID 2. 检查购物车中是否存在,不存在则报错 循环用户提交的课程ID,去购物车中获取,若是不存在,就报错。 """ # 获取用户提交的课程id course_id_list = request.data.get('course_list') if not course_id_list or not isinstance(course_id_list, list): raise Exception('请选择要结算的课程') # 购物车中检查是否已经有课程(应该有课程的) product_dict = redis_pool.conn.hget(settings.REDIS_SHOPPING_CAR_KEY, request.user.id) if not product_dict: raise Exception('购物车无课程') # 购物车中是否有用户要购买的课程 product_dict = json.loads(product_dict.decode('utf-8')) # ###### 课程、价格和优惠券 ####### policy_course_dict = {} for course_id in course_id_list: course_id = str(course_id) product = product_dict.get(course_id) if not product: raise Exception('购买的课程必须先加入购物车') policy_exist = False for policy in product['price_policy_list']: if policy['id'] == product['choice_policy_id']: policy_price = policy['price'] policy_period = policy['period'] policy_valid_period = policy['valid_period'] policy_exist = True break if not policy_exist: raise Exception('购物车中的课程无此价格') policy_course = { 'course_id': course_id, 'course_name': product['name'], 'course_img': product['course_img'], 'policy_id': product['choice_policy_id'], 'policy_price': policy_price, 'policy_period': policy_period, 'policy_valid_period': policy_valid_period, 'coupon_record_list': [ {'id': 0, 'text': '请选择优惠券'}, ], } policy_course_dict[course_id] = policy_course # 获取当前全部优惠券 user_coupon_list = models.CouponRecord.objects.filter(account=request.user, status=0) # ###### 全局优惠券 ####### global_coupon_record_dict = {} # 课程优惠券添加到课程中;全局优惠券添加到全局 current_date = datetime.datetime.now().date() for record in user_coupon_list: # 检查优惠券是否已通过期 begin_date = record.coupon.valid_begin_date end_date = record.coupon.valid_end_date if begin_date: if current_date < begin_date: continue if end_date: if current_date > end_date: continue # 全局优惠券 if not record.coupon.content_type: if record.coupon.coupon_type == 0: temp = {'type': 0, 'text': "通用优惠券", 'id': record.id, 'begin_date': begin_date, 'end_date': end_date, 'money_equivalent_value': record.coupon.money_equivalent_value} elif record.coupon.coupon_type == 1: temp = {'type': 1, 'text': "满减券", 'id': record.id, 'begin_date': begin_date, 'end_date': end_date, 'minimum_consume': record.coupon.minimum_consume, 'money_equivalent_value': record.coupon.money_equivalent_value} elif record.coupon.coupon_type == 2: temp = {'type': 2, 'text': "折扣券", 'id': record.id, 'begin_date': begin_date, 'end_date': end_date, 'off_percent': record.coupon.off_percent} else: continue global_coupon_record_dict[record.id] = temp # 课程优惠券 else: cid = record.coupon.object_id if record.coupon.content_type.model == 'course' and cid in policy_course_dict: # 课程价格:满减,打折,通用 if record.coupon.coupon_type == 0: temp = {'type': 0, 'text': "通用优惠券", 'id': record.id, 'begin_date': begin_date, 'end_date': end_date, 'money_equivalent_value': record.coupon.money_equivalent_value} elif record.coupon.coupon_type == 1 and policy_course_dict[cid][ 'policy_price'] >= record.coupon.minimum_consume: temp = {'type': 1, 'text': "满减券", 'id': record.id, 'begin_date': begin_date, 'end_date': end_date, 'minimum_consume': record.coupon.minimum_consume, 'money_equivalent_value': record.coupon.money_equivalent_value} elif record.coupon.coupon_type == 2: temp = {'type': 2, 'text': "折扣券", 'id': record.id, 'begin_date': begin_date, 'end_date': end_date, 'off_percent': record.coupon.off_percent} else: continue policy_course_dict[cid]['coupon_record_list'].append(temp) user_pay = { 'policy_course_dict': policy_course_dict, 'global_coupon_record_dict': global_coupon_record_dict } redis_pool.conn.hset(settings.REDIS_PAYMENT_KEY, request.user.id, json.dumps(user_pay)) except Exception as e: response['code'] = 1002 response['msg'] = str(e) return Response(response)
#!/usr/bin/env python # -*- coding:utf-8 -*- import json import time import random import datetime from django.conf import settings from django.db import transaction from django.db.models import F from rest_framework.views import APIView from rest_framework.response import Response from api.utils.auth.token_auth import LuffyTokenAuthentication from api.utils.auth.token_permission import LuffyPermission from api.utils import redis_pool from api.utils.alipay import AliPay from repository import models def generate_order_num(): """ 生成订单编号, 且必须惟一 :return: """ while True: order_num = time.strftime('%Y%m%d%H%M%S', time.localtime()) + str(random.randint(111, 999)) if not models.Order.objects.filter(order_number=order_num).exists(): break return order_num def generate_transaction_num(): """ 生成流水编号, 且必须惟一 :return: """ while True: transaction_number = time.strftime('%Y%m%d%H%M%S', time.localtime()) + str(random.randint(111, 999)) if not models.TransactionRecord.objects.filter(transaction_number=transaction_number).exists(): break return transaction_number class PayOrderView(APIView): authentication_classes = [LuffyTokenAuthentication, ] permission_classes = [LuffyPermission, ] def post(self, request, *args, **kwargs): """ 去支付,生成订单。 获取前端提交的购买信息 { course_price_list:[ {'policy_id':1, '':'course_id':1, 'coupon_record_id':1}, {'policy_id':2, '':'course_id':2, 'coupon_record_id':2}, ], coupon_record_id:1, alipay: 99, balance: 1 } 1. 用户提交 - balance - alipay 2. 获取去结算列表 课程 3. 循环全部课程 - 获取原价 - 抵扣的钱 :param request: :param args: :param kwargs: :return: """ response = {'code': 1000} try: # 用户请求验证 policy_course_list = request.data.get('course_price_list') coupon_record_id = request.data.get('coupon_record_id') alipay = request.data.get('alipay') # >= 0 balance = request.data.get('balance') # >= 0 if balance > request.user.balance: raise Exception('帐户中贝里余额不足') # 检查用户提交的信息在 redis结算列表 中是否存在,若是不存在,则须要用户从购物车中再次去结算 payment_dict_bytes = redis_pool.conn.hget(settings.REDIS_PAYMENT_KEY, request.user.id) payment_dict = json.loads(payment_dict_bytes.decode('utf-8')) policy_course_dict = payment_dict['policy_course_dict'] global_coupon_record_dict = payment_dict['global_coupon_record_dict'] global_coupon_record = {} # 全局优惠券 if coupon_record_id: if coupon_record_id not in global_coupon_record_dict: raise Exception('全局优惠券在缓存中不存在') global_coupon_record = global_coupon_record_dict[coupon_record_id] # 当前时间 current_date = datetime.datetime.now().date() current_datetime = datetime.datetime.now() # 原价 total_price = 0 # 总抵扣的钱 discount = 0 # 使用优惠券ID列表 if coupon_record_id: use_coupon_record_id_list = [coupon_record_id, ] else: use_coupon_record_id_list=[] # 课程和优惠券 buy_course_record = [] for cp in policy_course_list: _policy_id = cp['policy_id'] _course_id = cp['course_id'] _coupon_record_id = cp['coupon_record_id'] temp = { 'course_id': _course_id, 'course_name': "course", 'valid_period': 0, # 有效期:30 'period': 0, # 有效期:一个月 'original_price': 0, 'price': 0, } if str(_course_id) not in policy_course_dict: raise Exception('课程在缓存中不存在') redis_course = policy_course_dict[str(_course_id)] if str(_policy_id) != str(redis_course['policy_id']): raise Exception('价格策略在缓存中不存在') # 课程是否已经下线或价格策略被修改 policy_object = models.PricePolicy.objects.get(id=_policy_id) # 价格策略对象 course_object = policy_object.content_object # 课程对象 if course_object.id != _course_id: raise Exception('课程和价格策略对应失败') if course_object.status != 0: raise Exception('课程已下线,没法购买') # 选择的优惠券是否在缓存中 redis_coupon_list = redis_course['coupon_record_list'] redis_coupon_record = None for item in redis_coupon_list: if item['id'] == _coupon_record_id: redis_coupon_record = item break if not redis_coupon_record: raise Exception('单课程优惠券在缓存中不存在') # 计算购买原总价 total_price += policy_object.price # 未使用单课程优惠券 if redis_coupon_record['id'] == 0: temp['price'] = policy_object.price buy_course_record.append(temp) continue temp['original_price'] = policy_object.price temp['valid_period'] = redis_coupon_record['policy_valid_period'] temp['period'] = redis_coupon_record['policy_period'] # 缓存中的优惠券是否已通过期 begin_date = redis_coupon_record.get('begin_date') end_date = redis_coupon_record.get('end_date') if begin_date: if current_date < begin_date: raise Exception('优惠券使用还未到时间') if end_date: if current_date > end_date: raise Exception('优惠券已过时') # 使用的是单课程优惠券抵扣了多少钱;使用的 我的优惠券ID if redis_coupon_record['type'] == 0: # 通用优惠券 money = redis_coupon_record['money_equivalent_value'] discount += money elif redis_coupon_record['type'] == 1: # 满减券 money = redis_coupon_record['money_equivalent_value'] minimum_consume = redis_coupon_record['minimum_consume'] if policy_object.price >= minimum_consume: discount += money elif redis_coupon_record['type'] == 2: # 打折券 money = policy_object.price * redis_coupon_record['off_percent'] discount += money temp['price'] = policy_object.price - money buy_course_record.append(temp) use_coupon_record_id_list.append(redis_coupon_record['id']) # 全局优惠券 print(global_coupon_record) begin_date = global_coupon_record.get('begin_date') end_date = global_coupon_record.get('end_date') if begin_date: if current_date < begin_date: raise Exception('优惠券使用还未到时间') if end_date: if current_date > end_date: raise Exception('优惠券已过时') # 使用全局优惠券抵扣了多少钱 if global_coupon_record.get('type') == 0: # 通用优惠券 money = global_coupon_record['money_equivalent_value'] discount += money elif global_coupon_record.get('type') == 1: # 满减券 money = global_coupon_record['money_equivalent_value'] minimum_consume = global_coupon_record['minimum_consume'] if (total_price - discount) >= minimum_consume: discount += money elif global_coupon_record.get('type') == 2: # 打折券 money = (total_price - discount) * global_coupon_record['off_percent'] discount += money # 贝里抵扣的钱 if balance: discount += balance if (alipay + discount) != total_price: raise Exception('总价、优惠券抵扣、贝里抵扣和实际支付的金额不符') # 建立订单 + 支付宝支付 # 建立订单详细 # 贝里抵扣 + 贝里记录 # 优惠券状态更新 actual_amount = 0 if alipay: payment_type = 1 # 支付宝 actual_amount = alipay elif balance: payment_type = 3 # 贝里 else: payment_type = 2 # 优惠码 with transaction.atomic(): order_num = generate_order_num() if payment_type == 1: order_object = models.Order.objects.create( payment_type=payment_type, order_number=order_num, account=request.user, actual_amount=actual_amount, status=1, # 待支付 ) else: order_object = models.Order.objects.create( payment_type=payment_type, order_number=order_num, account=request.user, actual_amount=actual_amount, status=0, # 支付成功,优惠券和贝里已够支付 pay_time=current_datetime ) for item in buy_course_record: detail = models.OrderDetail.objects.create( order=order_object, content_object=models.Course.objects.get(id=item['course_id']), original_price=item['original_price'], price=item['price'], valid_period_display=item['period'], valid_period=item['valid_period'] ) models.Account.objects.filter(id=request.user.id).update(balance=F('balance') - balance) models.TransactionRecord.objects.create( account=request.user, amount=request.user.balance, balance=request.user.balance - balance, transaction_type=1, content_object=order_object, transaction_number=generate_transaction_num() ) effect_row = models.CouponRecord.objects.filter(id__in=use_coupon_record_id_list).update( order=order_object, used_time=current_datetime) if effect_row != len(use_coupon_record_id_list): raise Exception('优惠券使用失败') response['payment_type'] = payment_type # 生成支付宝URL地址 if payment_type == 1: pay = AliPay(debug=True) query_params = pay.direct_pay( subject="路飞学城", # 商品简单描述 out_trade_no=order_num, # 商户订单号 total_amount=actual_amount, # 交易金额(单位: 元 保留俩位小数) ) pay_url = "https://openapi.alipaydev.com/gateway.do?{}".format(query_params) response['pay_url'] = pay_url except IndentationError as e: response['code'] = 1001 response['msg'] = str(e) return Response(response)
# -*- coding:utf-8 -*- # Project: FlaskFull # Software: PyCharm # Time : 2018-09-17 10:12 # File : s2.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask app = Flask(__name__) @app.route('/index/') def index(): return 'Hello World!' if __name__ == '__main__': app.run()#run_simple(host,port,app)
from flask import Flask, render_template, request, redirect, session app = Flask(__name__, template_folder='templates') # 基于app这个对象设置secret_key的值,任意设置! app.secret_key = 'nishifdalkj4389!@#$28908' @app.route('/login/', methods=['GET', 'POST']) def hello_world(): # return 'Hello World!' if request.method == "GET": return render_template('login.html') user = request.form.get('usr') pwd = request.form.get('pwd') if user == 'cuixiaozhao' and pwd == '123456': # 将用户信息放入session; session['user_info'] = user """ RuntimeError: The session is unavailable because no secret key was set. Set the secret_key on the application to something unique and secret. 127.0.0.1 - - [17/Sep/2018 21:02:04] "POST /login/ HTTP/1.1" 500 - """ return redirect('/index/') else: # 两种传值方法都可,比Django灵活一些; # return render_template('login.html', msg='用户名或者密码错误!') return render_template('login.html', **{'msg': '用户名或者密码错误!'}) @app.route('/index/') def index(): user_info = session.get('user_info') if not user_info: return redirect('/login/') else: return '欢迎登录!' @app.route('/logout/') def logout(): del session['user_info'] return redirect('/login/') if __name__ == '__main__': app.run()
from flask import Flask, render_template, request, redirect, session app = Flask(__name__, template_folder='templates') # 基于app这个对象设置secret_key的值,任意设置! app.secret_key = 'nishifdalkj4389!@#$28908' app.debug = True USER_DICT = { '1': {'name': '志军', 'age': 18}, '2': {'name': '大伟', 'age': 48}, '3': {'name': '美凯', 'age': 38} } @app.route('/login/', methods=['GET', 'POST']) def hello_world(): # return 'Hello World!' if request.method == "GET": return render_template('login.html') user = request.form.get('usr') pwd = request.form.get('pwd') if user == 'cuixiaozhao' and pwd == '123456': # 将用户信息放入session; session['user_info'] = user """ RuntimeError: The session is unavailable because no secret key was set. Set the secret_key on the application to something unique and secret. 127.0.0.1 - - [17/Sep/2018 21:02:04] "POST /login/ HTTP/1.1" 500 - """ return redirect('/index/') else: # 两种传值方法都可,比Django灵活一些; # return render_template('login.html', msg='用户名或者密码错误!') return render_template('login.html', **{'msg': '用户名或者密码错误!'}) @app.route('/index/') def index(): user_info = session.get('user_info') if not user_info: return redirect('/login/') return render_template('index.html', user_dict=USER_DICT) @app.route('/detail/') def detail(): user_info = session.get('user_info') if not user_info: return redirect('/login/') uid = request.args.get('uid') info = USER_DICT.get(uid) return render_template('detail.html', info=info) @app.route('/logout/') def logout(): del session['user_info'] return redirect('/login/') if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 10:45 # File : 1.装饰器.py # Author : 天晴天朗 # Email : tqtl@tqtl.org import functools def wapper(func): @functools.wraps(func) def inner(*args,**kwargs): return func(*args,**kwargs) return inner ''' 一、执行wapper函数,并将被装饰的函数当作参数。wapper(index) 二、将第一步的返回值,从新赋值给index = wapper(old index) ''' #一、为何要使用装饰器?在不改变原来函数的基础之上,对函数执行先后进行自定义操做; @wapper def index(a1): return a1 +1000 v = index(2) print(v) #获取函数名 print("打印函数名:",index.__name__) @wapper def order(a1): return a1+1000 print(index.__name__) print(order.__name__)
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 11:06 # File : 3.面向对象.py # Author : 天晴天朗 # Email : tqtl@tqtl.org "" """ 谈谈你对面向对象的认识? 封装: 将同一类方法分为一类,方法封装到类中; 将方法中的共同的参数封装到对象中,把共同的值封装到对象中; """ # 用户类实现; class File: def __init__(self, a1, a2, a3, a4): self.a1 = a1 self.a2 = a2 self.a3 = a3 self.a4 = a4 def file_add(self): pass def file_del(self): pass def file_update(self): pass def file_fetch(self): pass # 给了一些值,将数据加工,应用场景:Django自定义分页; class Foo(): def __init__(self, a1, a2, a3, a4, a5, a6, a7): self.a1 = a1 self.a2 = a2 self.a3 = a3 self.a4 = a4 self.a5 = a5 self.a6 = a6 self.a7 = a7 def sum(self): return self.a1 + self.a2 def reduce(self): return self.a5 - self.a7 obj = File(1, 2, 3, 4) print(obj) # <__main__.File object at 0x10bbf25c0> class A(object): def __init__(self): self.age1 = 123 self.a = A() class B(object): def __init__(self): self.age2 = 123 self.b = B() class C(object): def __init__(self): self.age3 = 123 self.c = C() class D(object): def __init__(self): self.age4 = 123 self.d = D()
from flask import Flask, render_template, redirect app = Flask(__name__) # Flask的配置文件这么玩耍; app.config.from_object("settings.DevelopmentConfig")#settings后面是一个类名; @app.route('/index/', methods=['GET', 'POST']) def index(): return 'Hello World!' if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 11:25 # File : settings.py # Author : 天晴天朗 # Email : tqtl@tqtl.org class BaseConfig(object): DEBUG = False TESTING = False DATABASE_URI = 'sqllite://:memory:' class ProductionConfig(BaseConfig): DATABASE_URI = 'mysql://user@production/foo' class DevelopmentConfig(BaseConfig): DEBUG = True DATABASE_URI = 'mysql://user@development/foo' class TestingConfig(BaseConfig): DEBUG = True DATABASE_URI = 'mysql://user@test/foo'
flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为: { 'DEBUG': get_debug_flag(default=False), 是否开启Debug模式; 'TESTING': False, 是否开启测试模式; 'PROPAGATE_EXCEPTIONS': None, 'PRESERVE_CONTEXT_ON_EXCEPTION': None, 'SECRET_KEY': None, 'PERMANENT_SESSION_LIFETIME': timedelta(days=31), 'USE_X_SENDFILE': False, 'LOGGER_NAME': None, 'LOGGER_HANDLER_POLICY': 'always', 'SERVER_NAME': None, 'APPLICATION_ROOT': None, 'SESSION_COOKIE_NAME': 'session', 'SESSION_COOKIE_DOMAIN': None, 'SESSION_COOKIE_PATH': None, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SECURE': False, 'SESSION_REFRESH_EACH_REQUEST': True, 'MAX_CONTENT_LENGTH': None, 'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12), 'TRAP_BAD_REQUEST_ERRORS': False, 'TRAP_HTTP_EXCEPTIONS': False, 'EXPLAIN_TEMPLATE_LOADING': False, 'PREFERRED_URL_SCHEME': 'http', 'JSON_AS_ASCII': True, 'JSON_SORT_KEYS': True, 'JSONIFY_PRETTYPRINT_REGULAR': True, 'JSONIFY_MIMETYPE': 'application/json', 'TEMPLATES_AUTO_RELOAD': None, } 方式一: app.config['DEBUG'] = True PS: 因为Config对象本质上是字典,因此还可使用app.config.update(...) 方式二: app.config.from_pyfile("python文件名称") 如: settings.py DEBUG = True app.config.from_pyfile("settings.py") app.config.from_envvar("环境变量名称") 环境变量的值为python文件名称名称,内部调用from_pyfile方法 app.config.from_json("json文件名称") JSON文件名称,必须是json格式,由于内部会执行json.loads app.config.from_mapping({'DEBUG':True}) 字典格式 app.config.from_object("python类或类的路径") app.config.from_object('pro_flask.settings.TestingConfig') settings.py class Config(object): DEBUG = False TESTING = False DATABASE_URI = 'sqlite://:memory:' class ProductionConfig(Config): DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True PS: 从sys.path中已经存在路径开始写; PS: settings.py文件默认路径要放在程序root_path目录,若是instance_relative_config为True,则就是instance_path目录;
from flask import Flask, render_template, redirect app = Flask(__name__) # Flask的配置文件这么玩耍; app.config.from_object("settings.DevelopmentConfig") # 添加的第一种方式,推荐使用装饰器的方式; @app.route('/index/', methods=['GET', 'POST']) def index(): return '# 添加的第一种方式,推荐使用装饰器的方式;' # 添加路由的另一种方式; def order(): return '# 添加路由的第二种方式;' app.add_url_rule('/order/', view_func=order) if __name__ == '__main__': app.run() if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 12:06 # File : 4.反向生成URL.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, render_template, redirect, url_for app = Flask(__name__) # endpoint&url_for不起别名默认就是函数名; @app.route('/index/', methods=['GET', 'POST']) def index(): v1 = url_for('n1') # v2 = url_for('n1') # v2 = url_for('n2') v2 = url_for('login') v3 = url_for('logout') print(v1, v2, v3) return 'Index' @app.route('/login/', methods=['GET', 'POST'], endpoint='n2') def login(): return 'Login' @app.route('/logout/', methods=['GET', 'POST'], endpoint='n3') def logout(): return 'Logout'
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 19:14 # File : 6.Flask框架之app.route参数.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask app = Flask(__name__) # 从旧功能重定向至新功能页面;Js能够作重定向;meta头、js location href @app.route('/index/', methods=['GET', 'POST'], redirect_to='/new/') def index(): return '旧的功能' @app.route('/new/', methods=['GET', 'POST']) def new(): return '新功能' if __name__ == '__main__': app.run()
DEFAULT_CONVERTERS = { 'default': UnicodeConverter, 'string': UnicodeConverter, 'any': AnyConverter, 'path': PathConverter, 'int': IntegerConverter, 'float': FloatConverter, 'uuid': UUIDConverter, }
def auth(func): def inner(*args, **kwargs): print('before') result = func(*args, **kwargs) print('after') return result return inner @app.route('/index.html',methods=['GET','POST'],endpoint='index') @auth def index(): return 'Index' 或 def index(): return "Index" self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"]) or app.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"]) app.view_functions['index'] = index 或 def auth(func): def inner(*args, **kwargs): print('before') result = func(*args, **kwargs) print('after') return result return inner class IndexView(views.View): methods = ['GET'] decorators = [auth, ] def dispatch_request(self): print('Index') return 'Index!' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint 或 class IndexView(views.MethodView): methods = ['GET'] decorators = [auth, ] def get(self): return 'Index.GET' def post(self): return 'Index.POST' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint @app.route和app.add_url_rule参数: rule, URL规则 view_func, 视图函数名称 defaults=None, 默认值,当URL中无参数,函数须要参数时,使用defaults={'k':'v'}为函数提供参数 endpoint=None, 名称,用于反向生成URL,即: url_for('名称') methods=None, 容许的请求方式,如:["GET","POST"] strict_slashes=None, 对URL最后的 / 符号是否严格要求, 如: @app.route('/index',strict_slashes=False), 访问 http://www.xx.com/index/ 或 http://www.xx.com/index都可 @app.route('/index',strict_slashes=True) 仅访问 http://www.xx.com/index redirect_to=None, 重定向到指定地址 如: @app.route('/index/<int:nid>', redirect_to='/home/<nid>') 或 def func(adapter, nid): return "/home/888" @app.route('/index/<int:nid>', redirect_to=func) subdomain=None, 子域名访问 from flask import Flask, views, url_for app = Flask(import_name=__name__) app.config['SERVER_NAME'] = 'wupeiqi.com:5000' @app.route("/", subdomain="admin") def static_index(): """Flask supports static subdomains This is available at static.your-domain.tld""" return "static.your-domain.tld" @app.route("/dynamic", subdomain="<username>") def username_index(username): """Dynamic subdomains are also supported Try going to user1.your-domain.tld/dynamic""" return username + ".your-domain.tld" if __name__ == '__main__': app.run() a.注册路由原理
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 19:14 # File : 4.Flask框架之app.route参数.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask app = Flask(__name__) # 从旧功能重定向至新功能页面;Js能够作重定向;meta头、js location href @app.route('/index/', methods=['GET', 'POST'], redirect_to='/new/') def index(): return '旧的功能' @app.route('/new/', methods=['GET', 'POST']) def new(): return '新功能' if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 19:14 # File : 7.Flask框架之获取子域名.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask app = Flask(__name__) app.config['SERVER_NAME'] = 'www.cuixiaozhao.com:5000' # 从旧功能重定向至新功能页面;Js能够作重定向;meta头、js location href @app.route('/dynamic/', methods=['GET', 'POST'], subdomain='<username>') def sub_domain(username): print(username) return '旧的功能1' if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 19:14 # File : 8.添加装饰器.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask import functools app = Flask(__name__) def wapper(func): @functools.wraps(func) def inner(*args, **kwargs): print('before') return func(*args, **kwargs) return inner @app.route('/xxxx/', methods=['GET', 'POST']) @wapper def index(): return 'Index' @app.route('/xxxx/', methods=['GET', 'POST']) @wapper def order(): return 'Order' if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 21:57 # File : 9.Flask框架之CBV和FBV.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, redirect, render_template, views app = Flask(__name__) import functools def wapper(func): @functools.wraps(func) def inner(*args, **kwargs): print('before') return func(*args, **kwargs) return inner @app.route('/xxx/', methods=['GET', 'POST']) @wapper def index(): return 'Index' class IndexView(views.View): methods = ['GET'] decorators = [wapper, ] def dispatch_request(self): print('Index') return 'Index' app.add_url_rule('/index/', view_func=IndexView.as_view(name='index')) # name == endpoint # CBV方式; class IndexView(views.MethodView): methods = ['GET'] decorators = [wapper] def get(self): return 'Index.GET' def post(self): return 'Index POST' app.add_url_rule('/index/', view_func=IndexView.as_view(name='index')) # name = endpoint if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 22:13 # File : 12.请求和响应.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, render_template, request, redirect, jsonify, make_response app = Flask(__name__) app.config.from_object("settings.DevelopmentConfig") @app.route('/index/', methods=['GET', 'POST']) def index(): # 请求相关; request.args # 响应相关; return '' return render_template() return redirect('/index/') # 返回json数据; return json.dumps({}) # return jsonify({}) if __name__ == '__main__': app.run() """ # 请求相关信息; # request.method # request.args # request.form # request.values # request.cookies # request.headers # request.path # request.full_path # request.script_root # request.url # request.base_url # request.url_root # request.host_url # request.host # request.files # obj = request.files['the_file_name'] # obj.save('/var/www/uploads/' + secure_filename(f.filename)) # 响应相关信息; # return "字符串" # return render_template('html模板路径',**{}) # return redirect('/index.html') # response = make_response(render_template('index.html')) # response是flask.wrappers.Response类型; # response.delete_cookie('key') # response.set_cookie('key', 'value') # response.headers['X-Something'] = 'A value' # return response """
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-18 22:23 # File : 13.模板.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, render_template, redirect, jsonify, make_response, Markup app = Flask(__name__) # 全局模板——每一个模板都可调用的函数; @app.template_global() def cxz(a1, a2): return a1 + a2 def input(value): return Markup("<input value:'%s'/>" % value) def gen_input(value): return Markup("<input value:'%s'/>" % value) @app.route('/computed/', methods=['GET', 'POST']) def computed(): context = { 'k1': 123, 'k2': [11, 22, 33], 'k3': {'name': 'cuixiaozhao', 'age': 84}, 'k4': lambda x: x + 1, # 用户写简单的函数; 'k5': gen_input } return render_template('index.html', **context) if __name__ == '__main__': app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <meta name="viewport" content="width=device-width, initial-scale=1"> </head> <body> <div>头部</div> <div> {% block content %} {% endblock %} </div> <div>底部</div> </body> </html>
{% extends 'layout.html' %} {% block content %} <h1>{{ k1 }}</h1> <h1>{{ k2.0 }} {{ k2[0] }}</h1> <h1>{{ k3.name }} {{ k3['name'] }}{{ k3.get('name',19930911) }}</h1> <h1>{{ k4 }}</h1> <h1>{{ k5(99) }}</h1> {% endblock %}
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-19 16:37 # File : 1.Flask中的session.py # Author : 天晴天朗 # Email : tqtl@tqtl.org "" """ Session的请求流程; 一、请求刚刚到达; 二、视图函数; 三、请求结果; """ from flask import Flask, session app = Flask(__name__) app.secret_key = 'fdjljfaljfkla' @app.route('/index/') def index(): session['k1'] = 123 return 'Index' @app.route('/order/') def order(): print(session['k1']) return 'Order' if __name__ == '__main__': app.run() """ 一、Flask 二、RequestContext 三、Request 四、SecureCookieSessionInterface 五、SecureCookieSession(dict ) """
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-19 17:22 # File : 1.Flask中内置的特殊装饰器.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, render_template, redirect app = Flask(__name__) # before_request和after_request相似于Django中的中间件middleware; @app.before_request def before_req(): print('before_request,前') @app.before_request def before_req1(): print('before_request1,前') # request以前,会添加一个reverse反转; @app.after_request def after_req(response): print('after_request, 后') return response @app.after_request def after_req1(response): print('after_request1, 后') return response @app.route('/x1/', methods=['GET', 'POST']) def x1(): print('视图函数X1') return 'X1' @app.route('/x2/', methods=['GET', 'POST']) def x2(): print('视图函数X2') return 'X2' if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-19 17:22 # File : 2.基于Flask中内置的特殊装饰器作登陆验证.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, render_template, redirect, request, session app = Flask(__name__) app.secret_key = 'fdsjklfdjaslkjflas' # before_request和after_request相似于Django中的中间件middleware; @app.before_request def check_login(): if request.path == '/login/': return None user = session.get('user_info') if not user: return redirect('/login/') @app.route('/login/', methods=['GET', 'POST']) def login(): return 'Login' @app.route('/index/', methods=['GET', 'POST']) def index(): return 'Index' if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-19 17:44 # File : 3.Flask中其余常见的装饰器.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, Request, render_template app = Flask(__name__, template_folder='templates') app.debug = True @app.before_first_request def before_first_request1(): print('before_first_request1') @app.before_first_request def before_first_request2(): print('before_first_request2') @app.before_request def before_request1(): Request.nnn = 123 print('before_request1') @app.before_request def before_request2(): print('before_request2') @app.after_request def after_request1(response): print('before_request1', response) return response @app.after_request def after_request2(response): print('before_request2', response) return response @app.errorhandler(404) def page_not_found(error): return 'This page does not exist', 404 @app.template_global() def sb(a1, a2): return a1 + a2 @app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 @app.route('/') def hello_world(): return render_template('index.html') if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-19 18:03 # File : 1.Flask之消息闪现flush.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, session, flash, get_flashed_messages app = Flask(__name__) app.secret_key = 'fdjslkjflkafdaklfjdlakfj' # # 生成session; # @app.route('/x1/', methods=['GET', 'POST']) # def login(): # session['mgs'] = 'cuixiaozhao' # return '视图函数1' # # # # 销毁session;; # @app.route('/x2/', methods=['GET', 'POST']) # def index(): # msg = session.pop('msg') # print(msg) # return '视图函数2' # 消息闪现之flask生成session; @app.route('/x1/', methods=['GET', 'POST']) def login(): flash('cuixiaozhao', category='x1') flash('cuixiaozhao', category='x2') return '视图函数1' # 消息闪现之flask销毁session;; @app.route('/x2/', methods=['GET', 'POST']) def index(): data = get_flashed_messages(category_filter=['x1', 'x2']) print(data) return '视图函数2' if __name__ == '__main__': app.run()
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-19 18:13 # File : 1.Flask之中间件.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask app = Flask(__name__) app.secret_key = 'fjaljfdklajfkdasl' @app.route('/x2', methods=['GET', 'POST']) def index(): return 'x2' class MiddleWare(object): def __init__(self, old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, *args, **kwargs): print('before') obj = self.old_wsgi_app(*args, **kwargs) print('after') return obj if __name__ == '__main__': app.wsgi_app = MiddleWare(app.wsgi_app) app.run() """ 一、执行app.__call__方法; 二、在调用app.wsgi_app方法; """
简单来讲,Blueprint 是一个存储操做方法的容器,这些操做在这个Blueprint 被注册到一个应用以后就能够被调用,Flask 能够经过Blueprint来组织URL以及处理请求。Flask使用Blueprint让应用实现模块化,在Flask中,Blueprint具备以下属性:vue
from flask import Flask app = Flask(__name__) # @app.route('/index/') # def index(): # pass from .views import account from .views import admin from .views import user app.register_blueprint(account.ac) app.register_blueprint(admin.ad) app.register_blueprint(user.us)
# -*- coding:utf-8 -*- # Project: Pro_Flask # Software: PyCharm # Time : 2018-09-19 19:08 # File : account.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, render_template # 蓝图Blueprint; from flask import Blueprint ac = Blueprint('ac', __name__) @ac.route('/login/') def login(): #return 'Login' return render_template('login.html') @ac.route('/logout/') def logout(): return 'Logout'
# -*- coding:utf-8 -*- # Project: Day123 # Software: PyCharm # Time : 2018-09-19 20:40 # File : 1.函数和方法的区别.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from types import MethodType, FunctionType class Foo(object): def fetch(self): pass print(isinstance(Foo.fetch, MethodType)) # False print(isinstance(Foo.fetch, FunctionType)) # True obj = Foo() print(obj.fetch) # <bound method Foo.fetch of <__main__.Foo object at 0x10bbf2358>> print(isinstance(obj.fetch, MethodType)) # False print(isinstance(Foo.fetch, FunctionType)) # True """ 类、对象、方法、函数能够➕(); 方法和函数的区别:被谁来调用! """
# -*- coding:utf-8 -*- # Project: Threading # Software: PyCharm # Time : 2018-09-19 22:40 # File : 1.ThreadingLocal.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from threading import local from threading import Thread import time # 特殊的对象; xiaozhao = local() # xiaozhao = -1 def task(arg): # global xiaozhao xiaozhao.value = arg time.sleep(2) print(xiaozhao.value) for i in range(10): t = Thread(target=task, args=(i,)) t.start()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Threading # Software: PyCharm # Time : 2018-09-20 09:11 # File : 3.自定义Local对象(基于函数).py # Author : 天晴天朗 # Email : tqtl@tqtl.org from threading import get_ident, Thread import time storage = {} def set(k, v): ident = get_ident() if ident in storage: storage[ident][k] = v storage[ident] = {k: v} def get(k): ident = get_ident() return storage[ident][k] def task(arg): set('val', arg) print(storage) time.sleep(2) v = get('val') print(v) for i in range(10): t = Thread(target=task, args=(i,)) t.start()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Threading # Software: PyCharm # Time : 2018-09-20 09:20 # File : 4.自定义Local对象(基于面向对象).py # Author : 天晴天朗 # Email : tqtl@tqtl.org from threading import get_ident from threading import Thread class Local(object): storage = {} def set(self, k, v): ident = get_ident() if ident in Local.storage: Local.storage[ident][k] = v else: Local.storage[ident] = {k: v} def get(self, k): ident = get_ident() return Local.storage[ident][k] obj = Local() def task(arg): obj.set('val', arg) v = obj.get('val') print(v) for i in range(10): t = Thread(target=task, args=(i,)) t.start()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Threading # Software: PyCharm # Time : 2018-09-20 09:20 # File : 5.自定义Local对象(基于面向对象优化版 ).py # Author : 天晴天朗 # Email : tqtl@tqtl.org from threading import get_ident from threading import Thread class Local(object): def __setattr__(self, k, v): # self.storage = {} object.__setattr__(self, 'storage', {}) ident = get_ident() if ident in self.storage: self.storage[ident][k] = v else: self.storage[ident] = {k: v} def __getattr__(self, item): ident = get_ident() return self.storage[ident][item] obj = Local() obj1 = Local() def task(arg): obj.val = arg obj1.val = arg print(obj.val) for i in range(10): t = Thread(target=task, args=(i,)) t.start()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Threading # Software: PyCharm # Time : 2018-09-20 09:20 # File : 4.自定义Local对象(基于面向对象).py # Author : 天晴天朗 # Email : tqtl@tqtl.org from threading import get_ident from threading import Thread class Local(object): storage = {} def __setattr__(self, k, v): ident = get_ident() if ident in Local.storage: Local.storage[ident][k] = v else: Local.storage[ident] = {k: v} def __getattr__(self, item): ident = get_ident() return Local.storage[ident][item] obj = Local() def task(arg): obj.val = arg print(obj.val) for i in range(10): t = Thread(target=task, args=(i,)) t.start()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Threading # Software: PyCharm # Time : 2018-09-20 09:20 # File : 7.自定义Local对象(基于greenlet).py # Author : 天晴天朗 # Email : tqtl@tqtl.org try: from greenlet import getcurrent as get_ident except Exception as e: from threading import get_ident from threading import Thread class Local(object): storage = {} def __setattr__(self, k, v): ident = get_ident() if ident in Local.storage: Local.storage[ident][k] = v else: Local.storage[ident] = {k: v} def __getattr__(self, item): ident = get_ident() return Local.storage[ident][item] obj = Local() def task(arg): obj.val = arg print(obj.val) for i in range(10): t = Thread(target=task, args=(i,)) t.start()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Threading # Software: PyCharm # Time : 2018-09-20 09:45 # File : 1.Flask源码分析上下文管理.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask app = Flask(__name__) @app.route('/') def hello_world(): return 'cuixiaozhao!' if __name__ == '__main__': app.__call__ app.run() "" """ 一、第一个阶段:将ctx(request,session)放到Local对象; 二、第二阶段:视图函数导入:request、session; 三、请求处理完毕: -获取session并保存到cookie; -将ctx删除; """
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 16:14 # File : 1.flask_session.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, session from flask_session import RedisSessionInterface app = Flask(__name__) app.secret_key = 'fdahfdafdajfalk' # 默认session保存操做; # from flask.sessions import SecureCookieSessionInterface # app.session_interface = SecureCookieSessionInterface() # 使用Redis保存session; from flask.ext.session import Session from redis import Redis app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='192.168.0.94', port='6379') app.session_interface = RedisSessionInterface( redis=Redis(host='127.0.0.1', port=6379), key_prefix='flaskxxxx' ) @app.route('/login/') def login(): session['k1'] = 123 return 'Login' @app.route('/index/') def index(): v = session['k1'] print(v) return 'Index' if __name__ == '__main__': app.run()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 16:46 # File : __init__.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask from .views import account from .views import home def create_app(): app = Flask(__name__) app.config.from_object('settings.DevelopmentConfig') app.register_blueprint(account.account) app.register_blueprint(home.home) return app
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 16:46 # File : manage.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from Flask_Session import create_app app = create_app() if __name__ == '__main__': app.run()
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 16:53 # File : account.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Blueprint, render_template, request, session, redirect from uuid import uuid4 account = Blueprint('account', __name__) @account.route('/login/', methods=['GET', 'POST']) def login(): if request.method == "GET": return render_template("login.html") user = request.form.get('user') pwd = request.form.get('pwd') if user == "cxz" and pwd == "123": uid = str(uuid4()) session.permanent = True session['user_info'] = {'id': uid, 'name': user} return redirect('/index/') else: return render_template('login.html', msg='用户名或者密码错误!')
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 17:33 # File : home.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Blueprint, render_template, request, session, redirect home = Blueprint('home', __name__) @home.route('/index/', methods=['GET', 'POST']) def index(): user_info = session.get('user_info') # {'k1':1,'k2':2} print("原来的值", user_info) session['user_info']['k1'] = 19939 user_info = session.get('user_info') print("修改以后的值", user_info) # session['modified'] = True,在配置文件中使用SESSION_REFRESH_EACH_REQUEST代替; return 'Index' @home.route('/test/') def test(): user_info = session.get('user_info') print(user_info) return 'Test'
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 17:28 # File : settings.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from datetime import timedelta class Config(object): DEBUG = True TESTING = False DATABASE_URI = 'sqlite://:memory' SECRET_KEY = 'fjdksjfdasljflksd' PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST = True class ProductionConfig(Config): pass class DevelopmentConfig(Config): pass class TestingConfig(Config): pass
settings.py;html5
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 17:28 # File : settings.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from datetime import timedelta from redis import Redis class Config(object): DEBUG = True TESTING = False DATABASE_URI = 'sqlite://:memory' SECRET_KEY = 'fjdksjfdasljflksd' PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST = True SESSION_TYPE = "redis" # SESSION_REDIS = Redis(host='127.0.0.1',port='6379') class ProductionConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class DevelopmentConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class TestingConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379')
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 16:46 # File : __init__.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask from .views import account from .views import home # from flask.ext.session import Session from flask_session import Session def create_app(): app = Flask(__name__) app.config.from_object('settings.DevelopmentConfig') app.register_blueprint(account.account) app.register_blueprint(home.home) # 将Session替换成Redis; Session(app) return app
app.config['SESSION_TYPE'] = 'redis'python
app.config['SESSION_REDIS'] = Redis(host = '127.0.0.1',port=6379)mysql
from flask_session import Sessionios
Session(app)web
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 20:13 # File : sql.py # Author : 天晴天朗 # Email : tqtl@tqtl.org import pymysql class SQLHelper(object): @staticmethod def open(): conn = pymysql.connect(host='mysql123.cuixiaozhao.com', port=3306, user='root', passwd='Tqtl911!@%*)', db='flask_session') cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn, cursor @staticmethod def close(conn, cursor): conn.commit() cursor.close() conn.close() @classmethod def fetch_one(cls, sql, args): # cursor.execute("select id,name from users where name = %s and pwd = %s", ['cxz', '123', ]) conn, cursor = cls.open() cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn, cursor) return obj @classmethod def fetch_all(cls, sql, args): conn, cursor = cls.open() cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-20 19:53 # File : MySQL数据库练习.py # Author : 天晴天朗 # Email : tqtl@tqtl.org import pymysql conn = pymysql.connect(host='x.x.x.x', port=3306, user='root', passwd='Tqtl911!@%*)', db='flask_session') cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute("select id,name from users where name = %s and pwd = %s", ['cxz', '123', ]) obj = cursor.fetchone() conn.commit() cursor.close() conn.close() print(obj)
DBUtils是Python的一个用于实现数据库链接池的模块。面试
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-21 10:26 # File : pool.py # Author : 天晴天朗 # Email : tqtl@tqtl.org import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection from manage import app POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块; maxconnections=6, # 链接池容许的最大链接数,0和None表示不限制链接数; mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立; maxcached=5, # 连接池中最多闲置的连接; maxshared=3, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: Useless无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错; maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制; setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='mysql.cuixiaozhao.com', port=3306, user='root', password='Tqtl911!@%*)', database='flask_session', charset='utf8' )
POOL = PersistentDB( creator=pymysql, # 使用连接数据库的模块 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 若是为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭连接。若是为True时, conn.close()则关闭连接,那么再次调用pool.connection时就会报错,由于已经真的关闭了链接(pool.steady_connection()能够获取一个新的连接) threadlocal=None, # 本线程独享值得对象,用于保存连接对象,若是连接对象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() func()
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 链接池容许的最大链接数,0和None表示不限制链接数 mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 maxshared=3, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 检测当前正在运行链接数的是否小于最大连接数,若是不小于则:等待或报raise TooManyConnections异常 # 不然 # 则优先去初始化时建立的连接中获取连接 SteadyDBConnection。 # 而后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。 # 若是最开始建立的连接没有连接,则去建立一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。 # 一旦关闭连接后,链接就返回到链接池让后续线程继续使用。 conn = POOL.connection() # print(th, '连接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据经过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run() app.py
!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登陆</h1> <form method="post"> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
#!/usr/bin/python3 # -*- coding:utf-8 -*- # Project: Flask_Session # Software: PyCharm # Time : 2018-09-21 15:11 # File : register.py # Author : 天晴天朗 # Email : tqtl@tqtl.org from flask import Flask, render_template, request, redirect, Blueprint from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired('用户名不能为空!') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜爱', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有全部的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 再也不继续后续验证 @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 1}) return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据经过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run() # app.py
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。redis