系统环境html
[root@Centos ~]# uname -r 3.10.0-1127.el7.x86_64 [root@Centos ~]# cat /etc/redhat-release CentOS Linux release 7.8.2003 (Core) # 关闭firewalld [root@Centos mq]# systemctl stop firewalld [root@Centos mq]# systemctl disable firewalld Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service. Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service. # 禁用selinux [root@Centos mq]# setenforce 0 [root@Centos mq]# getenforce Permissive [root@Centos mq]# vi /etc/sysconfig/selinux 把 SELINUX=enforcing 修改成 SELINUX=disabled
处理依赖node
RabbitMQ版本与Erlang版本的兼容关系请查看:https://www.rabbitmq.com/which-erlang.htmlpython
这次安装版本:linux
RabbitMQ 3.8.5 下载地址:https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/noarch/git
Erlang 22.3.4 下载地址:https://github.com/rabbitmq/erlang-rpm/releasesgithub
[root@Centos mq]# pwd /root/mq [root@Centos mq]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3.4.2/erlang-22.3.4.2-1.el7.x86_64.rpm [root@Centos mq]# wget https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/noarch/rabbitmq-server-3.8.5-1.el7.noarch.rpm [root@Centos mq]# ls -lh total 34M -rw-r--r--. 1 root root 20M Jun 26 20:28 erlang-22.3.4.2-1.el7.x86_64.rpm -rw-r--r--. 1 root root 15M Jun 15 21:46 rabbitmq-server-3.8.5-1.el7.noarch.rpm [root@Centos mq]# yum -y localinstall erlang-22.3.4.2-1.el7.x86_64.rpm rabbitmq-server-3.8.5-1.el7.noarch.rpm Failed to set locale, defaulting to C Loaded plugins: fastestmirror Examining erlang-22.3.4.2-1.el7.x86_64.rpm: erlang-22.3.4.2-1.el7.x86_64 Marking erlang-22.3.4.2-1.el7.x86_64.rpm to be installed Examining rabbitmq-server-3.8.5-1.el7.noarch.rpm: rabbitmq-server-3.8.5-1.el7.noarch Marking rabbitmq-server-3.8.5-1.el7.noarch.rpm to be installed Resolving Dependencies --> Running transaction check ---> Package erlang.x86_64 0:22.3.4.2-1.el7 will be installed ---> Package rabbitmq-server.noarch 0:3.8.5-1.el7 will be installed --> Processing Dependency: socat for package: rabbitmq-server-3.8.5-1.el7.noarch Loading mirror speeds from cached hostfile * base: mirror.hkt.cc * extras: mirror.hkt.cc * updates: mirror.hkt.cc --> Running transaction check ---> Package socat.x86_64 0:1.7.3.2-2.el7 will be installed --> Finished Dependency Resolution Dependencies Resolved ==================================================================================================================== Package Arch Version Repository Size ==================================================================================================================== Installing: erlang x86_64 22.3.4.2-1.el7 /erlang-22.3.4.2-1.el7.x86_64 34 M rabbitmq-server noarch 3.8.5-1.el7 /rabbitmq-server-3.8.5-1.el7.noarch 15 M Installing for dependencies: socat x86_64 1.7.3.2-2.el7 base 290 k Transaction Summary ==================================================================================================================== Install 2 Packages (+1 Dependent package) Total size: 50 M Total download size: 290 k Installed size: 51 M Downloading packages: socat-1.7.3.2-2.el7.x86_64.rpm | 290 kB 00:00:00 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : erlang-22.3.4.2-1.el7.x86_64 1/3 Installing : socat-1.7.3.2-2.el7.x86_64 2/3 Installing : rabbitmq-server-3.8.5-1.el7.noarch 3/3 Verifying : rabbitmq-server-3.8.5-1.el7.noarch 1/3 Verifying : socat-1.7.3.2-2.el7.x86_64 2/3 Verifying : erlang-22.3.4.2-1.el7.x86_64 3/3 Installed: erlang.x86_64 0:22.3.4.2-1.el7 rabbitmq-server.noarch 0:3.8.5-1.el7 Dependency Installed: socat.x86_64 0:1.7.3.2-2.el7 Complete! [root@Centos mq]#
安装完成后通常不须要额外的配置便可启动RabbitMQweb
[root@Centos mq]# systemctl start rabbitmq-server [root@Centos mq]# systemctl status rabbitmq-server ● rabbitmq-server.service - RabbitMQ broker Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled) Active: active (running) since Mon 2020-07-06 18:05:23 CST; 21s ago Main PID: 28698 (beam.smp) Status: "Initialized" CGroup: /system.slice/rabbitmq-server.service ├─28698 /usr/lib64/erlang/erts-10.7.2.1/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf ... ├─28801 erl_child_setup 32768 ├─28825 /usr/lib64/erlang/erts-10.7.2.1/bin/epmd -daemon ├─28845 inet_gethost 4 └─28846 inet_gethost 4 Jul 06 18:05:21 Centos rabbitmq-server[28698]: ########## Licensed under the MPL 1.1. Website: https://rabbitmq.com Jul 06 18:05:21 Centos rabbitmq-server[28698]: Doc guides: https://rabbitmq.com/documentation.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Support: https://rabbitmq.com/contact.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Tutorials: https://rabbitmq.com/getstarted.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Monitoring: https://rabbitmq.com/monitoring.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Logs: /var/log/rabbitmq/rabbit@Centos.log Jul 06 18:05:21 Centos rabbitmq-server[28698]: /var/log/rabbitmq/rabbit@Centos_upgrade.log Jul 06 18:05:21 Centos rabbitmq-server[28698]: Config file(s): (none) Jul 06 18:05:23 Centos systemd[1]: Started RabbitMQ broker. Jul 06 18:05:23 Centos rabbitmq-server[28698]: Starting broker... completed with 0 plugins. Hint: Some lines were ellipsized, use -l to show in full. [root@Centos mq]# ss -tan | grep 5672 LISTEN 0 128 *:25672 *:* LISTEN 0 128 [::]:5672 [::]:*
5672是工做端口,25672是集群间通讯商品。shell
启动web管理插件缓存
[root@Centos mq]# rabbitmq rabbitmqctl rabbitmq-plugins rabbitmq-server rabbitmq-diagnostics rabbitmq-queues rabbitmq-upgrade [root@Centos mq]# rabbitmq-plugins list warning: the VM is running with native name encoding of latin1 which may cause Elixir to malfunction as it expects utf8. Please ensure your locale is set to UTF-8 (which can be verified by running "locale" in your shell) Listing plugins with pattern ".*" ... Configured: E = explicitly enabled; e = implicitly enabled | Status: * = running on rabbit@Centos |/ [ ] rabbitmq_amqp1_0 3.8.5 [ ] rabbitmq_auth_backend_cache 3.8.5 [ ] rabbitmq_auth_backend_http 3.8.5 [ ] rabbitmq_auth_backend_ldap 3.8.5 [ ] rabbitmq_auth_backend_oauth2 3.8.5 [ ] rabbitmq_auth_mechanism_ssl 3.8.5 [ ] rabbitmq_consistent_hash_exchange 3.8.5 [ ] rabbitmq_event_exchange 3.8.5 [ ] rabbitmq_federation 3.8.5 [ ] rabbitmq_federation_management 3.8.5 [ ] rabbitmq_jms_topic_exchange 3.8.5 [ ] rabbitmq_management 3.8.5 [ ] rabbitmq_management_agent 3.8.5 [ ] rabbitmq_mqtt 3.8.5 [ ] rabbitmq_peer_discovery_aws 3.8.5 [ ] rabbitmq_peer_discovery_common 3.8.5 [ ] rabbitmq_peer_discovery_consul 3.8.5 [ ] rabbitmq_peer_discovery_etcd 3.8.5 [ ] rabbitmq_peer_discovery_k8s 3.8.5 [ ] rabbitmq_prometheus 3.8.5 [ ] rabbitmq_random_exchange 3.8.5 [ ] rabbitmq_recent_history_exchange 3.8.5 [ ] rabbitmq_sharding 3.8.5 [ ] rabbitmq_shovel 3.8.5 [ ] rabbitmq_shovel_management 3.8.5 [ ] rabbitmq_stomp 3.8.5 [ ] rabbitmq_top 3.8.5 [ ] rabbitmq_tracing 3.8.5 [ ] rabbitmq_trust_store 3.8.5 [ ] rabbitmq_web_dispatch 3.8.5 [ ] rabbitmq_web_mqtt 3.8.5 [ ] rabbitmq_web_mqtt_examples 3.8.5 [ ] rabbitmq_web_stomp 3.8.5 [ ] rabbitmq_web_stomp_examples 3.8.5 [root@Centos mq]# rabbitmq-plugins enable rabbitmq_management warning: the VM is running with native name encoding of latin1 which may cause Elixir to malfunction as it expects utf8. Please ensure your locale is set to UTF-8 (which can be verified by running "locale" in your shell) Enabling plugins on node rabbit@Centos: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@Centos... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch started 3 plugins. [root@Centos mq]# ss -tanl | grep 5672 LISTEN 0 128 *:25672 *:* LISTEN 0 128 *:15672 *:* LISTEN 0 128 [::]:5672 [::]:*
rabbitmq-management管理插件启用后会监听15672端口,做为管理web登录:http://ip:15672,默认用户为guest,密码guest,此用户只能从本地登录。bash
用户管理
rabbitmqctl add_user username password 增长用户 rabbitmqctl delete_user username 删除用户 rabbitmqctl change_password username newpassword 更改密码 rabbitmqctl set_user_tags username tag 设置权限tag
[root@Centos mq]# rabbitmqctl add_user admin admin123 Adding user "admin" ... [root@Centos mq]# rabbitmqctl set_user_tags admin administrator Setting tags for user "admin" to [administrator] ...
上图中说明用户属于不一样的tag拥有的权限。
当RabbitMQ安装好后,仅有guest用户拥有对默认虚拟主机“/”有访问权限,而咱们本身增长的“admin”用户没有虚拟主机与之对应,因此咱们须要增长一个虚拟主机。
其中1-5是在工做中常见的模型。下边针对这5种模型展开讲解
专业术语
Message Broker: 消息代理,RabbitMQ就是一个消息代理server
Producing: 生产,指仅发送消息数据,发送消息数据的程序就是Producer
Queue: 队列,指RabbitMQ服务内部邮箱名称,是存储消息数据的容器,数据的存储载体,只受主机的内存和硬盘约束,实质是一个大的消息缓冲区。
Consuming:消费,接收消息,接收消息数据的程序就是Consumer
Channel: 通道,一个链接容许多个客户端链接
Exchange: 交换机(器),接收生产者发来的消息,决定如何路由给服务器中的队列。经常使用的类型有:direct(point-to-point)、topic(publish-subscribe)、fanout(multicast)
Bind: 绑定,创建消息队列和交换器间的关系,即交换器拿到数据,把什么样的数据送给哪一个队列
Virtual Hosts: 虚拟主机,一批交换机、消息队列和相关对象的集合。为了多用户互不干扰,使用虚拟主机分组交换机、消息队列
Topic: 主题
下边针对不一样的工做模型使用python代码来讲明讲解,使用的库为pika
$ pip install pika
这是最为简单的生产者消费者模型,消息队列就是一个FIFO的队列。
#producer.py import pika #创建链接 credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters('172.16.152.130', 5672, 'test_vh', # 虚拟主机 credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() # 建立一个通道 #通道上声明队列名称,server中没有此队列就会建立 channel.queue_declare(queue='hello') #使用默认交换机把body内容发送到名称为hello的队列中 channel.basic_publish(exchange='', # 为空字符表示使用默认的交换机 routing_key='hello', # exchange为空字符串时,必须使用routing_key,表示把消息发往哪一个队列 body='hello world!' # 消息主体 ) print("Sent 'hello world!'") connection.close() #关闭链接
运行该程序后,能够在web管理界面中查看到相应的Exchange,Queue已建立
可见一个虚拟主机下会自动建立各类类型的交换机。
上边代码中建立链接过于复杂,pika提供了另外一种更优雅的方法,代码修改以下:
#producer.py import pika #更优雅的方式建立链接参数 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 #通道上声明队列名称,server中没有此队列就会建立 channel.queue_declare(queue='hello') #使用默认交换机把body内容发送到名称为hello的队列中 channel.basic_publish(exchange='', #为空字符表示使用默认的交换机 routing_key='hello', #exchange为空字符串时,必须使用routing_key,表示把消息发往哪一个队列 body='hello world!!!!' #消息主体 ) print("Sent 'hello world!!!!'") connection.close() #关闭链接
消费方的代码以下:
#consumer.py import pika #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) #阻塞链接 channel = connection.channel() channel.queue_declare(queue='hello') #声明队列 #回调函数 def callback(ch, method, properties, body): print('Received: {}'.format(body)) print('channel: {}'.format(ch)) print('method: {}'.format(method)) print('properties: {}'.format(properties)) channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True ) print('Waiting for message. To exit press CTRL + C') channel.start_consuming() #循环取队列数据
说明:
channel.basic_consume()
中的auto_ack=True
时,表示队列中的数据被消费后就被确认已被消费掉,若设置为False
那当前消费者程序断开后,以前被消费过的数据又被置为Ready
状态,即又能被消费者从新消费。
工做队列即为简单队列模型中的一个消费者变为多个消费者。把生产者producer.py
中生产数据略为修改:
#producer.py import pika import time #创建链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 #通道上声明队列名称,server中没有此队列就会建立 channel.queue_declare(queue='hello') #使用默认交换机把body内容发送到名称为hello的队列中 for i in range(40): channel.basic_publish(exchange='', #为空字符表示使用默认的交换机 routing_key='hello', #exchange为空字符串时,必须使用routing_key,表示把消息发往哪一个队列 body='data{:02}'.format(i) #消息主体 ) time.sleep(0.2) print("Sent 'hello world!!!!'") connection.close() #关闭链接
消费者代码:
#consumer1.py import pika #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) #阻塞链接 channel = connection.channel() channel.queue_declare(queue='hello') #声明队列 #回调函数 def callback(ch, method, properties, body): print('Received: {}'.format(body)) channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True ) print('Waiting for message. To exit press CTRL + C') channel.start_consuming() #循环取队列数据
测试时先运行消费者代码,并复制多份运行,以运行两份为例,再运行生产者代码,能够从输出中观测到
#第一个消费者输出 Received: b'data00' Received: b'data02' Received: b'data04' Received: b'data06' Received: b'data08' Received: b'data10' Received: b'data12' Received: b'data14' Received: b'data16' Received: b'data18' ... Received: b'data38' #第二个消费者输出 Received: b'data01' Received: b'data03' Received: b'data05' Received: b'data07' Received: b'data09' Received: b'data11' Received: b'data13' Received: b'data15' Received: b'data17' Received: b'data19' ... Received: b'data39'
可知,这种工做模式是一种竞争的工做方式,消息队列中的一个消息只能由一个消费者消费,并且从结果可知,不一样的消费者取数据是以轮询
的方式。
简单队列
和工做队列
模式的图中没有画出交换机,但都使用了默认的交换机。
试想生活中的订报纸这样一个场景,全部的订阅者(消费者)订阅一份报纸(消息),都应该拿到一分内容相同的报纸。
报社发布报纸到邮局(Exchange),邮局决定经过怎样的方式把报纸送到订阅都的信箱,订阅者从本身的信箱(Queue)中获取报纸。
当前Publish/Subscribe这种模式,Exchange的类型为fanout
,即为一对多,广播模式。
多个Queue须要与Exchange创建关系,这里就是Binging。
生产者代码以下:
import pika import time #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 #定义交换机 channel.exchange_declare(exchange='logs', #指定交换机 exchange_type='fanout' #指定交换机类型 ) #向交换机中发送数据 for i in range(40): channel.basic_publish(exchange='logs', #指定交换机 routing_key='', #fanout类型不指定 body='data{:02}'.format(i) #消息主体 ) time.sleep(0.2) print("Sent 'hello world!!!!'") connection.close() # 关闭链接
生产者不关心queue,只关心数据要发往哪一个交换机。没有queue来存储数据意味着没有消费者时,生产者生产的数据发送到交换机后就丢弃掉。
消费者代码以下:
import pika #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 #定义交换机 channel.exchange_declare(exchange='logs', #指定交换机 exchange_type='fanout' #指定交换机类型 ) #生成queue result1 = channel.queue_declare(queue='', #为空字符串时会生成一个惟一的队列名称 exclusive=True #表示当前生成的队列只容许当前这个链接使用,链接一旦断开,当前队列也将自动删除 ) result2 = channel.queue_declare(queue='', exclusive=True) q1name = result1.method.queue #获取队列的名称 q2name = result2.method.queue print(q1name, q2name) #绑定binding channel.queue_bind(exchange='logs', queue=q1name) channel.queue_bind(exchange='logs', queue=q2name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): #print(ch, method, properties, body) print('Received: {}'.format(body)) channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True) channel.basic_consume(queue=q2name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
先运行消费者代码后,logs交换机被建立,相应的queue被建立
再运行生产者代码,输出以下:
amq.gen-_pqdfOYPRb0kvKgLBEZSCw amq.gen-PUV29dEM8j2EKYjcmS78BQ [*] Waiting for logs. To exit press CTRL+C Received: b'data00' Received: b'data00' Received: b'data01' Received: b'data01' Received: b'data02' Received: b'data02' Received: b'data03' Received: b'data03' Received: b'data04' Received: b'data04' ... Received: b'data37' Received: b'data37' Received: b'data38' Received: b'data38' Received: b'data39' Received: b'data39'
每个消息都打印了两次,在实际生产环境中,若是一个生产者的数据有可能多个业务模块都须要获取,那能够采起此种模式,只要在该业务模块中指定相应的交换机,本身生成一个queue来缓存相应的数据便可。
若是先启动了生产者,接着启动消费者,那部分数据会被丢失。因没有queue来存储数据,exchange收到数据后就丢掉。
Routing模型就是数据发送到交换机后根据规则(routing_key)进行路由发送。该模型下交换机类型为direct
。
生产者代码:
import pika import time import random #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 x_name = 'color' #交换机名称 colors = ('orange', 'red', 'green', 'black') # routing_key # 定义交换机及类型 channel.exchange_declare(exchange=x_name, #交换机名称 exchange_type='direct', #路由 ) for i in range(20): rk = colors[random.randint(0, 3)] channel.basic_publish( exchange=x_name, routing_key=rk, body='data_{}_{:02}'.format(rk, i) ) time.sleep(0.2) connection.close()
消费者代码:
import pika #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 x_name = 'color' #交换机名称 colors = ('orange', 'red', 'green', 'black') #routing_key #定义交换机 channel.exchange_declare(exchange=x_name, #指定交换机 exchange_type='direct', #指定交换机类型 ) #生成queue result1 = channel.queue_declare(queue='', exclusive=True) q1name = result1.method.queue print(q1name) #绑定 channel.queue_bind(exchange=x_name, queue=q1name, routing_key=colors[0]) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): # print(ch, method, properties, body) print('Received: {}'.format(body)) channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
先运行消费者,再运行生产者,消费者的终端输出
amq.gen-DAOzdpOzfFodaJI9xj6b4Q [*] Waiting for logs. To exit press CTRL+C Received: b'data_orange_01' Received: b'data_orange_06' Received: b'data_orange_08' Received: b'data_orange_09' Received: b'data_orange_17' Received: b'data_orange_18'
只有routing_key=colors[0]
即为orange
的消息才被路由到了消费都定义的queue上后被消费者获取。
在web界面中也能查看到相应的交换机、queue、routing_key间的绑定关系
多重绑定
如图,若是一个routing_key被屡次绑定,那和fanout
模式就相似了,但又有不一样,fanout
时exchange不作数据过虑,而direct
时仍然会作数据过滤这个动做,只是过滤后会把相应的消息发送到多个队列中。
Topic的routing_key必须使用.
点号 分割的单词组成。支持通配符:
* 表示严格的一个单词 # 表示0个或多个单词
若是queue绑定的routing_key只是一个#
,这个queue其实能够接收全部消息,相似于fanout
若是没有使用任何通配符,效果相似于direct。
生产者代码:
import pika import time import random #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 x_name = 'products' #交换机名称 product_types = ('phone', 'pc', 'tv') #产品类型 colors = ('red', 'green', 'blue') #定义交换机及类型 channel.exchange_declare(exchange=x_name, #交换机名称 exchange_type='topic', #话题模式 ) for i in range(20): rk = '{}.{}'.format(product_types[random.randint(0, 2)], colors[random.randint(0, 2)]) msg = 'data_{}_{:02}'.format(rk, i) channel.basic_publish(exchange=x_name, routing_key=rk, body=msg ) time.sleep(0.2) connection.close()
消费者代码:
import pika #建立链接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通连上建立一个通道 x_name = 'products' #交换机名称 topics = ('phone.*', '*.red') #定义交换机 channel.exchange_declare(exchange=x_name, #指定交换机 exchange_type='topic', #指定交换机类型 ) #生成queue q1 = channel.queue_declare(queue='', exclusive=True) q1name = q1.method.queue #绑定 channel.queue_bind(exchange=x_name, queue=q1name, routing_key=topics[0]) #修改routing_key后再测试 def callback(ch, method, properties, body): #print(ch, method, properties, body) print('Received: {}'.format(body)) #消费 channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True) #循环取队列数据 print('Waiting for message. To exit press CTRL+C') channel.start_consuming()
先运行消费者,再运行生产者,在消费者程序的终端中输出:
Waiting for message. To exit press CTRL+C Received: b'data_phone.blue_00' Received: b'data_phone.red_01' Received: b'data_phone.green_02' Received: b'data_phone.green_03' Received: b'data_phone.blue_06' Received: b'data_phone.blue_08' Received: b'data_phone.green_10' Received: b'data_phone.blue_16'
符合topics[0],即phone.*
的匹配模式。
Topic实质上也是direct,只是支持模式匹配而已。
RPC和Publisher Confirms使用较少,不作说明。
参考:
https://www.rabbitmq.com/install-rpm.html#downloads
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
https://www.rabbitmq.com/tutorials/tutorial-two-python.html
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
https://www.rabbitmq.com/tutorials/tutorial-four-python.html
https://www.rabbitmq.com/tutorials/tutorial-five-python.html