在介绍RabbitMQ以前,咱们须要了解一些最基础的概念,相信使用过或者据说过RabbitMQ的人都不会陌生,但笔者仍是不厌其烦地在这里讲述,由于笔者的理念是self contained。html
Queue
: 队列。计算机数据结构中的一种基本类型,遵循“先入先出”(FIFO)的原则,好比咱们平常生活中常见的排队时的队伍就是一个队列。Message Queue
: 消息队列,简称MQ。消息队列本质上也是队列,只不过队列中的元素为Message(消息),而消息则是服务之间最多见的通讯方式。流行的MQ框架主要有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。AMQP
:Advanced Message Queuing Protocol,是一个提供统一消息服务的应用层标准高级消息队列协议,简单来讲,它就是一个消息列队的协议,其标准高,要求严。Erlang
:Erlang是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种能够应对大规模并发活动的编程语言和运行环境。RabbitMQ
:RabbitMQ是一个实现了AMQP高级消息队列协议的消息队列服务,用Erlang语言实现。RabbitMQ的运行原理以下图(后续咱们会解释其中的含义,现阶段只做为浏览):以上是咱们对RabbitMQ的最初认识。接下来咱们还须要了解RabbitMQ的下载与安装,以下:java
说了这么多,咱们为何要选择RabbitMQ,也就是说它的优点
又是什么呢?RabbitMQ的强大之处在于:python
带着对RabbitMQ的初次见面,咱们不妨再了解下如何简单地使用RabbitMQ。编程
在计算机领域中,每次学习一个新事物的惊喜,每每都是伴随着Hello World
。在编程语言中,会有输出“Hello World”
;在大数据中,“Hello World”就是统计单词的词频
;在Docker中,就是使用“Hello World”镜像
;在RabbitMQ,此次的“Hello World”就是生产者发送“Hello World”,而消费者输出“Hello World”
。
RabbitMQ就是消息代理,它接受并推进消息流动。你能够把它想象成一个邮局:当你把一封信塞进邮箱,你须要确保它能送到收信人的手里。而RabbitMQ就是一个邮箱,邮局,邮递员。不一样于真实的邮局(处理信件),RabbitMQ处理接受、存储、推进消息。
在RabbitMQ,或者消息队列领域中,有以下术语。小程序
生产者(Producer)
:生产者仅产生消息,也就说一个产生消息的程序就是生产者。对应于邮局的例子,生产者就是寄信人,由于他们产生信件。队列(Queue)
: 一个队列就是RabbitMQ中的邮箱。尽管消息会在RabbitMQ和应用程序之间流动,可是它们只会在队列中存储。一个队列仅受限于硬盘和内存大小,它是一个大的消息缓存区。许多生产者产生消息后会进入一个队列,许多消费者也会从同一个队列中获取消息。如下是咱们如何表示一个队列:消费者(Consumer)
:消费消息与接收消息的意思是一致的。一个消费者每每会等待接收消息。在邮局的例子中,消费者也许就是收信人。 介绍完生产者、队列、消费者后,咱们将会来学习RabbitMQ中的Hello World。
咱们使用Python的Pika模块来操做RabbitMQ。在本文中,咱们将会编写两个小程序:一个生产者(Producer)发送一条消息,而一个消费者(Consumer)将会接收这个消息并将它输出。这就是消息通讯的“Hello World”。
在下图中,P表明生产者,C表明消费者,中间的盒子表明队列——消息缓存区。咱们总的设计图以下:api
生产者会将消息发送至“hello”队列,消费者从从该队列中获取消息。缓存
在这一部分中,咱们将会让生产者来发送消息。网络
咱们的第一个程序send.py
将会发送一个消息至队列。首先咱们要作的是创建与RabbitMQ Server的链接。数据结构
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
咱们链接到了本地机器(localhost
)的一个代理。若是咱们须要链接不一样机器的代理,咱们只须要声明机器名称以及IP地址便可。
接着,在咱们发送消息以前,咱们须要确认队列是否存在。若是咱们发送消息到一个不存在的地方,RabbitMQ将会丢失这条消息。所以,咱们须要建立一个hello
队列,这里将是消息传递的地方。并发
channel.queue_declare(queue='hello')
咱们已经准备好发送消息了。咱们的第一条消息是字符串“Hello World!”,咱们将它发送至hello
队列。
在RabbitMQ中,消息不会被直接发送至队列,它须要经过exchange
才能作到。在这里咱们不须要了解exchange
的原理,咱们只须要知道,空字符串就表明默认的exchange
。该exchange
很特殊——它规定了咱们的消息往哪一个队列走。队列名称须要用routing_key
这个参数来声明:
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
在退出程序前,咱们须要确保网络缓存被清空而且咱们的消息确实被传送至RabbitMQ。通常咱们经过关闭链接来实现。
connection.close()
在这一部分中,咱们将会让消费者来接受消息。
咱们的第二个程序receive.py
将会从队列中接受消息,并把其输出出来。
一样地,第一步是链接到RabbitMQ Server。这部分的代码与以前的部分相同。
下一步,更以前同样,须要确保队列存在。使用queue_declare
来建立队列是幂等的(idempoten) —— 咱们能够运行这条命令不少次,但只会建立一个队列。
channel.queue_declare(queue='hello')
也许你会好奇咱们为何要再一次声明这个列队,明明咱们在以前的代码中已经声明过了。这里这么作主要是为了确保队列已经存在。举例来讲,这边是先运行send.py
,但咱们不能肯定哪个程序会先运行。所以在这样的状况下,在两个程序中反复声明列队是不错的方式。
从队列中接受消息更加复杂。他须要经过callback
函数与列队关联。不管何时咱们接受到消息,这个callback
函数都被会Pika模块调用。在咱们的例子中,这个函数将会输出消息的内容。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
下一步,咱们须要告诉RabbitMQ,在hello
队列中,这个特定的callback
函数须要接受消息。
channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)
auto_ack
参数的含义会在后面的文章中解释。
最后咱们建立一个永不中止的循环,用于接收消息:
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
上面的部分介绍了“Hello World”的理论方面,接下来,咱们会分别使用Python和Java程序来分别实现这个例子。
sent.py程序以下:
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World from Python!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py程序以下:
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
先启动receive.py,程序会提示“ [*] Waiting for messages. To exit press CTRL+C”,代表该消费者在等待接收消息。在运行sent.py,该程序会发送“Hello World from Python!”至队列,同时receive.py会输出该消息。每运行一次sent.py,receive.py会就会输出一个该消息,以下图:
咱们使用Gradle来构建这个项目,项目结构以下:
在build.gradle中,咱们引入第三方jar包,内容以下:
plugins { id 'java' } group 'rabbitmq' version '1.0-SNAPSHOT' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' // https://mvnrepository.com/artifact/com.rabbitmq/amqp-client compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0' // https://mvnrepository.com/artifact/org.slf4j/slf4j-api compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.26' // https://mvnrepository.com/artifact/org.slf4j/slf4j-simple testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26' }
Send.java代码以下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World from Java!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
Recv.java的代码以下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
具体的操做方法同Python同样。
若是咱们把Python的“Hello World”当作一个简单的小系统,而Java的“Hello World”也当作一个简单的小系统,那么RabbitMQ能够沟通这两个系统,这也是RabbitMQ的一个特定:系统对接。
咱们在Python中运行receive.py
,而运行Java的Send.java
三次,运行Python的sent.py
两次,结果以下:
这样的测试结果是使人吃惊的,由于咱们用RabbitMQ打通了两个不一样语言的系统!
本文做为RabbitMQ入门的第一篇,但愿能对你们有所帮助。笔者也是初学RabbitM,文章中确定有不足之处,恳请你们批评指正。
感谢你们的阅读~