npm install -g @tarojs/cli
css
taro init taro-login
html
cd taro-login
前端
npm run dev:weapp
python
... class App extends Component { config = { pages: [ 'pages/user/user', // new 'pages/index/index', ],
如果我们需要用户一进入就取得用户的授权,以便于进行某些记录用户信息的操作,而微信又要求用户去点页面上的某个按钮才能获取信息,那怎么办呢?只能把一个按钮放在用户不能不点的地方,那就只有弹窗了。微信 `wx.showModal` 不能满足我们的需求,只能自己造一个:在用户第一次进来的时候弹窗,再次进来的时候则不显示。为了让这个组件具备拓展性,我们根据传入的值来修改"确认"位置按钮的属性,如果是授权的弹窗就把按钮属性改为 `openType='getUserInfo'`。(摘自《Taro 多端开发实现原理与项目实战》)
import Taro, { Component } from '@tarojs/taro' import { View, Button } from '@tarojs/components' import './modal.scss' class Modal extends Component { constructor() { super(...arguments) this.state = {} } onConfirmClick = () => { this.props.onConfirmCallback() } onCancelClick = () => { this.props.onCancelCallback() } onAuthConfirmClick = (e) => { this.props.onConfirmCallback(e.detail) } preventTouchMove = (e) => { e.stopPropagation() } render() { const { title, contentText, cancelText, confirmText, isAuth } = this.props return ( <View className='toplife_modal' onTouchMove={this.preventTouchMove}> <View className='toplife_modal_content'> <View className='toplife_modal_title'>{title}</View> <View className='toplife_modal_text'>{contentText}</View> <View className='toplife_modal_btn'> <Button className='toplife_modal_btn_cancel' onClick={this.onCancelClick}>{cancelText}</Button> {!isAuth ? <Button className='toplife_modal_btn_confirm' onClick={this.onConfirmClick}>{confirmText}</Button> : <Button className='toplife_modal_btn_confirm' openType='getUserInfo' onGetUserInfo={this.onAuthConfirmClick}>受权</Button>} </View> </View> </View> ) } } Modal.defaultProps = { title: '', contentText: '', cancelText: '取消', confirmText: '肯定', isAuth: false, onCancelCallback: () => { }, onConfirmCallback: () => { } } export default Modal
`Modal` 组件还算比较简单,组件的属性:
| 字段 | 说明 |
|---|---|
| title | 提示的标题 |
| contentText | 提示的描述 |
| cancelText | 取消按钮的文案 |
| onCancelCallback | 取消回调的函数 |
| confirmText | 确认按钮的文案 |
| onConfirmCallback | 确认回调函数 |
| isAuth | 标记是否为授权按钮 |
在内部设置了一个函数 `preventTouchMove`,其作用是弹窗出现蒙层的时候,阻止在蒙版上的滑动手势 `onTouchMove`。另外一个函数 `onAuthConfirmClick`,当 `isAuth` 为真时,确认按钮为取得个人信息的授权按钮,此时把个人信息当值传递给调用的函数。(摘自《Taro 多端开发实现原理与项目实战》)
/*postcss-pxtransform rn eject enable*/

// Full-screen mask
.toplife_modal {
  position: fixed;
  width: 100%;
  height: 100%;
  left: 0;
  top: 0;
  background-color: rgba(0, 0, 0, .8);
  z-index: 100;

  // Centered dialog box
  &_content {
    position: absolute;
    left: 50%;
    top: 50%;
    width: 600px;
    height: 320px;
    transform: translate(-50%, -50%);
    background-color: #fff;
    color: #232321;
    text-align: center;
    border-radius: 30px;
  }

  &_title {
    margin-top: 40px;
    font-size: 32px;
  }

  &_text {
    margin-top: 40px;
    font-size: 24px;
  }

  // Button row pinned to the bottom of the dialog
  &_btn {
    position: absolute;
    bottom: 0;
    left: 0;
    width: 100%;
    height: 88px;
    border-top: 2px solid #eee;

    &_cancel {
      color: #8c8c8c;
      border-radius: 0;
      border: 0;
      border-right: 2px solid #eee;
      border-bottom-left-radius: 30px;
    }

    &_confirm {
      color: #666;
      border-radius: 0;
      border: 0;
      border-bottom-right-radius: 30px;
    }

    button {
      display: block;
      float: left;
      width: 50%;
      height: 88px;
      text-align: center;
      line-height: 88px;
      font-size: 32px;
      box-sizing: border-box;
      background-color: #fff;

      &::after {
        border: 0;
      }
    }
  }
}
user.js
中引用该Modal
组件import Taro, { Component } from '@tarojs/taro'; import { View, Image, Text } from '@tarojs/components'; import classnames from 'classnames' import Modal from '../../components/modal/modal'; import { setGlobalData } from '../../utils/globalData'; import { getUserInfo, getIsAuth } from '../../utils/getUser'; class Info extends Component { config = { navigationBarTitleText: 'TARO商城', enablePullDownRefresh: true, backgroundTextStyle: 'dark', disableScroll: true } constructor() { super(...arguments) this.state = { animationClass: '', showAuthModal: false, shouldIndexHidden: false, } this.env = process.env.TARO_ENV } hideAuthModal() { this.setState({ showAuthModal: false }) Taro.setStorage({ key: 'isHomeLongHideAuthModal', data: true }) } onProcessAuthResult = (userData) => { Taro.setStorage({ key: 'isHomeLongHideAuthModal', data: true }) if (userData.userInfo) { setGlobalData('userData', userData) } this.setState({ showAuthModal: false }) getIsAuth() } async onPullDownRefresh() { if (this.state.shouldIndexHidden) { Taro.stopPullDownRefresh() // 中止下拉刷新 } else { await this.props.onFetchIndexList() Taro.stopPullDownRefresh() // 中止下拉刷新 } } componentDidMount() { if (this.env === 'weapp') { // 用类名来控制动画 setTimeout(async () => { const userData = await getUserInfo(); Taro.getStorage({ key: 'isHomeLongHideAuthModal', success: (res) => { const isHomeLongHideAuthModal = res.data; let showAuthModal if (!userData && !this.state.showAuthModal && !isHomeLongHideAuthModal) { showAuthModal = true } else { showAuthModal = false } this.setState({ animationClass: 'animation', showAuthModal }) }, fail: () => { let showAuthModal if (!userData && !this.state.showAuthModal) { showAuthModal = true } else { showAuthModal = false } this.setState({ animationClass: 'animation', showAuthModal }) } }) }, 1000) getIsAuth() } else if (this.env === 'h5' || this.env === 'rn') { console.log('h5登陆') } } render() { const { animationClass, shouldIndexHidden, showAuthModal } = this.state const { loginname, 
avatar_url } = this.props; const indexClassNames = classnames('container', 'index', animationClass, { hidden: shouldIndexHidden }) return ( <View className={indexClassNames}> <View className='login-head'> <Image className='login-head-back' src={require('../../assets/img/loginBack.jpg')} /> <Image className='login-head-head' src={avatar_url ? avatar_url : require('../../assets/img/head.png')} /> {loginname ? <Text classnames='login-head-name'>{loginname}</Text> : null} </View> {showAuthModal && <Modal title='受权提示' contentText='诚邀您完成受权,尊享畅游体验' onCancelCallback={this.hideAuthModal.bind(this)} onConfirmCallback={this.onProcessAuthResult.bind(this)} isAuth />} </View> ) } } export default Info
我们是如何保证这个应用只有一次授权弹窗呢?关键代码是 `Taro.setStorage({ key: 'isHomeLongHideAuthModal', data: true })`:如果弹出了一次,就在本地存一个标记,表示已经弹过授权框,下一次弹窗之前可以根据此标记判断。至此我们完成了授权处理,但如果可以的话还是要优雅一些,在需要的时候才征求用户授权,保证用户体验。(摘自《Taro 多端开发实现原理与项目实战》)
/src/utils/globalData.js
// In-memory key/value store shared across the app. Created without a
// prototype so that lookups never return inherited Object.prototype members
// (e.g. 'toString', 'constructor') for keys that were never set.
const globalData = Object.create(null)

/**
 * Store a value under a key.
 * @param {string} key
 * @param {*} val
 */
export function setGlobalData(key, val) {
  globalData[key] = val
}

/**
 * Read a previously stored value.
 * @param {string} key
 * @returns {*} the stored value, or undefined when the key was never set
 */
export function getGlobalData(key) {
  return globalData[key]
}
/src/utils/request.js
import Taro from '@tarojs/taro';
import '@tarojs/async-await';

/**
 * Run Taro.request with the loading indicator shown for its duration.
 * The indicator is hidden in `finally`, so a failed request no longer
 * leaves it on screen.
 * @param {object} options - options passed straight to Taro.request
 * @returns {Promise<object>} the Taro.request result
 */
async function requestWithLoading(options) {
  Taro.showLoading();
  try {
    return await Taro.request(options);
  } finally {
    Taro.hideLoading();
  }
}

/**
 * Send a GET request.
 * @param {string} url
 * @param {object} [data] - request data
 * @returns {Promise<object>} the Taro.request result
 */
export function getJSON(url, data) {
  return requestWithLoading({ url: url, data: data, method: 'GET' });
}

/**
 * Send a POST request with a JSON content type.
 * @param {string} url
 * @param {object} [data] - request body
 * @returns {Promise<object>} the Taro.request result
 */
export function postJSON(url, data) {
  return requestWithLoading({
    header: { 'content-type': 'application/json' },
    url: url,
    data: data,
    method: 'POST'
  });
}
/src/constants/api.js
// Base URL of the backend API (v1).
const rootPath = 'http://127.0.0.1:5000/v1';

/** Backend endpoint URLs, keyed by purpose. */
const apiObject = {
  registerclient: `${rootPath}/client/register`, // register a user
  getusertoken: `${rootPath}/token`, // obtain the user's token after login
  checkusertoken: `${rootPath}/token/secret`, // verify the user's token
  getuserinfo: `${rootPath}/user`, // fetch user information
};

export default apiObject;
/src/utils/getUser.js
import Taro from '@tarojs/taro'
import { getGlobalData } from './globalData'
import api from '../constants/api';
import { postJSON } from '../utils/request';

// Client type value sent to the backend for WeChat mini-program accounts.
const CLIENT_TYPE_MINA = 200

/**
 * Return the user data cached in global data, falling back to
 * Taro.getUserInfo().
 * @returns {Promise<object|null>} the user data, or null when
 *   Taro.getUserInfo() fails
 */
async function getUserInfo() {
  const userData = getGlobalData('userData')
  if (userData) {
    return userData
  }
  try {
    const _userData = await Taro.getUserInfo()
    return _userData
  } catch (err) {
    console.log(err)
    console.log('微信登陆或用户接口故障')
    return null
  }
}

// Tell the user that authorization did not complete.
function showAuthFailedToast() {
  Taro.showToast({ title: '受权失败,请稍后再试', icon: 'none' })
}

/**
 * Register the WeChat user with the backend, then log in and cache the
 * returned token in storage under the 'token' key.
 *
 * Never rejects: when user info is unavailable, or any request fails, it
 * resolves to false.
 * @returns {Promise<boolean>} true when a token was obtained and stored
 */
async function getIsAuth() {
  // getUserInfo() resolves to null on failure, so it must not be
  // destructured directly.
  const userData = await getUserInfo()
  const userInfo = userData ? userData.userInfo : null
  let isAuth = false

  if (userInfo) {
    try {
      const loginRes = await Taro.login()
      // Register a new user from the WeChat profile.
      const result = await postJSON(api.registerclient, {
        "avatar": userInfo.avatarUrl,
        "sex": userInfo.gender,
        "nickname": userInfo.nickName,
        "account": loginRes.code,
        "type": CLIENT_TYPE_MINA
      });
      if (result.data.error_code === 0) {
        // A second Taro.login() is issued for the token request, as in the
        // original flow, so it carries its own code.
        const tokenRes = await Taro.login()
        const auth_token = await postJSON(api.getusertoken, {
          "account": tokenRes.code,
          "type": CLIENT_TYPE_MINA
        })
        if (auth_token.statusCode === 201) {
          Taro.setStorage({ key: 'token', data: auth_token.data.token }) // cache the token
          Taro.showToast({ title: '受权成功' })
          userInfo.isAuth = true
          isAuth = true
        } else {
          showAuthFailedToast()
        }
      } else {
        showAuthFailedToast()
      }
    } catch (err) {
      // Request failure: report it instead of leaving an unhandled rejection.
      console.log(err)
      showAuthFailedToast()
    }
  }

  console.log('isAuth: ', isAuth)
  return isAuth
}

export { getUserInfo, getIsAuth }
├── app │ ├── __init__.py │ ├── api │ │ ├── __init__.py │ │ └── v1 │ │ ├── __init__.py │ │ ├── client.py │ │ ├── token.py │ │ └── user.py │ ├── apps.py │ ├── config │ │ ├── secure.py │ │ └── settings.py │ ├── libs │ │ ├── enums.py │ │ ├── error.py │ │ ├── error_code.py │ │ ├── format_time.py │ │ ├── get_openid.py │ │ ├── redprint.py │ │ ├── scope.py │ │ └── token_auth.py │ ├── models │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-36.pyc │ │ │ ├── base.cpython-36.pyc │ │ │ └── user.cpython-36.pyc │ │ ├── base.py │ │ └── user.py │ └── validators │ ├── __init__.py │ ├── base.py │ └── forms.py ├── manage.py └── requirements.txt
requirements.txt
Flask Flask-SQLAlchemy psycopg2-binary cymysql Flask-Testing coverage flake8 flask-debugtoolbar flask-cors flask-migrate flask-bcrypt pyjwt gunicorn requests flask-httpauth flask-wtf
app
目录和__init__.py
文件和apps.py
# File: /app/apps.py
# -*- coding: utf-8 -*-
from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder
from app.libs.error_code import ServerError
from datetime import date


class JSONEncoder(_JSONEncoder):
    """JSON encoder with support for mapping-like objects and dates."""

    def default(self, o):
        """Serialize objects the stock encoder cannot handle.

        Objects exposing both ``keys`` and ``__getitem__`` are converted with
        ``dict(o)``; ``date`` instances are rendered as ``YYYY-MM-DD``.

        Raises:
            ServerError: for any other type.
        """
        mapping_like = hasattr(o, 'keys') and hasattr(o, '__getitem__')
        if mapping_like:
            return dict(o)
        if not isinstance(o, date):
            raise ServerError()
        return o.strftime('%Y-%m-%d')


class Flask(_Flask):
    """Flask application class that uses the custom JSON encoder."""
    json_encoder = JSONEncoder
# File: /app/__init__.py
# -*- coding: utf-8 -*-
from .apps import Flask
from flask_debugtoolbar import DebugToolbarExtension
from flask_cors import CORS
from flask_migrate import Migrate
from flask_bcrypt import Bcrypt
from app.models.base import db

# Extension instances, bound to the app inside create_app().
toolbar = DebugToolbarExtension()
migrate = Migrate(db=db)
bcrypt = Bcrypt()


def create_app():
    """Application factory: build and configure the Flask app."""
    app = Flask(__name__)

    # Enable CORS.
    CORS(app)

    # Load configuration; the second module is loaded after the first.
    for config_module in ('app.config.settings', 'app.config.secure'):
        app.config.from_object(config_module)

    # Bind extensions.
    toolbar.init_app(app)
    migrate.init_app(app, db)
    bcrypt.init_app(app)

    # Routes first, then the database plugin.
    register_blueprints(app)
    register_plugin(app)

    # Names exposed in the `flask shell` context.
    @app.shell_context_processor
    def ctx():
        return dict(app=app, db=db)

    return app


def register_blueprints(app):
    """Mount the v1 API blueprint under /v1."""
    # Imported here rather than at module level, as in the original.
    from app.api.v1 import create_blueprint_v1
    blueprint = create_blueprint_v1()
    app.register_blueprint(blueprint, url_prefix='/v1')


def register_plugin(app):
    """Bind SQLAlchemy to the app and create all tables."""
    db.init_app(app)
    with app.app_context():
        db.create_all()
/app/config/目录,在这个目录下新建两个文件settings.py和secure.py
# File: /app/config/settings.py
# -*- coding: utf-8 -*-

# TOKEN_EXPIRATION = 30 * 24 * 3600

# Must be a real boolean: a string is truthy whatever it spells, so
# 'false' would still have enabled debug mode.
DEBUG = True

# Token lifetime; both values are passed to datetime.timedelta when the
# token is generated.
TOKEN_EXPIRATION_DAYS = 30
TOKEN_EXPIRATION_SECONDS = 0

# bcrypt log rounds (hashing cost); the library default is 12.
BCRYPT_LOG_ROUNDS = 4
# File: /app/config/secure.py
# -*- coding: utf-8 -*-

# Database connection string (MySQL through the cymysql driver).
SQLALCHEMY_DATABASE_URI = (
    'mysql+cymysql://root:root1234@localhost/flask-rest'
)

# Key used to sign and verify tokens.
SECRET_KEY = '***'

SQLALCHEMY_TRACK_MODIFICATIONS = True

# WeChat mini-program credentials.
MINA_APP = dict(
    AppID='***',
    AppSecret='***',
)
manage.py
#File: /manage.py
# -*- coding: utf-8 -*-
from werkzeug.exceptions import HTTPException
from app import create_app
from app.libs.error import APIException
from app.libs.error_code import ServerError

app = create_app()


@app.errorhandler(Exception)
def framework_error(e):
    """Global exception handler: turn every error into a JSON API response.

    * APIException instances are returned unchanged.
    * Other HTTPExceptions are wrapped in an APIException that keeps their
      status code and description, with error_code 1007.
    * Any other exception is re-raised in debug mode, so the traceback stays
      visible during development, and is replaced by a generic ServerError
      otherwise. The original condition was inverted: it masked errors in
      debug mode and re-raised them in production.
    """
    if isinstance(e, APIException):
        return e
    if isinstance(e, HTTPException):
        code = e.code
        msg = e.description
        error_code = 1007
        return APIException(msg, code, error_code)
    if app.config['DEBUG']:
        raise e
    return ServerError()


if __name__ == '__main__':
    app.run()
新建文件夹 /app/libs/
#File: /app/libs/error.py
# -*- coding: utf-8 -*-
"""
Custom error definitions.
"""
from flask import request, json
from werkzeug.exceptions import HTTPException


class APIException(HTTPException):
    """API error rendered as a JSON body instead of Werkzeug's HTML page.

    Class attributes provide the defaults; each can be overridden per
    instance through the constructor.
    """
    code = 500
    msg = '抱歉,后台发生了错误 (* ̄︶ ̄)!'
    error_code = 999

    def __init__(self, msg=None, code=None, error_code=None, headers=None):
        """Override the class-level defaults with any truthy argument.

        ``headers`` is accepted for signature compatibility but not used.
        """
        if code:
            self.code = code
        if error_code:
            self.error_code = error_code
        if msg:
            self.msg = msg
        super(APIException, self).__init__(msg, None)

    def get_body(self, environ=None, scope=None):
        """Return the JSON text: msg, error_code and "<METHOD> <path>".

        ``scope`` is accepted and ignored because Werkzeug 2.x passes it to
        this hook; both parameters stay optional for older versions.
        """
        body = dict(
            msg=self.msg,
            error_code=self.error_code,
            request=request.method + ' ' + self.get_url_no_param()
        )
        text = json.dumps(body)
        return text

    def get_headers(self, environ=None, scope=None):
        """Return the response headers; the body is always JSON."""
        return [('Content-Type', 'application/json')]

    @staticmethod
    def get_url_no_param():
        """Return the current request path without its query string."""
        full_path = str(request.full_path)
        main_path = full_path.split('?')
        return main_path[0]
#File: /app/libs/error_code.py
# -*- coding: utf-8 -*-
from werkzeug.exceptions import HTTPException
from app.libs.error import APIException


# Each class fixes the HTTP status (code), the message (msg) and the
# application-level error_code of one kind of API response.

class Success(APIException):
    """Successful creation."""
    code = 201
    msg = 'success'
    error_code = 0


class DeleteSuccess(Success):
    """Successful deletion; msg is inherited from Success."""
    code = 202
    error_code = 1


class ServerError(APIException):
    """Unexpected server-side failure."""
    code = 500
    msg = '抱歉,后台发生了错误 (* ̄︶ ̄)!'
    error_code = 999


class ClientTypeError(APIException):
    """Client type missing or not recognised."""
    code = 400
    msg = '未检测到客户端类型'
    error_code = 1006


class ParameterException(APIException):
    """Invalid request parameters."""
    code = 400
    msg = '无效参数'
    error_code = 1000


class NotFound(APIException):
    """Requested resource does not exist."""
    code = 404
    msg = '没有找到对应的资源 O__O...'
    error_code = 1001


class AuthFailed(APIException):
    """Authentication failed."""
    code = 401
    msg = '认证失败'
    error_code = 1005


class Forbidden(APIException):
    """Authenticated, but the scope does not allow the endpoint."""
    code = 403
    msg = '禁止访问,不在对应权限内'
    error_code = 1004


class SingleLogin(APIException):
    """Login time in the token no longer matches the stored one."""
    code = 400
    msg = '请从新登陆'
    error_code = 2002


class DuplicateAct(APIException):
    """Repeated operation."""
    code = 400
    msg = '请勿重复操做'
    error_code = 2001
#File: /app/libs/redprint.py
# -*- coding: utf-8 -*-


class Redprint:
    """Collects view functions under a name and mounts them on a blueprint.

    Views registered with :meth:`route` are stored in ``self.mound`` and
    only attached to a blueprint when :meth:`register` is called.
    """

    def __init__(self, name):
        self.name = name
        # List of (view function, rule, options) tuples awaiting registration.
        self.mound = []

    def route(self, rule, **options):
        """Decorator that records a view function together with its rule."""
        def decorator(f):
            self.mound.append((f, rule, options))
            return f

        return decorator

    def register(self, bp, url_prefix=None):
        """Attach every recorded view to ``bp``.

        Args:
            bp: object exposing ``add_url_rule(rule, endpoint, f, **options)``.
            url_prefix: prefix for every rule; defaults to ``'/' + name``.

        Endpoints are named ``"<name>+<endpoint>"``, where ``<endpoint>`` is
        the ``endpoint`` option or, failing that, the function's name.
        """
        if url_prefix is None:
            url_prefix = '/' + self.name
        for f, rule, options in self.mound:
            # Work on a copy: popping from the stored dict would drop a
            # custom endpoint after the first register() call.
            rule_options = dict(options)
            endpoint = self.name + '+' + \
                rule_options.pop("endpoint", f.__name__)
            bp.add_url_rule(url_prefix + rule, endpoint, f, **rule_options)
#File: /app/api/v1/__init__.py
# -*- coding: utf-8 -*-
from flask import Blueprint
from app.api.v1 import user, client, token


def create_blueprint_v1():
    """Build the 'v1' blueprint and register each module's `api` on it."""
    blueprint = Blueprint('v1', __name__)
    # Same registration order as before: user, client, token.
    for module in (user, client, token):
        module.api.register(blueprint)
    return blueprint
client.py
#File: /app/api/v1/client.py
# -*- coding: utf-8 -*-
from app.libs.error_code import Success, ParameterException, ClientTypeError
from app.libs.redprint import Redprint
from app.models.user import User
from app.validators.forms import ClientForm, UserEmailForm, MinaForm
from app.libs.enums import ClientTypeEnum
from app.libs.get_openid import get_openid

api = Redprint('client')


@api.route('/register', methods=['POST'])
def create_client():
    """Register a client, dispatching on the validated client type.

    Raises:
        ClientTypeError: the type is a valid ClientTypeEnum member but has
            no registration handler (previously an uncaught KeyError).
    """
    form = ClientForm().validate_for_api()
    promise = {
        ClientTypeEnum.USER_EMAIL: __register_user_by_email,
        ClientTypeEnum.USER_MINA: __register_user_by_mina,
    }
    register = promise.get(form.type.data)
    if register is None:
        raise ClientTypeError()
    register()
    return Success()


def __register_user_by_email():
    """Validate the email registration form and create the user."""
    form = UserEmailForm().validate_for_api()
    User.register_by_email(form.nickname.data,
                           form.account.data,
                           form.secret.data)


def __register_user_by_mina():
    """Validate the mini-program form and create the user.

    The submitted account is passed to get_openid(); a None result is
    rejected as a parameter error.
    """
    form = MinaForm().validate_for_api()
    account = get_openid(form.account.data)
    if account is None:
        raise ParameterException()
    User.register_by_mina(form.nickname.data,
                          account,
                          form.sex.data,
                          form.avatar.data)
token.py
#File: /app/api/v1/token.py
# -*- coding: utf-8 -*-
import jwt
import datetime
from flask import current_app, jsonify
from app.libs.enums import ClientTypeEnum
from app.libs.error_code import AuthFailed, ClientTypeError
from app.libs.redprint import Redprint
from app.models.user import User
from app.validators.forms import ClientForm, TokenForm
from app.libs.format_time import get_format_timestamp

api = Redprint('token')

# Signing algorithm, used for both encoding and decoding so that a token
# cannot select its own algorithm.
JWT_ALGORITHM = 'HS256'


@api.route('', methods=['POST'])
def get_token():
    """Log in and return a token on success.

    Raises:
        ClientTypeError: the client type has no login handler
            (previously an uncaught KeyError).
    """
    form = ClientForm().validate_for_api()
    promise = {
        ClientTypeEnum.USER_EMAIL: User.verify,
        ClientTypeEnum.USER_MINA: User.mina_login,
    }
    login = promise.get(ClientTypeEnum(form.type.data))
    if login is None:
        raise ClientTypeError()
    identity = login(form.account.data, form.secret.data)
    token = generate_auth_token(identity['uid'],
                                form.type.data,
                                identity['login_time'],
                                identity['scope'])
    # PyJWT 1.x returns bytes, PyJWT 2.x returns str.
    if isinstance(token, bytes):
        token = token.decode('ascii')
    t = {'token': token}
    return jsonify(t), 201


@api.route('/secret', methods=['POST'])
def get_token_info():
    """Decode a token and return its contents.

    Raises:
        AuthFailed: error_code 1003 when the token has expired,
            1002 when it is otherwise invalid.
    """
    form = TokenForm().validate_for_api()
    auth_token = form.token.data
    try:
        data = jwt.decode(auth_token,
                          current_app.config['SECRET_KEY'],
                          algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise AuthFailed(msg='token is expired', error_code=1003)
    except jwt.InvalidTokenError:
        raise AuthFailed(msg='token is invalid', error_code=1002)

    r = {
        'scope': data['scope'],
        'create_at': get_format_timestamp(data['iat']),
        'expire_in': get_format_timestamp(data['exp']),
        'uid': data['uid'],
        'login_time': get_format_timestamp(data['login_time'])
    }
    return jsonify(r)


def generate_auth_token(uid, ac_type, login_time, scope=None):
    """Create a signed token.

    Args:
        uid: user id stored in the payload.
        ac_type: client type; its ``.value`` is stored in the payload.
        login_time: login timestamp stored in the payload.
        scope: scope name stored in the payload.

    Returns:
        The encoded token (bytes on PyJWT 1.x, str on PyJWT 2.x).

    Errors now propagate. The original returned the exception object, which
    the caller then treated as a token.
    """
    now = datetime.datetime.utcnow()
    payload = {
        'exp': now + datetime.timedelta(
            days=current_app.config['TOKEN_EXPIRATION_DAYS'],
            seconds=current_app.config['TOKEN_EXPIRATION_SECONDS'],
        ),
        'iat': now,
        'uid': uid,
        'type': ac_type.value,
        'login_time': login_time,
        'scope': scope,
    }
    return jwt.encode(
        payload,
        current_app.config['SECRET_KEY'],
        algorithm=JWT_ALGORITHM
    )
user.py
#File: /app/api/v1/user.py
# -*- coding: utf-8 -*-
from flask import jsonify, g
from app.libs.error_code import DeleteSuccess
from app.libs.redprint import Redprint
from app.libs.token_auth import auth
from app.models.base import db
from app.models.user import User

api = Redprint('user')


@api.route('/<int:uid>', methods=['GET'])
@auth.login_required
def super_get_user(uid):
    """Return the user with the given id."""
    user = User.query.filter_by(id=uid).first_or_404()
    return jsonify(user)


@api.route('', methods=['GET'])
@auth.login_required
def get_user():
    """Return the user identified by the token."""
    uid = g.user.uid
    user = User.query.filter_by(id=uid).first_or_404()
    return jsonify(user)


@api.route('/<int:uid>', methods=['DELETE'])
@auth.login_required
def super_delete_user(uid):
    """Mark the user with the given id as deleted.

    Now requires authentication. Without it the route let anyone delete any
    user, and the scope check that forbids this endpoint never ran.
    """
    with db.auto_commit():
        user = User.query.filter_by(id=uid).first_or_404()
        user.delete()
    return DeleteSuccess()


@api.route('', methods=['DELETE'])
@auth.login_required
def delete_user():
    """Mark the user identified by the token as deleted."""
    uid = g.user.uid
    with db.auto_commit():
        user = User.query.filter_by(id=uid).first_or_404()
        user.delete()
    return DeleteSuccess()


@api.route('', methods=['PUT'])
def update_user():
    # Placeholder: update is not implemented yet.
    return 'update'
models
#File: /app/models/base.py
# -*- coding: utf-8 -*-
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery
from sqlalchemy import inspect, Column, Integer, SmallInteger, orm
from contextlib import contextmanager
from app.libs.error_code import NotFound


class SQLAlchemy(_SQLAlchemy):
    """SQLAlchemy extension with a commit-or-rollback context manager."""

    @contextmanager
    def auto_commit(self):
        """Commit after the with-block; roll back and re-raise on error."""
        try:
            yield
            self.session.commit()
        except Exception as e:
            db.session.rollback()
            raise e


class Query(BaseQuery):
    """Query class that filters on status=1 unless a status is given."""

    def filter_by(self, **kwargs):
        """Add ``status=1`` when the caller gave no ``status`` filter."""
        kwargs.setdefault('status', 1)
        return super(Query, self).filter_by(**kwargs)

    def get_or_404(self, ident):
        """Like ``get``, but raise NotFound for a falsy result."""
        found = self.get(ident)
        if found:
            return found
        raise NotFound()

    def first_or_404(self):
        """Like ``first``, but raise NotFound for a falsy result."""
        found = self.first()
        if found:
            return found
        raise NotFound()


db = SQLAlchemy(query_class=Query)


class Base(db.Model):
    """Abstract base model with a creation time and a status flag."""
    __abstract__ = True
    create_time = Column(Integer)
    status = Column(SmallInteger, default=1)

    def __init__(self):
        self.create_time = int(datetime.now().timestamp())

    def __getitem__(self, item):
        return getattr(self, item)

    @property
    def create_datetime(self):
        """``create_time`` as a datetime, or None when it is not set."""
        return (datetime.fromtimestamp(self.create_time)
                if self.create_time else None)

    def set_attrs(self, attrs_dict):
        """Copy matching entries onto existing attributes, except ``id``."""
        for name, value in attrs_dict.items():
            if name != 'id' and hasattr(self, name):
                setattr(self, name, value)

    def delete(self):
        """Mark the row as deleted (status 0)."""
        self.status = 0

    def active(self):
        """Mark the row as active (status 1)."""
        self.status = 1

    def update(self):
        """Commit the session; on failure roll back and return the error text."""
        try:
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            return str(e)

    def keys(self):
        return self.fields

    def hide(self, *keys):
        """Remove the given names from ``fields``; returns self."""
        for name in keys:
            self.fields.remove(name)
        return self

    def append(self, *keys):
        """Add the given names to ``fields``; returns self."""
        for name in keys:
            self.fields.append(name)
        return self


class MixinJSONSerializer:
    """Mixin that builds the list of serializable fields when loaded."""

    @orm.reconstructor
    def init_on_load(self):
        self._fields = []
        # self._include = []
        self._exclude = []
        self._set_fields()
        self.__prune_fields()

    def _set_fields(self):
        # Hook for subclasses.
        pass

    def __prune_fields(self):
        """Default ``_fields`` to every column not listed in ``_exclude``."""
        columns = inspect(self.__class__).columns
        if self._fields:
            return
        all_columns = set(columns.keys())
        self._fields = list(all_columns - set(self._exclude))

    def hide(self, *args):
        """Remove the given names from ``_fields``; returns self."""
        for name in args:
            self._fields.remove(name)
        return self

    def keys(self):
        return self._fields

    def __getitem__(self, key):
        return getattr(self, key)
#File: /app/models/user.py
# -*- coding: utf-8 -*-
from datetime import datetime
from flask import current_app
from sqlalchemy import Column, Integer, String, SmallInteger
from app import bcrypt
from app.libs.error_code import AuthFailed
from app.models.base import Base, db
from app.libs.format_time import get_current_timestamp
from app.libs.get_openid import get_openid
from app.libs.error_code import ParameterException


class User(Base):
    """User account, created either by email or through the mini program."""
    id = Column(Integer, primary_key=True)
    nickname = Column(String(24), unique=True)
    email = Column(String(24), unique=True)
    mobile = Column(String(11), unique=True)
    sex = Column(Integer, default=0)  # 1 = male, 2 = female
    avatar = Column(String(200))  # avatar URL
    register_ip = Column(String(100))  # IP address at registration
    auth = Column(SmallInteger, default=1)  # permission level; 2 selects AdminScope
    openid = Column(String(80), unique=True)
    _password = Column('password', String(100))
    # Pass the function itself so the default is computed per insert. The
    # original evaluated the timestamp once at import, so every row shared
    # the process start time.
    login_time = Column(Integer, default=get_current_timestamp)

    @property
    def login_datetime(self):
        """``login_time`` as a datetime, or None when it is not set."""
        if self.login_time:
            return datetime.fromtimestamp(self.login_time)
        else:
            return None

    def keys(self):
        """Names of the fields exposed when the user is serialized."""
        return ['id', 'nickname', 'email', 'auth']

    @property
    def password(self):
        return self._password

    @password.setter
    def password(self, raw):
        """Store the bcrypt hash of ``raw``."""
        self._password = bcrypt.generate_password_hash(
            raw, current_app.config['BCRYPT_LOG_ROUNDS']).decode('utf-8')

    @staticmethod
    def register_by_email(nickname, account, secret):
        """Register a user by email."""
        with db.auto_commit():
            user = User()
            user.nickname = nickname
            user.email = account
            user.password = secret
            db.session.add(user)

    @staticmethod
    def verify(email, password):
        """Log in by email.

        Returns:
            dict with 'uid', 'scope' and 'login_time'.

        Raises:
            NotFound: no active user has this email.
            AuthFailed: the password does not match.
        """
        user = User.query.filter_by(email=email).first_or_404()
        if not user.check_password(password):
            raise AuthFailed()
        return User._start_session(user)

    def check_password(self, raw):
        """Return True when ``raw`` matches the stored hash."""
        if not self._password:
            return False
        return bcrypt.check_password_hash(self._password, raw)

    @staticmethod
    def register_by_mina(nickname, account, sex, avatar):
        """Register a mini-program user; ``account`` is the openid.

        Idempotent for active users: when the openid already exists, the
        profile fields are refreshed rather than inserting a second row.
        The original insert broke the unique openid constraint, so every
        repeat registration failed.
        """
        with db.auto_commit():
            user = User.query.filter_by(openid=account).first()
            if user is None:
                user = User()
                user.openid = account
                db.session.add(user)
            user.nickname = nickname
            user.sex = sex
            user.avatar = avatar

    @staticmethod
    def mina_login(account, secret):
        """Log in from the mini program.

        ``account`` is passed to get_openid() to resolve the openid;
        ``secret`` is unused.

        Raises:
            ParameterException: no openid could be resolved.
            NotFound: no active user has this openid.
        """
        openid = get_openid(account)
        if openid is None:
            raise ParameterException()
        user = User.query.filter_by(openid=openid).first_or_404()
        return User._start_session(user)

    @staticmethod
    def _start_session(user):
        """Stamp a new login time, commit it, and build the identity dict."""
        scope = 'AdminScope' if user.auth == 2 else 'UserScope'
        login_time = get_current_timestamp()
        user.login_time = login_time
        user.update()
        return {'uid': user.id, 'scope': scope, 'login_time': login_time}
# File: /app/libs/enums.py
# -*- coding: utf-8 -*-
from enum import Enum


class ClientTypeEnum(Enum):
    """Client (account) types accepted by the API."""

    # Email and mobile accounts
    USER_EMAIL = 100
    USER_MOBILE = 101

    # WeChat mini program
    USER_MINA = 200

    # WeChat official account
    USER_WX = 201
#File: /app/libs/format_time.py
# -*- coding: utf-8 -*-
import datetime


def get_current_date():
    """Return the current local date and time as a datetime."""
    return datetime.datetime.now()


def get_current_timestamp():
    """Return the current time as an integer Unix timestamp."""
    return int(datetime.datetime.now().timestamp())


def get_format_date(date=None, format_time="%Y-%m-%d %H:%M:%S"):
    """Format a datetime; defaults to the current time."""
    if date is None:
        date = datetime.datetime.now()
    return date.strftime(format_time)


def get_format_timestamp(date=None, format_time="%Y-%m-%d %H:%M:%S"):
    """Format a Unix timestamp as local time.

    Args:
        date: Unix timestamp in seconds. ``None`` means the current time;
            a ``datetime`` is also accepted and formatted directly.
        format_time: ``strftime`` format string.

    The original passed a ``datetime`` to ``fromtimestamp`` when ``date``
    was ``None``, which raised ``TypeError``.
    """
    if date is None:
        moment = datetime.datetime.now()
    elif isinstance(date, datetime.datetime):
        moment = date
    else:
        moment = datetime.datetime.fromtimestamp(date)
    return moment.strftime(format_time)
#File: /app/libs/get_openid.py
# -*- coding: utf-8 -*-
import requests
import json
from flask import current_app

# Seconds to wait for the WeChat API before giving up.
REQUEST_TIMEOUT = 5


def get_openid(code):
    """Exchange a login code for an openid through jscode2session.

    Args:
        code: login code supplied by the client.

    Returns:
        The ``openid`` from the response, or None when the request fails,
        the response is not valid JSON, or it has no ``openid`` key.
    """
    api = 'https://api.weixin.qq.com/sns/jscode2session'
    # `code` comes from the client. Passing a dict lets requests URL-encode
    # each value instead of splicing it into the query string.
    params = {
        'appid': current_app.config['MINA_APP']['AppID'],
        'secret': current_app.config['MINA_APP']['AppSecret'],
        'js_code': code,
        'grant_type': 'authorization_code',
    }
    try:
        response = requests.get(url=api, params=params,
                                timeout=REQUEST_TIMEOUT)
        res = json.loads(response.text)
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body; callers already handle None.
        return None
    if not isinstance(res, dict):
        return None
    return res.get('openid')
scope.py
# Permission-management helpers.
#File: /app/libs/scope.py
# -*- coding: utf-8 -*-


class Scope:
    """Set of permissions.

    allow_api: endpoints allowed individually.
    allow_module: module prefixes (the endpoint text before '+') whose
        endpoints are all allowed.
    forbidden: endpoints denied; checked before either allow list.
    """
    allow_api = []
    allow_module = []
    forbidden = []

    def __add__(self, other):
        """Merge ``other``'s permissions into this scope, de-duplicated.

        Assigns the merged lists on ``self`` and returns ``self``.
        """
        self.allow_api = self.allow_api + other.allow_api
        self.allow_api = list(set(self.allow_api))

        self.allow_module = self.allow_module + other.allow_module
        self.allow_module = list(set(self.allow_module))

        self.forbidden = self.forbidden + other.forbidden
        self.forbidden = list(set(self.forbidden))

        return self


class AdminScope(Scope):
    allow_module = ['v1.user']

    def __init__(self):
        pass


class UserScope(Scope):
    forbidden = ['v1.user+super_get_user', 'v1.user+super_delete_user']

    def __init__(self):
        # Start from everything AdminScope allows; `forbidden` removes the
        # admin-only endpoints.
        self + AdminScope()


def is_in_scope(scope, endpoint):
    """Return True when the named scope may access ``endpoint``.

    Args:
        scope: name of a Scope subclass defined in this module.
        endpoint: endpoint name of the form "<module>+<view>".

    Returns False for an unknown scope name or an empty endpoint. The
    original raised KeyError or AttributeError there, and instantiated
    whatever global the name happened to match.
    """
    scope_cls = globals().get(scope) if isinstance(scope, str) else None
    if not (isinstance(scope_cls, type) and issubclass(scope_cls, Scope)):
        return False
    if not endpoint:
        return False

    permissions = scope_cls()
    red_name = endpoint.split('+')[0]
    if endpoint in permissions.forbidden:
        return False
    if endpoint in permissions.allow_api:
        return True
    return red_name in permissions.allow_module
#File: /app/libs/token_auth.py
# -*- coding: utf-8 -*-
import jwt
from collections import namedtuple
from flask import current_app, g, request
from flask_httpauth import HTTPBasicAuth
from app.models.user import User as _User
from app.libs.scope import is_in_scope
from app.libs.error_code import AuthFailed, Forbidden, SingleLogin

auth = HTTPBasicAuth()
# Identity extracted from a verified token; stored on flask.g as g.user.
User = namedtuple('User', ['uid', 'ac_type', 'scope', 'login_time'])


@auth.verify_password
def verify_password(token, password):
    """HTTP Basic auth callback: the username carries the token.

    ``password`` is ignored. On success the decoded identity is stored in
    ``g.user``.
    """
    user_info = verify_auth_token(token)
    if not user_info:
        return False
    g.user = user_info
    return True


def verify_auth_token(token):
    """Decode the token and check it against the user and the endpoint.

    Returns:
        A ``User`` namedtuple (uid, ac_type, scope, login_time).

    Raises:
        AuthFailed: error_code 1003 when expired, 1002 when invalid.
        NotFound: the user in the token does not exist or is not active.
        SingleLogin: the token's login_time differs from the stored one.
        Forbidden: the scope does not allow the requested endpoint.
    """
    try:
        # Pin the algorithm: PyJWT 2.x requires it, and it stops a token
        # from choosing how it is verified.
        data = jwt.decode(token,
                          current_app.config['SECRET_KEY'],
                          algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        raise AuthFailed(msg='token is expired', error_code=1003)
    except jwt.InvalidTokenError:
        raise AuthFailed(msg='token is invalid', error_code=1002)

    uid = data['uid']
    ac_type = data['type']
    scope = data['scope']
    login_time = data['login_time']

    user = _User.query.filter_by(id=uid).first_or_404()
    if login_time != user.login_time:
        raise SingleLogin()

    # request.endpoint names the view function handling this request.
    allow = is_in_scope(scope, request.endpoint)
    if not allow:
        raise Forbidden()
    return User(uid, ac_type, scope, login_time)
#File: /app/validators/base.py
# -*- coding: utf-8 -*-
from flask import request
from wtforms import Form
from app.libs.error_code import ParameterException


class BaseForm(Form):
    """Form populated from the request's JSON body and query-string args."""

    def __init__(self):
        # silent=True: get_json returns None instead of raising when the
        # body cannot be parsed as JSON.
        json_body = request.get_json(silent=True)
        query_args = request.args.to_dict()
        super(BaseForm, self).__init__(data=json_body, **query_args)

    def validate_for_api(self):
        """Validate and return self.

        Raises:
            ParameterException: carrying ``self.errors`` as its message when
                validation fails.
        """
        if super(BaseForm, self).validate():
            return self
        raise ParameterException(msg=self.errors)
#File: /app/validators/forms.py
# -*- coding: utf-8 -*-
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired, length, Email, Regexp
from wtforms import ValidationError
from app.libs.enums import ClientTypeEnum
from app.models.user import User
from app.validators.base import BaseForm as Form


def _parse_client_type(field):
    """Convert the field's integer value into a ClientTypeEnum member.

    Raises:
        ValidationError: the value is not a valid client type. The message
            is the one the enum lookup produced. ValidationError subclasses
            ValueError, so code that handled the ValueError raised before
            still works.
    """
    try:
        return ClientTypeEnum(field.data)
    except ValueError as e:
        raise ValidationError(str(e))


class ClientForm(Form):
    """Fields shared by client registration and login requests."""
    account = StringField(validators=[
        DataRequired(message='不容许为空'),
        length(min=5, max=32)])
    secret = StringField()
    type = IntegerField(validators=[DataRequired()])

    def validate_type(self, value):
        # Replace the raw integer with the enum member for later dispatch.
        self.type.data = _parse_client_type(value)


class UserEmailForm(ClientForm):
    """Registration by email."""
    account = StringField(validators=[Email(message='invalidate email')])
    secret = StringField(validators=[
        DataRequired(),
        # 6-22 characters drawn from letters, digits and _ * & $ # @
        Regexp(r'^[A-Za-z0-9_*&$#@]{6,22}$')
    ])
    nickname = StringField(validators=[DataRequired(),
                                       length(min=2, max=22)])

    def validate_account(self, value):
        # Reject an email that an active user already has.
        if User.query.filter_by(email=value.data).first():
            raise ValidationError()


class TokenForm(Form):
    """Token inspection request."""
    token = StringField(validators=[DataRequired()])


class MinaForm(Form):
    """Registration from the mini program."""
    account = StringField(validators=[
        DataRequired(message='不容许为空'),
        length(min=10, max=80)])
    nickname = StringField(validators=[DataRequired()])
    sex = IntegerField(validators=[DataRequired()])
    avatar = StringField(validators=[DataRequired()])
    type = IntegerField(validators=[DataRequired()])

    def validate_type(self, value):
        # Same conversion as ClientForm.validate_type.
        self.type.data = _parse_client_type(value)
error_code.md
error_code | msg |
---|---|
0 | 建立成功 |
1 | 删除成功 |
999 | 未知错误 - 后台发生了错误 |
1000 | 无效参数 |
1001 | 没有找到对应的资源 |
1002 | token is invalid |
1003 | token is expired |
1004 | 禁止访问,不在对应权限内 |
1005 | 认证失败 |
1006 | 未检测到客户端类型 |
2001 | 请勿重复操做 |
2002 | 请从新登陆 |
Flask + PyJWT 实现基于 JSON Web Token 的用户认证授权
endpoint | HTTP Method | Authenticated? | Result | json Body |
---|---|---|---|---|
/v1/client/register | POST | NO | 注册用户 | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
/v1/token | POST | NO | 获取token | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
/v1/user | GET | YES | 用户详情 | 空 |
/v1/user/2 | GET | YES | 管理员获取用户详情 | 空 |
/v1/token/secret | POST | NO | token详情 | {"token":"*"} |
cd users
flask db migrate
flask db upgrade
学习资料: