RabbitMQ的几种应用场景

以前的几篇文章介绍了一下RabbitMQ的概念以及环境的搭建和配置,有了RabbitMQ环境就能够基于其实现一些特殊的任务场景了。RabbitMQ官方有个很好的Tutorials基本覆盖了RabbitMQ的各中常见应用场景,现以代码加注释的方式以其Python客户端pika为例简单介绍以下。更详尽的信息可参阅:http://www.rabbitmq.com/getstarted.html 。html

以前的几篇文章:
RabbitMQ概念及环境搭建(一)单节点安装与配置
RabbitMQ概念及环境搭建(二)RabbitMQ Broker管理
RabbitMQ概念及环境搭建(三)RabbitMQ cluster
RabbitMQ概念及环境搭建(四)RabbitMQ High Availability
RabbitMQ概念及环境搭建(五)与web的整合

python

RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者如下简称P,相对应的“消费者”乃message接收者如下简称C,message经过queue由P到C,queue存在于RabbitMQ,可存储尽量多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

web

应用场景1-“Hello Word”缓存

一个P向queue发送一个message,一个C从该queue接收message并打印。
并发

send.py 
producer,链接至RabbitMQ Server,声明队列,发送message,关闭链接,退出。函数

 

[python]  view plain  copy
 
  1. #!/usr/bin/python27  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. #与RabbitMQ Server创建链接  
  6. #链接到的broker在本机-localhost上  
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #声明队列以向其发送消息消息  
  12. #向不存在的位置发送消息时RabbitMQ将消息丢弃  
  13. #queue='hello'指定队列名字  
  14. channel.queue_declare(queue='hello', durable=True)  
  15.   
  16. #message不能直接发送给queue,需经exchange到达queue,此处使用以空字符串标识的默认的exchange  
  17. #使用默认exchange时容许经过routing_key明确指定message将被发送给哪一个queue  
  18. #body参数指定了要发送的message内容  
  19. channel.basic_publish(exchange='',  
  20.                       routing_key='hello',  
  21.                       body='Hello World!')  
  22.   
  23. print " [x] Sent 'Hello World!'"  
  24.   
  25. #关闭与RabbitMq Server间的链接  
  26. connection.close()  

receive.py 
consumer,链接至RabbitMQ Server,声明队列,接收消息并进行处理这里为打印出消息,退出。测试

 

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. #创建到达RabbitMQ Server的connection  
  6. #此处RabbitMQ Server位于本机-localhost  
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #声明queue,确认要从中接收message的queue  
  12. #queue_declare函数是幂等的,可运行屡次,但只会建立一次  
  13. #若能够确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue  
  14. #但在producer和consumer中重复声明queue是一个好的习惯  
  15. channel.queue_declare(queue='hello')  
  16.   
  17. print ' [*] Waiting for messages. To exit press CTRL+C'  
  18.   
  19. #定义回调函数  
  20. #一旦从queue中接收到一个message回调函数将被调用  
  21. #ch:channel  
  22. #method:  
  23. #properties:  
  24. #body:message  
  25. def callback(ch, method, properties, body):  
  26.     print " [x] Received %r" % (body,)  
  27.   
  28. #从queue接收message的参数设置  
  29. #包括从哪一个queue接收message,用于处理message的callback,是否要确认message  
  30. #默认状况下是要对消息进行确认的,以防止消息丢失。  
  31. #此处将no_ack明确指明为True,不对消息进行确认。  
  32. channel.basic_consume(callback,  
  33.                       queue='hello',  
  34.                       no_ack=True)  
  35.   
  36. #开始循环从queue中接收message并使用callback进行处理  
  37. channel.start_consuming()  

测试fetch

 

[plain]  view plain  copy
 
  1. python send.py  
  2. python receive.py  


应用场景2-work queuesui

将耗时的消息处理经过队列分配给多个consumer来处理,咱们称此处的consumer为worker,咱们将此处的queue称为Task Queue,其目的是为了不资源密集型的task的同步处理,也即当即处理task并等待完成。相反,调度task使其稍后被处理。也即把task封装进message并发送到task queue,worker进程在后台运行,从task queue取出task并执行job,若运行了多个worker,则task可在多个worker间分配。spa


new_task.py
创建链接,声明队列,发送能够模拟耗时任务的message,断开链接、退出。

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import sys  
  5.   
  6. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  7.         host='localhost'))  
  8. channel = connection.channel()  
  9.   
  10. #仅仅对message进行确认不能保证message不丢失,好比RabbitMQ崩溃了queue就会丢失  
  11. #所以还需使用durable=True声明queue是持久化的,这样即使Rabb崩溃了重启后queue仍然存在  
  12. channel.queue_declare(queue='task_queue', durable=True)  
  13.   
  14. #从命令行构造将要发送的message  
  15. message = ' '.join(sys.argv[1:]) or "Hello World!"  
  16.   
  17. #除了要声明queue是持久化的外,还需声明message是持久化的  
  18. #basic_publish的properties参数指定message的属性  
  19. #此处pika.BasicProperties中的delivery_mode=2指明message为持久的  
  20. #这样一来RabbitMQ崩溃重启后queue仍然存在其中的message也仍然存在  
  21. #需注意的是将message标记为持久的并不能彻底保证message不丢失,由于  
  22. #从RabbitMQ接收到message到将其存储到disk仍需一段时间,若此时RabbitMQ崩溃则message会丢失  
  23. #何况RabbitMQ不会对每条message作fsync动做  
  24. #可经过publisher confirms实现更强壮的持久性保证  
  25. channel.basic_publish(exchange='',  
  26.                       routing_key='task_queue',  
  27.                       body=message,  
  28.                       properties=pika.BasicProperties(  
  29.                          delivery_mode = 2, # make message persistent  
  30.                       ))  
  31. print " [x] Sent %r" % (message,)  
  32. connection.close()  

worker.py
创建链接,声明队列,不断的接收message,处理任务,进行确认。

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import time  
  5.   
  6. #默认状况RabbirMQ将message以round-robin方式发送给下一个consumer  
  7. #每一个consumer接收到的平均message量是同样的  
  8. #能够同时运行两个或三个该程序进行测试  
  9.   
  10. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  11.         host='localhost'))  
  12. channel = connection.channel()  
  13.   
  14. #仅仅对message进行确认不能保证message不丢失,好比RabbitMQ崩溃了  
  15. #还需使用durable=True声明queue是持久化的,这样即使Rabb崩溃了重启后queue仍然存在其中的message不会丢失  
  16. #RabbitMQ中不容许使用不一样的参数定义同名queue  
  17. channel.queue_declare(queue='task_queue', durable=True)  
  18.   
  19. print ' [*] Waiting for messages. To exit press CTRL+C'  
  20.   
  21. #回调函数,函数体模拟耗时的任务处理:以message中'.'的数量表示sleep的秒数  
  22. def callback(ch, method, properties, body):  
  23.     print " [x] Received %r" % (body,)  
  24.     time.sleep( body.count('.') )  
  25.     print " [x] Done"  
  26.     #对message进行确认  
  27.     ch.basic_ack(delivery_tag = method.delivery_tag)  
  28.   
  29. #若存在多个consumer每一个consumer的负载可能不一样,有些处理的快有些处理的慢  
  30. #RabbitMQ并无论这些,只是简单的以round-robin的方式分配message  
  31. #这可能形成某些consumer积压不少任务处理不完而一些consumer长期处于饥饿状态  
  32. #可使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer处理并确认了上一个message后才分配新的message给他  
  33. #不然分给另外一个空闲的consumer  
  34. channel.basic_qos(prefetch_count=1)  
  35.   
  36. #这里移除了no_ack=True这个参数,也即须要对message进行确认(默认行为)  
  37. #不然consumer在偶然down后其正在处理和分配到该consumer还未处理的message可能发生丢失  
  38. #由于此时RabbitMQ在发送完message后当即从内存删除该message  
  39. #假如没有设置no_ack=True则consumer在偶然down掉后其正在处理和分配至该consumer但还将来得及处理的message会从新分配到其余consumer  
  40. #没有设置no_ack=True则consumer在收到message后会向RabbitMQ反馈已收到并处理了message告诉RabbitMQ能够删除该message  
  41. #RabbitMQ中没有超时的概念,只有在consumer down掉后从新分发message  
  42. channel.basic_consume(callback,  
  43.                       queue='task_queue')  
  44.   
  45. channel.start_consuming()  

测试

[plain]  view plain  copy
 
  1. python new_task.py "A very hard task which takes two seconds.."  
  2. python worker.py  

 

应用场景3-Publish/Subscribe

在应用场景2中一个message(task)仅被传递给了一个comsumer(worker)。如今咱们设法将一个message传递给多个consumer。这种模式被称为publish/subscribe。此处以一个简单的日志系统为例进行说明。该系统包含一个log发送程序和一个log接收并打印的程序。由log发送者发送到queue的消息能够被全部运行的log接收者接收。所以,咱们能够运行一个log接收者直接在屏幕上显示log,同时运行另外一个log接收者将log写入磁盘文件。

 


receive_logs.py
日志消息接收者:创建链接,声明exchange,将exchange与queue进行绑定,开始不停的接收log并打印。

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. #做为好的习惯,在producer和consumer中分别声明一次以保证所要使用的exchange存在  
  10. channel.exchange_declare(exchange='logs',  
  11.                          type='fanout')  
  12.   
  13. #在不一样的producer和consumer间共享queue时指明queue的name是重要的  
  14. #但某些时候,好比日志系统,须要接收全部的log message而非一个子集  
  15. #并且仅对当前的message 流感兴趣,对于过期的message不感兴趣,那么  
  16. #能够申请一个临时队列这样,每次链接到RabbitMQ时会以一个随机的名字生成  
  17. #一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue  
  18. result = channel.queue_declare(exclusive=True)  
  19.   
  20. #用于获取临时queue的name  
  21. queue_name = result.method.queue  
  22.   
  23. #exchange与queue之间的关系成为binding  
  24. #binding告诉exchange将message发送该哪些queue  
  25. channel.queue_bind(exchange='logs',  
  26.                    queue=queue_name)  
  27.   
  28. print ' [*] Waiting for logs. To exit press CTRL+C'  
  29.   
  30. def callback(ch, method, properties, body):  
  31.     print " [x] %r" % (body,)  
  32.   
  33. #从指定地queue中consume message且不确认  
  34. channel.basic_consume(callback,  
  35.                       queue=queue_name,  
  36.                       no_ack=True)  
  37.   
  38. channel.start_consuming()  

emit_log.py
日志消息发送者:创建链接,声明fanout类型的exchange,经过exchage向queue发送日志消息,消息被广播给全部接收者,关闭链接,退出。

 

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3.   
  4. import pika  
  5. import sys  
  6.   
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #producer只能经过exchange将message发给queue  
  12. #exchange的类型决定将message路由至哪些queue  
  13. #可用的exchange类型:direct\topic\headers\fanout  
  14. #此处定义一个名称为'logs'的'fanout'类型的exchange,'fanout'类型的exchange简单的将message广播到它所知道的全部queue  
  15. channel.exchange_declare(exchange='logs',  
  16.                          type='fanout')  
  17.   
  18. message = ' '.join(sys.argv[1:]) or "info: Hello World!"  
  19.   
  20. #将message publish到名为log的exchange中  
  21. #由于是fanout类型的exchange,这里无需指定routing_key  
  22. channel.basic_publish(exchange='logs',  
  23.                       routing_key='',  
  24.                       body=message)  
  25.   
  26. print " [x] Sent %r" % (message,)  
  27.   
  28. connection.close()  

测试

 

 

[plain]  view plain  copy
 
  1. python receive_logs.py  
  2. python emit_log.py "info: This is the log message"  

 

应用场景4-Routing

应用场景3中构建了简单的log系统,能够将log message广播至多个receiver。如今咱们将考虑只把指定的message类型发送给其subscriber,好比,只把error message写到log file而将全部log message显示在控制台。


receive_logs_direct.py
log message接收者:创建链接,声明direct类型的exchange,声明queue,使用提供的参数做为routing_key将queue绑定到exchange,开始循环接收log message并打印。

 

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. #声明一个名为direct_logs类型为direct的exchange  
  10. #同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在  
  11. channel.exchange_declare(exchange='direct_logs',  
  12.                          type='direct')  
  13.   
  14. result = channel.queue_declare(exclusive=True)  
  15. queue_name = result.method.queue  
  16.   
  17. #从命令行获取参数:routing_key  
  18. severities = sys.argv[1:]  
  19. if not severities:  
  20.     print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)  
  21.     sys.exit(1)  
  22.   
  23. for severity in severities:  
  24.     #exchange和queue之间的binding可接受routing_key参数  
  25.     #该参数的意义依赖于exchange的类型  
  26.     #fanout类型的exchange直接忽略该参数  
  27.     #direct类型的exchange精确匹配该关键字进行message路由  
  28.     #对多个queue使用相同的binding_key是合法的  
  29.     channel.queue_bind(exchange='direct_logs',  
  30.                        queue=queue_name,  
  31.                        routing_key=severity)  
  32.   
  33. print ' [*] Waiting for logs. To exit press CTRL+C'  
  34.   
  35. def callback(ch, method, properties, body):  
  36.     print " [x] %r:%r" % (method.routing_key, body,)  
  37.   
  38. channel.basic_consume(callback,  
  39.                       queue=queue_name,  
  40.                       no_ack=True)  
  41.   
  42. channel.start_consuming()  

emit_log_direct.py
log message发送者:创建链接,声明direct类型的exchange,生成并发送log message到exchange,关闭链接,退出。

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3.   
  4. import pika  
  5. import sys  
  6.   
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #声明一个名为direct_logs的direct类型的exchange  
  12. #direct类型的exchange  
  13. channel.exchange_declare(exchange='direct_logs',  
  14.                          type='direct')  
  15.   
  16. #从命令行获取basic_publish的配置参数  
  17. severity = sys.argv[1] if len(sys.argv) > else 'info'  
  18. message = ' '.join(sys.argv[2:]) or 'Hello World!'  
  19.   
  20. #向名为direct_logs的exchage按照设置的routing_key发送message  
  21. channel.basic_publish(exchange='direct_logs',  
  22.                       routing_key=severity,  
  23.                       body=message)  
  24.   
  25. print " [x] Sent %r:%r" % (severity, message)  
  26. connection.close()  

测试:

python receive_logs_direct.py info
python emit_log_direct.py info "The message"

应用场景5-topic

应用场景4中改进的log系统中用direct类型的exchange替换应用场景3中的fanout类型exchange实现将不一样的log message发送给不一样的subscriber(也即分别经过不一样的routing_key将queue绑定到exchange,这样exchange即可将不一样的message根据message内容路由至不一样的queue)。但仍然存在限制,不能根据多个规则路由消息,好比接收者要么只能收error类型的log message要么只能收info类型的message。若是咱们不只想根据log的重要级别如info、warning、error等来进行log message路由还想同时根据log message的来源如auth、cron、kern来进行路由。为了达到此目的,须要topic类型的exchange。topic类型的exchange中routing_key中能够包含两个特殊字符:“*”用于替代一个词,“#”用于0个或多个词。

receive_logs_topic.py
log message接收者:创建链接,声明topic类型的exchange,声明queue,根据程序参数构造routing_key,根据routing_key将queue绑定到exchange,循环接收并处理message。

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. #声明一个名为direct_logs类型为direct的exchange  
  10. #同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在  
  11. channel.exchange_declare(exchange='direct_logs',  
  12.                          type='direct')  
  13.   
  14. result = channel.queue_declare(exclusive=True)  
  15. queue_name = result.method.queue  
  16.   
  17. #从命令行获取参数:routing_key  
  18. severities = sys.argv[1:]  
  19. if not severities:  
  20.     print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)  
  21.     sys.exit(1)  
  22.   
  23. for severity in severities:  
  24.     #exchange和queue之间的binding可接受routing_key参数  
  25.     #该参数的意义依赖于exchange的类型  
  26.     #fanout类型的exchange直接忽略该参数  
  27.     #direct类型的exchange精确匹配该关键字进行message路由  
  28.     #对多个queue使用相同的binding_key是合法的  
  29.     channel.queue_bind(exchange='direct_logs',  
  30.                        queue=queue_name,  
  31.                        routing_key=severity)  
  32.   
  33. print ' [*] Waiting for logs. To exit press CTRL+C'  
  34.   
  35. def callback(ch, method, properties, body):  
  36.     print " [x] %r:%r" % (method.routing_key, body,)  
  37.   
  38. channel.basic_consume(callback,  
  39.                       queue=queue_name,  
  40.                       no_ack=True)  
  41.   
  42. channel.start_consuming()  

emit_log_topic.py
log message发送者:创建链接、声明topic类型的exchange、根据程序参数构建routing_key和要发送的message,以构建的routing_key将message发送给topic类型的exchange,关闭链接,退出。

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import sys  
  5.   
  6. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  7.         host='localhost'))  
  8. channel = connection.channel()  
  9.   
  10. #声明一个名为topic_logs的topic类型的exchange  
  11. #topic类型的exchange可经过通配符对message进行匹配从而路由至不一样queue  
  12. channel.exchange_declare(exchange='topic_logs',  
  13.                          type='topic')  
  14.   
  15. routing_key = sys.argv[1] if len(sys.argv) > else 'anonymous.info'  
  16. message = ' '.join(sys.argv[2:]) or 'Hello World!'  
  17.   
  18. channel.basic_publish(exchange='topic_logs',  
  19.                       routing_key=routing_key,  
  20.                       body=message)  
  21.   
  22. print " [x] Sent %r:%r" % (routing_key, message)  
  23. connection.close()  

 

测试:

 

[plain]  view plain  copy
 
  1. python receive_logs_topic.py "*.rabbit"  
  2. python emit_log_topic.py red.rabbit Hello  

 

应用场景6-PRC

在应用场景2中描述了如何使用work queue将耗时的task分配到不一样的worker中。可是,若是咱们task是想在远程的计算机上运行一个函数并等待返回结果呢。这根场景2中的描述是一个彻底不一样的故事。这一模式被称为远程过程调用。如今,咱们将构建一个RPC系统,包含一个client和可扩展的RPC server,经过返回斐波那契数来模拟RPC service。

rpc_server.py
RPC server:创建链接,声明queue,定义了一个返回指定数字的斐波那契数的函数,定义了一个回调函数在接收到包含参数的调用请求后调用本身的返回斐波那契数的函数并将结果发送到与接收到message的queue相关联的queue,并进行确认。开始接收调用请求并用回调函数进行请求处理。

 

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. #创建到达RabbitMQ Server的connection  
  6. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  7.         host='localhost'))  
  8. channel = connection.channel()  
  9.   
  10. #声明一个名为rpc_queue的queue  
  11. channel.queue_declare(queue='rpc_queue')  
  12.   
  13. #计算指定数字的斐波那契数  
  14. def fib(n):  
  15.     if n == 0:  
  16.         return 0  
  17.     elif n == 1:  
  18.         return 1  
  19.     else:  
  20.         return fib(n-1) + fib(n-2)  
  21.   
  22. #回调函数,从queue接收到message后调用该函数进行处理  
  23. def on_request(ch, method, props, body):  
  24.     #由message获取要计算斐波那契数的数字  
  25.     n = int(body)  
  26.   
  27.     print " [.] fib(%s)"  % (n,)  
  28.     #调用fib函数得到计算结果  
  29.     response = fib(n)  
  30.       
  31.     #exchage为空字符串则将message发送个到routing_key指定的queue  
  32.     #这里queue为回调函数参数props中reply_ro指定的queue  
  33.     #要发送的message为计算所得的斐波那契数  
  34.     #properties中correlation_id指定为回调函数参数props中co的rrelation_id  
  35.     #最后对消息进行确认  
  36.     ch.basic_publish(exchange='',  
  37.                      routing_key=props.reply_to,  
  38.                      properties=pika.BasicProperties(correlation_id = \  
  39.                                                          props.correlation_id),  
  40.                      body=str(response))  
  41.     ch.basic_ack(delivery_tag = method.delivery_tag)  
  42.   
  43. #只有consumer已经处理并确认了上一条message时queue才分派新的message给它  
  44. channel.basic_qos(prefetch_count=1)  
  45.   
  46. #设置consumeer参数,即从哪一个queue获取消息使用哪一个函数进行处理,是否对消息进行确认  
  47. channel.basic_consume(on_request, queue='rpc_queue')  
  48.   
  49. print " [x] Awaiting RPC requests"  
  50.   
  51. #开始接收并处理消息  
  52. channel.start_consuming()  

rpc_client.py
RPC client:远程过程调用发起者:定义了一个类,类中初始化到RabbitMQ Server的链接、声明回调queue、开始在回调queue上等待接收响应、定义了在回调queue上接收到响应后的处理函数on_response根据响应关联的correlation_id属性做出响应、定义了调用函数并在其中向调用queue发送包含correlation_id等属性的调用请求、初始化一个client实例,以30为参数发起远程过程调用。

 

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import uuid  
  5.   
  6. #在一个类中封装了connection创建、queue声明、consumer配置、回调函数等  
  7. class FibonacciRpcClient(object):  
  8.     def __init__(self):  
  9.         #创建到RabbitMQ Server的connection  
  10.         self.connection = pika.BlockingConnection(pika.ConnectionParameters(  
  11.                 host='localhost'))  
  12.   
  13.         self.channel = self.connection.channel()  
  14.           
  15.         #声明一个临时的回调队列  
  16.         result = self.channel.queue_declare(exclusive=True)  
  17.         self.callback_queue = result.method.queue  
  18.   
  19.         #此处client既是producer又是consumer,所以要配置consume参数  
  20.         #这里的指明从client本身建立的临时队列中接收消息  
  21.         #并使用on_response函数处理消息  
  22.         #不对消息进行确认  
  23.         self.channel.basic_consume(self.on_response, no_ack=True,  
  24.                                    queue=self.callback_queue)  
  25.       
  26.     #定义回调函数  
  27.     #比较类的corr_id属性与props中corr_id属性的值  
  28.     #若相同则response属性为接收到的message  
  29.     def on_response(self, ch, method, props, body):  
  30.         if self.corr_id == props.correlation_id:  
  31.             self.response = body  
  32.    
  33.     def call(self, n):  
  34.         #初始化response和corr_id属性  
  35.         self.response = None  
  36.         self.corr_id = str(uuid.uuid4())  
  37.          
  38.         #使用默认exchange向server中定义的rpc_queue发送消息  
  39.         #在properties中指定replay_to属性和correlation_id属性用于告知远程server  
  40.         #correlation_id属性用于匹配request和response  
  41.         self.channel.basic_publish(exchange='',  
  42.                                    routing_key='rpc_queue',  
  43.                                    properties=pika.BasicProperties(  
  44.                                          reply_to = self.callback_queue,  
  45.                                          correlation_id = self.corr_id,  
  46.                                          ),  
  47.                                    #message需为字符串  
  48.                                    body=str(n))  
  49.   
  50.         while self.response is None:  
  51.             self.connection.process_data_events()  
  52.           
  53.         return int(self.response)  
  54.   
  55. #生成类的实例  
  56. fibonacci_rpc = FibonacciRpcClient()  
  57.   
  58. print " [x] Requesting fib(30)"  
  59. #调用实例的call方法  
  60. response = fibonacci_rpc.call(30)  
  61. print " [.] Got %r" % (response,)  

测试:

[python]  view plain  copy
 
    1. python rpc_server.py  
    2. python rpc_client.py  
相关文章
相关标签/搜索