基于oslo_messaging的RPC通讯

oslo_messaging源于Openstack的一个经典的模块,用以实现服务间的RPC通讯。Client端将数据放入rabbitmq中,server端从消息队列中获取传送数据。python

oslo.messaging库就是把rabbitmq的python库作了封装,考虑到了编程友好、性能、可靠性、异常的捕获等诸多因素。让各个项目的开发者聚焦于业务代码的编写,而不用考虑消息如何发送和接收。编程

一张比较经典的图见下:后端

 

Target:做为消息发送者,须要在target中指定消息要发送到的topic,exchange, binding-key, consumer等信息。多线程

Transport(传输层)主要实现RPC底层的通讯(好比socket)以及事件循环,多线程等其余功能.能够经过URL来得到不一样transport的句柄.URL的格式为:socket

 transport://user:password@host:port[,hostN:portN]/virtual_host函数

 目前支持的Transport有rabbit,qpid与zmq,分别对应不一样的后端消息总线.用户能够使用oslo.messaging.get_transport函数来得到transport对象实例的句柄.性能

Notifier:消息的发送端,能够在不一样的优先级别上发送通知,这些优先级包括sample,critical,error,warn,info,debug,audit等spa

Notification Listener和Server相似,一个Notification Listener对象能够暴露多个endpoint,每一个endpoint包含一组方法.可是与Server对象中的endpoint不一样的是,这里的endpoint中的方法对应通知消息的不一样优先级。在发送消息时,指定方法info,warn等,在notifer listener监听消息队列,使用dispatcher对象根据消息的publish_id, event_type将消息路由到不一样的endpoint方法上。线程

举个例子,在notifier listener端程序见下:debug

 1 from  oslo_config import cfg  2 import oslo_messaging  3 
 4 class NotificationEndpoint(object):  5 # filter_rule = oslo_messaging.NotificationFilter(
 6 # publish_id='^compute.*')
 7     def warn(self, ctxt, publish_id, event_type, payload, metadata):  8         print "caesar==> %s"  % payload  9 
10 class ErrorEndpoint(object): 11 # filter_rule = oslo_messaging.NotificationFilter(
12 # event_type='^instance\..*\.start',
13 # context={'ctxt_key':'regexp'})
14 
15     def error(self, ctxt, publish_id, event_type, payload, metadata): 16         print "caesar==> %s"  % payload 17 
18 transport = oslo_messaging.get_notification_transport(cfg.CONF) 19 endpoints = [ 20  NotificationEndpoint(), 21  ErrorEndpoint() 22 ] 23 targets = [ 24     oslo_messaging.Target(topic='notification'), 25     oslo_messaging.Target(topic='notification_bis') 26 ] 27 
28 server = oslo_messaging.get_notification_listener(transport, targets, 29  endpoints) 30 server.start() 31 server.wait()

 

程序中,两个endpoint中分别有error和warn方法,当开启服务时,会建立四个topic消息 队列,见下:

在客户端,经过notifier中topic和方法,好比topic=notification 方法为error,便可以向notification.error队列中传入数据。

 1 from oslo_config import cfg  2 import oslo_messaging as messaging  3 
 4 transport = messaging.get_transport(cfg.CONF)  5 notifier = messaging.Notifier(transport, driver='messaging', topics=['notification'])  6 project_id = 'b23a5e41d1af4c20974bf58b4dff8e5a'
 7 user_id = 'ceb61464a3d341ebabdf97d1d4b97099'
 8 notifier.error(ctxt={},  9                 event_type='my_type', 10                 payload={ 11             'tenant_id': project_id, 12             'user_id': user_id, 13             'instance_id': '123', 14             'instance_type_id': 1, 15             'instance_type': 'm1.flavor', 16             'state': 'active'
17 
18 })

 执行notifier程序,查询消息队列为空,即已经被notification listnener消费,消息无阻塞。:

在notification listnener 路由到ErrorEndpoint的error方法,打印结果见下:

 

相关文章
相关标签/搜索