生产者:python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'localhost'
))
channel
=
connection.channel()
# 声明一个管道,在管道里发消息
# 声明queue
channel.queue_declare(queue
=
'hello'
, durable
=
True
)
# 在管道里还得声明一个队列
# durable只是把队列持久化,消息不持久化
channel.basic_publish(exchange
=
'',
routing_key
=
'hello'
,
# 就是列队queue名字
body
=
'Hello World'
,
# 消息内容
properties
=
pika.BasicProperties(
delivery_mode
=
2
,
#消息持久化若是队列没有设置durable=True的话消息是没有办法持久化的
)
)
print
(
" [x] Sent 'Hello World!'"
)
connection.close()
# 不用关闭管道,关闭链接就行
|
消费者:服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import
pika
# 创建到达RabbitMQ Server的connection
# 此处RabbitMQ Server位于本机-localhost
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'localhost'
))
channel
=
connection.channel()
# 声明queue,确认要从中接收message的queue
# queue_declare函数是幂等的,可运行屡次,但只会建立一次
# 若能够确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
# 但在producer和consumer中重复声明queue是一个好的习惯
channel.queue_declare(queue
=
'hello'
,durable
=
True
)
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.basic_qos(prefetch_count
=
1
)
#若是有一个消息,服务器就不发,没消息就发
# 定义回调函数
# 一旦从queue中接收到一个message回调函数将被调用
# ch:channel
# method:
# properties:
# body:message
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
#执行完后确认,client执行完后给rabbitmq返回的一个标识,收到这个标识后rabbitmq认为这个消息处理完了,不会在重复发送给其余client继续执行
# 从queue接收message的参数设置
# 包括从哪一个queue接收message,用于处理message的callback,是否要确认message
# 默认状况下是要对消息进行确认的,以防止消息丢失。
# 此处将no_ack明确指明为True,不对消息进行确认。
channel.basic_consume(callback,
queue
=
"hello"
,
#no_ack=True#不对消息确认
)
# 开始循环从queue中接收message并使用callback进行处理
channel.start_consuming()
|