微服务最近一二年很是热门,谈论也比较多,简单的说,微服务将单一应用程序做为由众多小型服务构成之套件加以开发的方式,其中各项服务都拥有本身的进程并利用轻量化机制(一般为HTTP源API)实现通讯。下面来一张示例图:html
Microservices Architecture: python
上面2幅图已经形象说明微服务是什么东西了,同时软件部署方式须要创建在容器上。微服务相关生态会在Java和Go语言中比较成熟,尤为是Java。而Python做为后端,这方面会比较弱一点,微服务框架目前能看到了也就Nameko,而且技术也没那么成熟,因目前业务Python场景比Go语言稍多,因此先来玩一下Python如何玩微服务吧。mysql
关于微服务,还有2个概念也比较热,下面简单说起一下。git
Service Mesh 服务网格,这个概念刚开始晦涩难懂,网上也有人说事下一代微服务,简单的说,当成千上万的微服务部署在Kubernetes 上,总体来讲也是至关复杂的,由于每一个微服务都须要健康检查、处理错误、延时等,而Kubernetes虽然能够提供健康检查和自动恢复,可是还须要熔断模式、服务发现、API管理、加密校验等等。而这些就是 Service Mesh须要解决的问题。github
更加详细的介绍: philcalcado.com/2017/08/03/…redis
做为服务间通讯的基础设施层,能够将它比做是应用程序或者说微服务间的TCP/IP,负责服务之间的网络调用、限流、熔断和监控。对于编写应用程序来讲通常无须关心TCP/IP这一层(好比经过 HTTP 协议的 RESTful 应用),一样使用Service Mesh也就无须关系服务之间的那些原来是经过应用程序或者其余框架实现的事情,好比Spring Cloud,如今只要交给Service Mesh就能够了。sql
Service Mesh有以下几个特色:docker
Service Mesh的架构以下图所示: json
比较流行的开源软件有Istio,有机会再去玩一下。flask
无服务架构,第一次接触是在AWS的技术峰会上,简单的说就是不须要关心服务器,整个计算堆栈,包括运行功能代码的操做系统进程,彻底由云提供商管理。更增强化了 DevOps 的理念。
实际玩过AWS Lambda 无服务应用程序,确实很方便,简化为一个函数,经过 API Gateway + Lambda 则可实现Web服务。按请求量收费,这一点目前以为很坑,尤为是请求量大时,产生的费用远远比本身将应用部署在Docker上会贵不少。 因此目前无服务架构的场景也是很是适合一些一次性任务,请求量调用很少的场景来讲会很是方便,开发者成员就能够本身开发本身部署,再也不须要关心服务器。 能够免除全部运维性操做,开发人员能够更加专一于核心业务的开发,实现快速上线和迭代。
Nameko是Python中的微服务框架,git地https://github.com/nameko/nameko,受欢迎度暂时还不高,官方文档的介绍实现了:
It comes with built-in support for:
简单的说RPC创建在AMQP上,在AMQP上实现了发布订阅,实现了简单的HTTP服务,还有Websocket RPC。
这简直跟Java的生态彻底感受是小儿科。架构经过RabbitMQ做为message broker,供给各个Nameko Service之间的通讯。
更多的细节请查看官方文档。
接下来实践一下,以某一个业务场景为例。
场景: 假设社交场景中,评论别人的文章,服务器给文章做者推送一条消息告知有人评论,同时评论必须得先注册。
涉及到2个微服务,注册服务和推送服务,同时有一个评论接口。
首先须要准备Python3环境,Redis简单起见做为用户登陆注册的存储,Nameko用pip安装,RabbitMQ最好用Docker安装。
# RabbitMQ docker 安装命令
docker search rabbitmq
docker pull rabbitmq:3.7-rc-management
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7-rc-management
# 须要默认运行在5672端口
Doc: https://github.com/docker-library/docs/tree/master/rabbitmq
# nameko 运行服务命令:
nameko run service --broker amqp://guest:guest@localhost
其中 guest:guest是RabbitMQ Docker镜像的用户名和密码
复制代码
同时为了方便API测试,经过flasgger提供Swagger UI进行集成Flask。
准备好环境后,开始代码部分演示。
├── app
│ └── api.py
├── dependence
│ ├── __init__.py
│ └── services
│ ├── __init__.py
│ ├── config.py
│ └── redis_service.py
└── microservices
├── push.py
└── register.py
复制代码
代码结构如上:
须要实践的是2个功能:
先介绍一下dependence中的代码:
# content of redis_service
class RedisService(object):
def __init__(self):
self.redis_instance = RedisClient.get_redis(
config.REDIS_NAME, config.REDIS_HOST, config.REDIS_PORT,
config.REDIS_DB)
self.users_key = "users"
self.users_data_key = "users_data"
def check_registered_and_get_info(self, u_id):
""" Check if the user is registered and return user information if registered. """
user_data = self.redis_instance.hget(self.users_data_key, u_id)
if not user_data:
return False, None
return True, json.loads(user_data)
def check_email_is_registered(self, email):
u_id = self.redis_instance.hget(self.users_key, email)
return u_id
def register(self, u_id, email, data):
self.redis_instance.hset(self.users_key, email, u_id)
result = self.redis_instance.hset(self.users_data_key, u_id,json.dumps(data))
return result
复制代码
接下来看API代码部分:
# content of api.py
import time
import random
from flask import Flask, request, jsonify
from flasgger import Swagger
from nameko.standalone.rpc import ClusterRpcProxy
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--port", help="app running port", type=int, default=5000)
parse_args = parser.parse_args()
app = Flask(__name__)
Swagger(app)
CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
复制代码
代码校长,分开展现,这部分是引用部分,须要nameko和Swagger,nameko是提供微服务的RPC服务代理,同时须要提供CONFIG,内容是Message Broker地址,其实就RabbitMQ。 Swagger是和Flask结合,方便网页界面进行API测试。
# content of api.py
@app.route('/api/v1/comment', methods=['POST'])
def comment():
""" Comment API Parameters Explain: timestamp 评论时间 u_id 用户id content 评论内容 article_id 文章ID article_u_id 文章做者用户id parent_comment_id 父评论id (optional) --- parameters: - name: body in: body required: true schema: id: comment properties: timestamp: type: integer u_id: type: string content: type: string article_id: type: integer article_u_id: type: integer parent_comment_id: type: integer responses: code: description: 0 Comment Success! message: description: Error Message! data: description: return comment_id """
data = request.json
article_u_id = data.get("article_u_id")
u_id = data.get("u_id")
code, message = 0, ""
if not article_u_id or not u_id:
code, message = 10003, "article_u_id or u_id is null."
response = dict(code=code, message=message, data="")
return jsonify(response)
with ClusterRpcProxy(CONFIG) as rpc:
user_data = rpc.register.check_registered(u_id)
if not user_data:
code, message = 10004, "You need to register to comment."
response = dict(code=code, message=message, data="")
return jsonify(response)
# push message
print("Push Message: article_u_id: {}".format(article_u_id))
result, message = rpc.push.push(article_u_id, data.get("content"))
print("push result: {}, message: {}".format(result, message))
# save comment data
print("Save Comment Data: article_id: {} content: {}".format(
data.get("article_id"), data.get("content")))
data = dict(comment_id=int(time.time()))
response = dict(code=0, message="", data=data)
return jsonify(response)
复制代码
评论接口,描述部分是提供Swagger的API接口描述(其规范须要遵循Swagger规范,具体能够查看官方文档),提供评论者的用户ID,文章ID,评论的内容,文章做者用户ID(简单起见,直接客户端提供,正常场景是根据文章ID找到做者的用户ID)。
实现的功能也很是简单,先经过调用检查注册服务看评论者是否有注册,没有就直接返回须要注册才能评论。若是已经注册,则调用推送服务给做者进行推送通知。以后并保存评论信息,返回评论ID。
关键信息就是在 注册和推送 微服务的实现,保存评论信息,我这里直接print,没有作实际的操做。
@app.route('/api/v1/register', methods=['POST'])
def register():
""" Register API Parameters Explain: timestamp 注册时间 email 注册邮箱 name 名称 language 语言 country 国家 --- parameters: - name: body in: body required: true schema: id: data properties: timestamp: type: integer email: type: string name: type: string language: type: string country: type: string responses: code: description: 0 register success. message: description: Error Message! data: description: return u_id """
user_data = request.json
email = user_data.get("email")
code, message = 0, ""
if not email:
code, message = 10000, "email is null."
response = dict(code=code, message=message, data="")
return jsonify(response)
u_id = None
with ClusterRpcProxy(CONFIG) as rpc:
u_id, message = rpc.register.register(email, user_data)
if message:
code = 10001
data = dict(u_id=u_id)
response = dict(code=code, message=message, data=data)
return jsonify(response)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(parse_args.port), debug=True)
复制代码
这是api.py最后一段,实现的是注册接口,简单的说就是调用注册服务,若是已经注册则直接返回,不然存储用户信息。
关键在于注册服务的实现。
运行 python api.py
打开 http://localhost:5000/apidocs/
能够看到以下界面:
点开其中一个API,能够看到以下界面:
很是方便进行API接口调试。
接下来重点来了,演示微服务代码部分:
import random
from nameko.rpc import rpc
import sys
sys.path.append("..")
from dependence.services import RedisService
class RegisterService(object):
name = "register"
def __init__(self):
self.redis_handle = RedisService()
@rpc
def check_registered(self, u_id):
is_registered, user_data = self.redis_handle.check_registered_and_get_info(u_id)
if is_registered:
return user_data
return None
@staticmethod
def generate_u_id():
"""
Test Function
"""
return str(random.randint(7000000, 9999999))
@rpc
def register(self, email, user_data):
u_id = self.redis_handle.check_email_is_registered(email)
if u_id:
return u_id, "already registered."
u_id = self.generate_u_id()
register_result = self.redis_handle.register(u_id, email, user_data)
if register_result:
return u_id, ""
return None, "register failed.
复制代码
关注register的实现,须要导入nameko.rpc,而且用rpc装饰该函数。实现很是简单,里面代码就是逻辑部分,生成u_id,而后存储到redis中。
这样就实践了第一个功能,在API中调用微服务。
接下来看看 push 服务的实现:
import random
from nameko.rpc import rpc, RpcProxy
import sys
sys.path.append("..")
from dependence.services import RedisService
class PushService(object):
name = "push"
register_rpc = RpcProxy("register")
@rpc
def push(self, u_id, content):
user_data = self.register_rpc.check_registered(u_id)
if not user_data:
print("User:{} not existed.".format(u_id))
return False, "not registered."
language, country = user_data["language"], user_data["country"]
# get language push content
print("Push Progress: u_id: {} language: {}, country: {}, content: {}".
format(u_id, language, country, content))
return True, "push success."
复制代码
push服务中须要调用注册服务,判断文章做者是否注册(其实可以发表文章确定是已经注册,这里是指演示),这样就微服务中调用微服务,须要额外import RpcProxy,指定 注册服务 RpcProxy("register"),而后再服务中调用便可,而且拿到用户的信息,判断语言和国家,推送对应的语言内容。
总体来说,Nameko这个框架,代码层实现很是简单,轻量级,简单实用。可是功能不全,Python 后端应用场景很少。
开三个终端,分别运行:
cd microservices & nameko run push
cd microservices & nameko run register
cd app & python api.py
复制代码
打开http://localhost:5000/apidocs/#/
准备注册数据:
注册信息1
{
"country": "CN",
"email": "nameko@nameko.com",
"language": "ZH",
"name": "xiaohua",
"timestamp": 1553652949
}
注册信息2
{
"country": "CN",
"email": "nameko2@nameko.com",
"language": "ZH",
"name": "xiaoming",
"timestamp": 1553652950
}
复制代码
操做示例:
返回信息:
返回信息1
{
"code": 0,
"data": {
"u_id": "7434029"
},
"message": ""
}
返回信息2
{
"code": 0,
"data": {
"u_id": "8240184"
},
"message": ""
}
复制代码
调用push接口:
点击执行execute。
返回信息:
python api.py 终端打印信息:
Push Message: article_u_id: 7434029
push result: True, message: push success.
Save Comment Data: article_id: 100 content: very good.
127.0.0.1 - - [27/Mar/2019 23:24:07] "POST /api/v1/comment HTTP/1.1" 200 -
复制代码
Python 微服务就演示到这里,完整代码 github 地址github.com/CrystalSkyZ…
下一篇聊一下RPC。
更多精彩文章,请关注公众号『天澄技术杂谈』