flask开发restful api系列(8)-再谈项目结构

  上一章,咱们讲到,怎么用蓝图建造一个好的项目,今天咱们继续深刻。上一章中,咱们全部的接口都写在view.py中,若是几十个,还稍微好管理一点,假如上百个,上千个,怎么找?全部接口堆在一块儿就显得杂乱无章。flask没有推荐你们在这方面的功能,一般都是由本身来实现。咱们一般的作法,都是按照功能划分文件,把不一样功能的应用接口,划分到不一样文件,若是某个功能的接口不少,再细分一下。固然你也能够按照其余划分方式划分,只要记住一点,项目怎么管理方便,就怎么划分,千万不要被框架框死。html

  废话说过了,来点实际的。上一个版本,咱们只有注册,登陆,注销等基本功能,总不能就修改一个加密接口吧,这一个版本老板要求把微博功能加进去,固然,这边的微博不是新浪的那个,就是能发一些东西,里面有标题,不超过140字的文字内容,不超过9张图片的图片集等。继续在上一个版本的model.py和view.py中添加。python

model.py以下:redis

class SmallBlog(Base):
    __tablename__ = 'small_blog'

    id = Column('id', Integer, primary_key=True)
    post_user_id = Column('post_user_id', Integer, ForeignKey(User.id))
    post_time = Column('post_time', DateTime, default=datetime.datetime.now)
    title = Column('title', String(30), index=True)
    text_content = Column('text_content', String(140))
    picture_content = Column('picture_content', String(900))
    post_user = relationship('User', backref=backref('small_blogs'))

    @hybrid_property
    def pictures(self):
        if not self.picture_content:
            return []
        return self.picture_content.split(',')

    @pictures.setter
    def pictures(self, urls):
        self.picture_content = ','.join(urls)

    def to_dict(self):
        return {
            'id': self.id,
            'post_user_picture': self.post_user.head_picture,
            'post_user_name': self.post_user.nickname,
            'post_time': self.post_time.strftime('%Y-%m-%d %H:%M:%S'),
            'title': self.title,
            'text_content': self.text_content,
            'pictures': self.pictures
        }

 

  就多了一个SmallBlog的model,而后更新数据库,如何更新数据库,请看个人 flask开发restful api系列(3)--利用alembic进行数据库更改 这篇文章。还有,这边的图片存储方式,我是用每张图片的url,中间用","分隔开,而后存储,客户端请求的时候,再分隔开,返回url的数组。这样就用最简单的方式实现了0到9张图片的动态添加。若是你有更好的方案,请及时回复我,你们共同窗习。sql

  还有,我在这里面定义了3个函数,关于pictures的显示和设置,这个稍微熟悉一点python的都知道,只不过在普通的类中,直接@property。而在sqlalchemy里面,用@hybrid_property便可。还有就是专门为返回json格式数据作准备的函数to_dict(),获取每一个对象,而后执行这个函数就能够返回字典方式。这个你们能够用别的库,也能够用本身写的to_dict函数,我在不少地方强调,写代码必定要灵活,千万不能被框架限定死了。若是哪天以为很差,也能够用flask别的扩展库实现串行化对象。数据库

  model.py方面添加好,下面就在view.py中添加以下函数:json

view.pyflask

 1 @api.route('/get-multi-qiniu-token')
 2 @login_check
 3 def get_multi_qiniu_token():
 4     count = request.args.get('count')
 5 
 6     if not 0 < int(count) < 10:
 7         return jsonify({'code': 0, 'message': '一次只能获取1到9个'})
 8 
 9     key_token_s = []
10     for x in range(int(count)):
11         key = uuid.uuid1()
12         token = current_app.q.upload_token(current_app.bucket_name, key, 3600)
13         key_token_s.append((key, token))
14     return jsonify({'code': 1, 'key_token_s': key_token_s})
15 
16 
17 @api.route('/post-blog', methods=['POST'])
18 @login_check
19 def post_blog():
20     user = g.current_user
21 
22     title = request.get_json().get('title')
23     text_content = request.get_json().get('text_content')
24     pictures = request.get_json().get('pictures')
25 
26     newblog = SmallBlog(title=title, text_content=text_content, post_user=user)
27 
28     newblog.pictures = pictures
29     db_session.add(newblog)
30     try:
31         db_session.commit()
32     except Exception as e:
33         print e
34         db_session.rollback()
35         return jsonify({'code': 0, 'message': '上传不成功'})
36     return jsonify({'code': 1, 'message': '上传成功'})
37 
38 
39 @api.route('/get-blogs')
40 @login_check
41 def get_blogs():
42     last_id = request.args.get('last_id')
43     if not int(last_id):
44         blogs = db_session.query(SmallBlog).order_by(desc(SmallBlog.id)).limit(10)
45     else:
46         blogs = db_session.query(SmallBlog).filter(SmallBlog.id < int(last_id)).order_by(desc(SmallBlog.id)).limit(10)
47     return jsonify({'code': 1, 'blogs': [blog.to_dict() for blog in blogs]})

  多了3个函数,第一个函数get_multi_qiniu_token,看名字也能够看出来,这个就是获取七牛token的方法,只不过获取多个,这边限制在1到9个之间,本身上传一个数目,而后返回key和token。api

  第二个函数 post_blog就是一个提交新blog的过程,惟一要注意的就是pictures,在客户端直接上传数组,而后赋值就能够了。数组

  第三个函数 get_blogs有点意思,其实就是根据last_id获得上面10条数据,若是第一次没有last_id,就填0就能够了。最主要是给客户端使用,你总不能一会儿把全部数据都给客户端,一般都是必定的条目,而后下拉,根据最上面的id,来获取其上10条数据,这样能够无限下拉刷新,只要一个接口就行了。就跟微博同样。若是你有其余好办法,欢迎一块儿探讨。服务器

  服务器端写好了,咱们就写客户端吧,客户端代码也很简单,针对这3个函数,写3个接口而已。

client.py

    def get_multi_qiniu_token(self, count, path='/get-multi-qiniu-token'):
        self.headers = {'token': self.token}
        payload = {'count': count}
        response = requests.get(url=self.base_url + path, params=payload, headers=self.headers)
        response_data = json.loads(response.content)
        key_token_s = response_data.get('key_token_s')
        return key_token_s

    def post_blog(self, title, text_content, picture_files, path='/post-blog'):
        self.headers = {'token': self.token}
        count = len(picture_files)
        key_token_s = self.get_multi_qiniu_token(count=count)
        pictures = []

        for x in range(count):
            put_file(key_token_s[x][1], key_token_s[x][0], picture_files[x])
            pictures.append(self.qiniu_base_url + key_token_s[x][0])

        payload = {'title': title, 'text_content': text_content, 'pictures': pictures}
        self.headers = {'content-type': 'application/json', 'token': self.token}
        response = requests.post(url=self.base_url + path, data=json.dumps(payload), headers=self.headers)
        response_data = json.loads(response.content)
        print response_data.get('code')
        return response_data

    def get_blogs(self, last_id, path='/get-blogs'):
        self.headers = {'token': self.token}
        payload = {'last_id': last_id}
        response = requests.get(url=self.base_url + path, params=payload, headers=self.headers)
        response_data = json.loads(response.content)
        return response_data

  测试一下吧,

if __name__ == '__main__':
    api = API_1_1()
    u = api.login('13565208554', '123456')
    # key_token_s = api.get_multi_qiniu_token(4)
    api.post_blog(title=u'dsfsdfrerg434', text_content=u'是打发撒dsfsdgsdfsgs法上得分未违法',
                  picture_files=['./img/4.png', './img/2.png', './img/3.png'])
    blogs = api.get_blogs(0)
    print blogs
    api.logout()

  随便上传几个,打印一下,就能够看到效果了,好了,添加完毕。

  下面咱们就介绍一下结构,想象一下,咱们目前添加了最最基本的功能,就要添加3个接口。其实一个功能慢慢作下去,几十个接口都是少的,若是都放在view.py里面,你们想象一个文件有多少接口,不要说看起来很是难看,就是之后查找bug,也不是通常的复杂。在传统python中,一般都是根据功能划分文件,固然也有其余的,就像咱们上面所说的,只要找到适合你本身的,就去改变,不要被框架限定死。

  咱们目前在view.py里面,其实有不少功能了,有装饰器模块,验证模块,注册模块,七牛模块,博客模块等,其实如今接口少,就少分一点文件,这个只要灵活便可,没有必要限定本身,咱们就划分auth.py, main.py, decorators.py,blogs.py这4个模块,其中auth.py里面包含验证和注册;main.py里面包含一些公用,包括运行接口先后以及七牛token和key的获取;decorators.py就是装饰器;blogs.py就是博客接口。大体先这么分,之后遇到其余接口,再动态添加便可。

  转移一下以前的代码,把view.py的代码分别复制到各个py文件下,而后删除view.py文件,如今的文件结构以下:

  是否是清晰多了?这里再把各个py文件的代码复制一下,最最重要的就是__init__.py里面的代码,必定要覆盖到全部接口。

# coding:utf-8
from flask import Blueprint

api = Blueprint('api1_1', __name__)

from . import auth, blogs, decorators, main

  看清楚from . import auth, blogs, decorators, main这行代码,就是程序启动的时候,覆盖全部接口。

下面分别是auth.py, blogs.py, decorators.py, main.py文件,这里就不用一一介绍了吧。

auth.py

# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from app.model import User, db_session, SmallBlog, desc
import hashlib
import time
from app.util import message_validate
import random
from .decorators import login_check

from . import api


@api.route('/login', methods=['POST'])
def login():
    phone_number = request.get_json().get('phone_number')
    encryption_str = request.get_json().get('encryption_str')
    random_str = request.get_json().get('random_str')
    time_stamp = request.get_json().get('time_stamp')
    user = User.query.filter_by(phone_number=phone_number).first()

    if not user:
        return jsonify({'code': 0, 'message': '没有此用户'})

    password_in_sql = user.password

    s = hashlib.sha256()
    s.update(password_in_sql)
    s.update(random_str)
    s.update(time_stamp)
    server_encryption_str = s.hexdigest()

    if server_encryption_str != encryption_str:
        return jsonify({'code': 0, 'message': '密码错误'})

    m = hashlib.md5()
    m.update(phone_number)
    m.update(user.password)
    m.update(str(int(time.time())))
    token = m.hexdigest()

    pipeline = current_app.redis.pipeline()
    pipeline.hmset('user:%s' % user.phone_number, {'token': token, 'nickname': user.nickname, 'app_online': 1})
    pipeline.set('token:%s' % token, user.phone_number)
    pipeline.expire('token:%s' % token, 3600*24*30)
    pipeline.execute()

    return jsonify({'code': 1, 'message': '成功登陆', 'nickname': user.nickname, 'token': token})


@api.route('/user')
@login_check
def user():
    user = g.current_user

    nickname = current_app.redis.hget('user:%s' % user.phone_number, 'nickname')
    return jsonify({'code': 1, 'nickname': nickname, 'phone_number': user.phone_number})


@api.route('/logout')
@login_check
def logout():
    user = g.current_user

    pipeline = current_app.redis.pipeline()
    pipeline.delete('token:%s' % g.token)
    pipeline.hmset('user:%s' % user.phone_number, {'app_online': 0})
    pipeline.execute()
    return jsonify({'code': 1, 'message': '成功注销'})


@api.route('/set-head-picture', methods=['POST'])
@login_check
def set_head_picture():
    head_picture = request.get_json().get('head_picture')
    user = g.current_user
    user.head_picture = head_picture
    try:
        db_session.commit()
    except Exception as e:
        print e
        db_session.rollback()
        return jsonify({'code': 0, 'message': '未能成功上传'})
    current_app.redis.hset('user:%s' % user.phone_number, 'head_picture', head_picture)
    return jsonify({'code': 1, 'message': '成功上传'})


@api.route('/register-step-1', methods=['POST'])
def register_step_1():
    """
    接受phone_number,发送短信
    """
    phone_number = request.get_json().get('phone_number')
    user = User.query.filter_by(phone_number=phone_number).first()

    if user:
        return jsonify({'code': 0, 'message': '该用户已经存在,注册失败'})
    validate_number = str(random.randint(100000, 1000000))
    result, err_message = message_validate(phone_number, validate_number)

    if not result:
        return jsonify({'code': 0, 'message': err_message})

    pipeline = current_app.redis.pipeline()
    pipeline.set('validate:%s' % phone_number, validate_number)
    pipeline.expire('validate:%s' % phone_number, 60)
    pipeline.execute()

    return jsonify({'code': 1, 'message': '发送成功'})


@api.route('/register-step-2', methods=['POST'])
def register_step_2():
    """
    验证短信接口
    """
    phone_number = request.get_json().get('phone_number')
    validate_number = request.get_json().get('validate_number')
    validate_number_in_redis = current_app.redis.get('validate:%s' % phone_number)

    if validate_number != validate_number_in_redis:
        return jsonify({'code': 0, 'message': '验证没有经过'})

    pipe_line = current_app.redis.pipeline()
    pipe_line.set('is_validate:%s' % phone_number, '1')
    pipe_line.expire('is_validate:%s' % phone_number, 120)
    pipe_line.execute()

    return jsonify({'code': 1, 'message': '短信验证经过'})


@api.route('/register-step-3', methods=['POST'])
def register_step_3():
    """
    密码提交
    """
    phone_number = request.get_json().get('phone_number')
    password = request.get_json().get('password')
    password_confirm = request.get_json().get('password_confirm')

    if len(password) < 7 or len(password) > 30:
        # 这边能够本身拓展条件
        return jsonify({'code': 0, 'message': '密码长度不符合要求'})

    if password != password_confirm:
        return jsonify({'code': 0, 'message': '密码和密码确认不一致'})

    is_validate = current_app.redis.get('is_validate:%s' % phone_number)

    if is_validate != '1':
        return jsonify({'code': 0, 'message': '验证码没有经过'})

    pipeline = current_app.redis.pipeline()
    pipeline.hset('register:%s' % phone_number, 'password', password)
    pipeline.expire('register:%s' % phone_number, 120)
    pipeline.execute()

    return jsonify({'code': 1, 'message': '提交密码成功'})


@api.route('/register-step-4', methods=['POST'])
def register_step_4():
    """
    基本资料提交
    """
    phone_number = request.get_json().get('phone_number')
    nickname = request.get_json().get('nickname')

    is_validate = current_app.redis.get('is_validate:%s' % phone_number)

    if is_validate != '1':
        return jsonify({'code': 0, 'message': '验证码没有经过'})

    password = current_app.redis.hget('register:%s' % phone_number, 'password')

    new_user = User(phone_number=phone_number, password=password, nickname=nickname)
    db_session.add(new_user)

    try:
        db_session.commit()
    except Exception as e:
        print e
        db_session.rollback()
        return jsonify({'code': 0, 'message': '注册失败'})
    finally:
        current_app.redis.delete('is_validate:%s' % phone_number)
        current_app.redis.delete('register:%s' % phone_number)

    return jsonify({'code': 1, 'message': '注册成功'})
View Code

blogs.py

# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from app.model import User, db_session, SmallBlog, desc
from .decorators import login_check
from . import api


@api.route('/post-blog', methods=['POST'])
@login_check
def post_blog():
    user = g.current_user

    title = request.get_json().get('title')
    text_content = request.get_json().get('text_content')
    pictures = request.get_json().get('pictures')

    newblog = SmallBlog(title=title, text_content=text_content, post_user=user)

    newblog.pictures = pictures
    db_session.add(newblog)
    try:
        db_session.commit()
    except Exception as e:
        print e
        db_session.rollback()
        return jsonify({'code': 0, 'message': '上传不成功'})
    return jsonify({'code': 1, 'message': '上传成功'})


@api.route('/get-blogs')
@login_check
def get_blogs():
    last_id = request.args.get('last_id')
    if not int(last_id):
        blogs = db_session.query(SmallBlog).order_by(desc(SmallBlog.id)).limit(10)
    else:
        blogs = db_session.query(SmallBlog).filter(SmallBlog.id < int(last_id)).order_by(desc(SmallBlog.id)).limit(10)
    return jsonify({'code': 1, 'blogs': [blog.to_dict() for blog in blogs]})
View Code

decorator.py

# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from functools import wraps


def login_check(f):
    @wraps(f)
    def decorator(*args, **kwargs):
        token = request.headers.get('token')
        if not token:
            return jsonify({'code': 0, 'message': '须要验证'})

        phone_number = current_app.redis.get('token:%s' % token)
        if not phone_number or token != current_app.redis.hget('user:%s' % phone_number, 'token'):
            return jsonify({'code': 2, 'message': '验证信息错误'})

        return f(*args, **kwargs)
    return decorator
View Code

main.py

# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from app.model import User, db_session, SmallBlog, desc
import uuid

from . import api
from .decorators import login_check


@api.before_request
def before_request():
    token = request.headers.get('token')
    phone_number = current_app.redis.get('token:%s' % token)
    if phone_number:
        g.current_user = User.query.filter_by(phone_number=phone_number).first()
        g.token = token
    return


@api.teardown_request
def handle_teardown_request(exception):
    db_session.remove()


@api.route('/get-multi-qiniu-token')
@login_check
def get_multi_qiniu_token():
    count = request.args.get('count')

    if not 0 < int(count) < 10:
        return jsonify({'code': 0, 'message': '一次只能获取1到9个'})

    key_token_s = []
    for x in range(int(count)):
        key = uuid.uuid1()
        token = current_app.q.upload_token(current_app.bucket_name, key, 3600)
        key_token_s.append((key, token))
    return jsonify({'code': 1, 'key_token_s': key_token_s})


@api.route('/get-qiniu-token')
def get_qiniu_token():
    key = uuid.uuid4()
    token = current_app.q.upload_token(current_app.bucket_name, key, 3600)
    return jsonify({'code': 1, 'key': key, 'token': token})
View Code

  运行一下,看看有没有问题。整个文件结构进一步优化,我想哪一个接口在哪一个文件里,一目了然。其实如今能够回头看看,假设哪天个人model里面有上百张表,我该如何用model.py文件呢?这个能够留给你们去实践一下,必定要记住,这虽然不是必要的,但好的项目结构能让人一会儿明白你整个项目,即便之后你离职,下一个同事也能快速上手。最后,再啰嗦一句,必定要灵活,不要被框架限定死。 

相关文章
相关标签/搜索