在线源码: GITHub : https://github.com/whyjust/fisherhtml
邮件发送:python
须要进行注册邮件发送或者功能模块须要发送邮件时能够采用:mysql
from threading import Thread from app import mail from flask import current_app,render_template from flask_mail import Message '''开启异步线程''' def send_async_mail(app,msg): '''mail.send发送须要获取上下文,所以添加with''' with app.app_context(): try: mail.send(msg) except Exception as e: raise e def send_mail(to, subject, template,**kwargs): ''' 发送邮件 :param to: 收件人 :param subject: 标题 :param template: 渲染模板 :param kwargs: 关键字参数 :return: ''' msg = Message('[鱼书]'+ '' +subject, sender=current_app.config['MAIL_USERNAME'], recipients=[to]) msg.html = render_template(template,**kwargs) ''' current_app是代理对象,开启新的线程时,咱们直接获取真实的app核心对象_get_current_object() ''' app = current_app._get_current_object() thr = Thread(target=send_async_mail,args=[app,msg]) thr.start()
状态标识git
状态的变动须要进行标识时(等待,成功,拒绝,撤销)github
from enum import Enum class PendingStatus(Enum): ''' 交易状态: 枚举方法实现 ''' Waiting = 1 Success = 2 Reject = 3 Redraw = 4 @classmethod def pending_str(cls,status,key): key_map = { cls.Waiting:{ 'requester':'等待对方邮寄', 'gifter':'等待你邮寄' }, cls.Success: { 'requester': '对方已邮寄', 'gifter': '你已邮寄,交易完成' }, cls.Reject: { 'requester': '对方已拒绝', 'gifter': '你已经拒绝' }, cls.Redraw: { 'requester': '你已撤销', 'gifter': '对方已撤销' } } return key_map[status][key]
模型定义的方法:web
1 password以hash方式存储ajax
2 用户信息生成tokensql
3 password的私有化数据库
from math import floor from werkzeug.security import generate_password_hash, check_password_hash from flask_login import UserMixin from itsdangerous import TimedJSONWebSignatureSerializer as Serializer from app.libs.enums import PendingStatus from app.models.base import Base, db from sqlalchemy import Column, Integer, String, Boolean, Float from app import login_manager from app.models.drift import Drift from app.models.gift import Gift from app.models.wish import Wish from app.spider.yushu_book import YuShuBook from app.libs.helper import is_isbn_or_key from flask import current_app class User(Base, UserMixin): ''' 模型属性设置 , UserMixin 记录用户帐号的状态 ''' id = Column(Integer, primary_key=True) nickname = Column(String(24), nullable=False) _password = Column('password', String(128), nullable=True) phone_number = Column(String(18), unique=True) email = Column(String(50), unique=True, nullable=False) confirmed = Column(Boolean, default=False) beans = Column(Float, default=0) send_counter = Column(Integer, default=0) receive_counter = Column(Integer, default=0) wx_open_id = Column(String(50)) wx_name = Column(String(32)) ''' 将password方法hash加密只读并将其变为属性访问 ''' @property def password(self): return self._password @password.setter def password(self, raw): self._password = generate_password_hash(raw) def check_password(self, raw): return check_password_hash(self._password, raw) def generate_token(self, expiration=600): s = Serializer(current_app.config['SECRET_KEY'],expiration) return s.dumps({'id':self.id}).decode('utf-8') @staticmethod def reset_password(token,new_password): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token.encode('utf-8')) except: return False uid = data.get('id') with db.auto_commit(): user = User.query.get(uid) if user: user.password = new_password return True @property def summary(self): return dict( nickname = self.nickname, beans = self.beans, email = self.email, send_receive = str(self.send_counter)+ '/' + str(self.receive_counter) ) @login_manager.user_loader def get_user(uid): ''' 继承UserMixin,进行用户的回调 :param uid: :return: ''' return User.query.get(int(uid))
方案二flask
from app.extensions import db,login_manager from werkzeug.security import generate_password_hash,check_password_hash #生成token的模块 from itsdangerous import TimedJSONWebSignatureSerializer as Seralize from flask_login import UserMixin from flask import current_app from .posts import Posts class User(UserMixin,db.Model): __tablename__ = 'user' id = db.Column('id',db.Integer,primary_key=True) username = db.Column(db.String(12),index=True) password_hash = db.Column(db.String(128)) sex = db.Column(db.Boolean,default=True) age = db.Column(db.Integer) email = db.Column(db.String(40)) icon = db.Column(db.String(70),default='default.jpg') #当期帐户激活状态 confirm = db.Column(db.Boolean,default=False) #参数1模型名称 参数2反向引用的字段名 参数3 加载方式 提供对象 posts = db.relationship('Posts',backref='user',lazy='dynamic') #secondary在多对多关系中指定关联表的名称 favorite = db.relationship('Posts',secondary='collections',backref=db.backref('users',lazy='dynamic'),lazy='dynamic') #添加使用append 删除使用remove @property def password(self): raise ValueError @password.setter def password(self, password): self.password_hash = generate_password_hash(password) #生成token的方法 def generate_token(self): s = Seralize(current_app.config['SECRET_KEY']) return s.dumps({'id':self.id}) #检测token的方法 @staticmethod def check_token(token): s = Seralize(current_app.config['SECRET_KEY']) #从当前的token中拿出字典 try: id = s.loads(token)['id'] except: return False #根据用户id取出对应用户的对象 u = User.query.get(id) #判断 当期u对象是否存在 if not u: return False #判断当期用户的激活状态 若是没有激活 则激活 if not u.confirm: u.confirm = True db.session.add(u) return True #验证密码 def check_password_hash(self,password): return check_password_hash(self.password_hash,password) def is_favorite(self,postsId): all = self.favorite.all() for p in all: if p.id==postsId: return True #lambda表达式 if list(filter(lambda p:p.id==int(postsId),all)): return True return False #登陆认证的回调,保持数据的一致性 @login_manager.user_loader def user_loader(uid): return User.query.get(int(uid))
1
extensions.py
负责三方模块的导入与2
app.__init__.py
负责create_app初始化3
view.__init__.py
负责蓝本的注册
方案一:
manage.py
from app import create_app from flask_script import Manager from flask_migrate import MigrateCommand #经过函数create_app进行包括蓝本/扩展/系统配置的初始化 app = create_app('default') manager = Manager(app) #给manage添加迁移命令db manager.add_command('db',MigrateCommand) if __name__ == '__main__': manager.run()
extensions.py
#extensions.py from flask_bootstrap import Bootstrap from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from flask_mail import Mail from flask_login import LoginManager from flask_uploads import UploadSet,IMAGES,patch_request_class,configure_uploads from flask_moment import Moment from flask_cache import Cache #实例化db db = SQLAlchemy() #实例化bootstrap bootstrap = Bootstrap() #实例化migrate migrate = Migrate(db=db) #实例化邮箱 mail = Mail() #实例化用户登陆模块 login_manager = LoginManager() #实例化文件对象 file = UploadSet('photos',IMAGES) moment = Moment() #simple简单缓存 # cache = Cache(config={'CACHE_TYPE':'simple'}) cache = Cache(config={'CACHE_TYPE': 'simple'}) def config_extensions(app): #bootstrap初始化app bootstrap.init_app(app) #db初始化app db.init_app(app) #migrate初始化app migrate.init_app(app=app) #mail初始化 mail.init_app(app) #登陆模块初始化 login_manager.init_app(app) #moment时间模块初始化 moment.init_app(app) cache.init_app(app=app) #须要指定登陆端点 login_manager.login_view ='user.login' #提示信息 login_manager.login_message = '请登陆再访问' #设置session保护级别 login_manager.session_protection = 'strong' #配置文件上传 configure_uploads(app,file) patch_request_class(app,size=None)
app.__init__.py
#app.__init__.py from flask import Flask,render_template from app.settings import config from app.extensions import config_extensions from app.views import config_blueprint #初始化当前整个应用的函数 def create_app(config_name): app = Flask(__name__) #导入settings配置信息 app.config.from_object(config[config_name]) #第三方库初始化 config_extensions(app) #注册全部的蓝本函数 config_blueprint(app) #错误页面绑定app errors(app) return app def errors(app): @app.errorhandler(404) def page_not_found(e): return render_template('errors/error.html',error=e) @app.errorhandler(500) def page_not_found(e): return render_template('errors/error.html', error=e)
view.__init__.py
#view.__init__.py from .main import main from .user import user from .posts import posts BluePrint = [ (main,''), (user,''), (posts,'') ] #封装注册蓝本的函数 def config_blueprint(app): #循环注册蓝本 for blueprint,prefix in BluePrint: app.register_blueprint(blueprint,url_prefix=prefix)
方案二
fisher.py
from app import create_app app = create_app() if __name__ == '__main__': from werkzeug.contrib.fixers import ProxyFix app.wsgi_app = ProxyFix(app.wsgi_app) app.run()
app.__init__.py
#app.__init__.py '''建立flask核心对象''' from flask import Flask from app.models.base import db from flask_login import LoginManager from flask_mail import Mail from app.libs.limiter import Limiter login_manager = LoginManager() mail = Mail() limiter = Limiter() def create_app(): ''' 系统配置与蓝图须要绑定app :return: ''' app = Flask(__name__) app.config.from_object('app.secure') app.config.from_object('app.setting') register_blueprint(app) db.init_app(app) login_manager.init_app(app) login_manager.login_view = 'web.login' login_manager.login_message = '请先登陆或者注册' mail.init_app(app) with app.app_context(): db.create_all() return app #注册蓝本 def register_blueprint(app): from app.web.book import web app.register_blueprint(web)
view.__init__.py
#view.__init__.py from flask import Blueprint,render_template '''蓝图 blueprint ''' web = Blueprint('web',__name__) #__name__表明蓝图所在模块 @web.app_errorhandler(404) def not_found(e): ''' AOP: 处理全部的404请求 ''' return render_template('404.html'),404 @web.app_errorhandler(500) def internal_server_error(e): ''' AOP: 处理全部的500请求 ''' return render_template('500.html'),500 #在此处导入表明先初始化在导入应用 #防止循环调用的问题 from app.web import book from app.web import auth from app.web import drift from app.web import gift from app.web import main from app.web import wish
settings配置
1 根据不一样等级,配置系统设置
2 配置数据库的链接
3 配置不一样环境下的生产
方案一
import os base_path = os.path.abspath(os.path.dirname(__file__)) #配置全部环境的基类 class Config: SECRET_KEY = 'secret_key' SQLALCHEMY_COMMIT_ON_TEARDOWN = True SQLALCHEMY_TRACK_MODIFICATIONS = False MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.163.com') MAIL_USERNAME = os.environ.get('MAIL_USERNAME', 'xxx@163.com') MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD', 'xxx') #配置上传文件 MAX_CONTENT_LENGTH = 1024*1024*64 UPLOADED_PHOTOS_DEST = os.path.join(base_path,'static/upload') PAGE_NUM = 3 #测试 class TestingConfig(Config): SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:xxx@127.0.0.1:3306/blog' #开发 class DevelopmentConfig(Config): SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:xxx@127.0.0.1:3306/blog' # SQLALCHEMY_DATABASE_URI = 'sqlite:///'+ os.path.join(base_path,'develop.sqlite') #生产 class ProductionConfig(Config): SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:xxx@127.0.0.1:3306/blog' #设置字典 config = { 'development':DevelopmentConfig, 'production':ProductionConfig, 'test':TestingConfig, 'default':DevelopmentConfig }
方案二
settings.py
''' 配置文件: 生产环境与开发环境相同的配置,setting能够上传git ''' PER_PAGE = 15 BEANS_UPLOAD_ONE_BOOK = 0.5 RECENT_BOOK_COUNT = 20
secure.py
''' 配置文件:保存单独的加密信息,secure不要上传git ''' DEBUG = True # 单数据库 SQLALCHEMY_DATABASE_URI = 'mysql+cymysql://root:xxx@xxx:3306/fisher' SECRET_KEY = '\SAAFsdfsdf:sdadzxcsd,./.dasdafasd' #Email相关配置 MAIL_SERVER = 'smtp.163.com' MAIL_PORT = 465 MAIL_USE_SSL = True MAIL_USE_TSL = False MAIL_USERNAME = 'xxx@163.com' MAIL_PASSWORD = 'xxx'
def calulate_start(self,page): return (page-1)* current_app.config.get('PER_PAGE')
模型类的设置
1 建立基类继承db.Model
2 db实例化经过继承添加容错自动提交数据库功能
3 封装了快速设置属性功能
base.py
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery from sqlalchemy import Column, Integer, SmallInteger from contextlib import contextmanager from datetime import datetime class SQLAlchemy(_SQLAlchemy): ''' 封装了数据库的自动提交回滚 ''' @contextmanager def auto_commit(self): try: yield self.session.commit() except Exception as e: self.session.rollback() raise e class Query(BaseQuery): ''' 自定义基类(继承,初始化),重写filter_by方法 ''' def filter_by(self, **kwargs): if 'status' not in kwargs.keys(): kwargs['status'] = 1 return super(Query, self).filter_by(**kwargs) db = SQLAlchemy(query_class=Query) class Base(db.Model): ''' 该模型表不想在数据库建立,添加__abstract__ = True不会建立该表 ''' __abstract__ = True '''类变量在类开始的时候就已经肯定了''' create_time = Column('create_time', Integer) status = Column(SmallInteger, default=1) def __init__(self): '''实例变量保证建立时间的准确性''' self.create_time = int(datetime.now().timestamp()) def delete(self): self.status = 0 def set_attrs(self, attrs_dict): for key, value in attrs_dict.items(): if hasattr(self, key) and key != 'id': setattr(self, key, value) @property def create_datetime(self): '''时间格式统一,将方法转换成属性调用''' if self.create_time: return datetime.fromtimestamp(self.create_time) else: return None