为何要使用MQ消息中间件?它解决了什么问题?关于为何要使用消息中间件?消息中间件是如何作到同步变异步、流量削锋、应用解耦的?网上已经有不少说明,我这里就再也不说明。我在接下来的RabbitMq系列博客里会将官方的讲解翻译过来,同时加以本身的理解整理成博客,但愿能和你们共同交流,一块儿进步。java
RabbitMq原理图spring
RabbitMq是一个消息中间件:它接收消息、转发消息。你能够把它理解为一个邮局:当你向邮箱里寄出一封信后,邮递员们就能最终将信送到收信人手中。相似的,RabbitMq就比如是一个邮箱、邮局和邮递员。RabbitMq和邮局最大的区别是:RabbitMq接收、转发的都是二进制数据块--消息,而不是纸质的数据文件。api
RabbitMq、消息相关术语以下:数组
生产者:生产者只发送消息,发送消息的程序即为生产者:缓存
消息队列:消息队列就至关于RabbitMq中的邮箱名称。尽管消息在你的程序和RabbitMq中流动,但它只能存储在消息队列中。队列本质上是一个大的消息缓存,它能存多少消息,取决于主机的内存和磁盘限制。多个生产者能够往同一个消息队列中发送消息;多个消费者能够从同一个队列中获取数据。咱们如下列图形来表示一个消息队列:bash
消费者:消费者是一个等待接收消息的程序:服务器
注意:生产者、消费者和RabbitMq能够在不一样的机器上;在不少的应用中,一个生产者同时也多是消费者。微信
在这小节里,咱们将写一个消息生产者用来发送消息、一个消息消费者来消费消息(接收消息并打印出来)。框架
在下面图形中,“P”是咱们的生产者,“C”是咱们的消费者,中间的红框是咱们的消息队列,保存了从生产者那里接收到的准备转发到消费方的消息。异步
RabbitMq使用多种协议,本指南使用AMQP 0-9-1协议,该协议是一个开源的、通用的消息协议。RabbitMq有多种语言的客户端,这里咱们使用JAVA语言的客户端作实验。经过如下地址下载RabbitMq客户端jar包和依赖包:
把这三个jar包拷贝到你的工做目录,包括后面教程要新建的java文件。
生产者链接RabbitMq,发送一条简单的消息”Hello World!“后就退出。
在Send.java类中,须要引入如下依赖包:
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
复制代码
给队列起个名字:
1 public class Send {
2 private final static String QUEUE_NAME = "hello";
3 public static void main(String[] argv) throws Exception {
4 ...
5 }
6 }
复制代码
建立链接到服务器的链接Collection:
1 onnectionFactory factory = new ConnectionFactory();
2 factory.setHost("localhost");
3 try (Connection connection = factory.newConnection();
4 Channel channel = connection.createChannel()) {
5
6 }
复制代码
这个链接即套接字链接,为咱们处理协议版本协商和身份验证等。这里咱们链接一个本地的RabbitMq:所以是localhost,若是你想要链接一个远程机器上的RabbitMq,只须要把localhst改为那台机器的计算机名或是IP地址。
建立完链接以后,咱们继续建立一个信道:Channel。咱们须要使用try-with-resource表达式,由于Connection和Channel都实现了JAVA接口Closeable,属于资源,须要关闭,这样咱们就不须要显示地在咱们的代码中进行关闭了。(关于信道,请参考文章最顶部的RabbitMq原理图,是TCP里面的虚拟连接,例如:电缆至关于一个TCP,信道就是里面的一个独立光纤,一条TCP上面建立多条信道是没有问题的;TCP一旦打开就分建立AMQP信道;不管是发布消息、接收消息、订阅队列,这些动做都是经过信道完成的)。
为了发送消息,咱们还必需要定义一个须要发送到的消息队列,这些都要使用try-with-resource表达式:
1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2 String message = "Hello World!";
3 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
4 System.out.println(" [x] Sent '" + message + "'");
复制代码
定义一个消息队列是幂等的:只有该队列不存在的时候才能被建立,消息是二进制数组,所以你能够根据须要指定编码。
完成的Send.java以下:
1 import com.rabbitmq.client.Channel;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.ConnectionFactory;
4
5 public class Send {
6
7 private final static String QUEUE_NAME = "hello";
8
9 public static void main(String[] argv) throws Exception {
10 ConnectionFactory factory = new ConnectionFactory();
11 factory.setHost("localhost");
12 try (Connection connection = factory.newConnection();
13 Channel channel = connection.createChannel()) {
14 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15 String message = "Hello World!";
16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
17 System.out.println(" [x] Sent '" + message + "'");
18 }
19 }
20 }
复制代码
消费者监听RabbitMq中的消息,所以与生产者发送一条消息就退出不一样,消费者要保持运行状态来接收消息并打印出来。
Recv.java一样须要导入如下依赖包:
1 import com.rabbitmq.client.Channel;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.ConnectionFactory;
4 import com.rabbitmq.client.DeliverCallback;
复制代码
与生产者相同,咱们须要建立Connetcion和Channel、定义队列(须要监听并接收消息的队列):
1 public class Recv {
2
3 private final static String QUEUE_NAME = "hello";
4
5 public static void main(String[] argv) throws Exception {
6 ConnectionFactory factory = new ConnectionFactory();
7 factory.setHost("localhost");
8 Connection connection = factory.newConnection();
9 Channel channel = connection.createChannel();
10
11 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
13
14 }
15 }
复制代码
注意咱们也在这里声明队列,由于咱们可能在生产者以前启动消费者,咱们想要确保在咱们尝试消费消息的时候队列就已经存在了。
这里咱们为何不使用try-with-resource表达式自动关闭channl和connection?经过这样,咱们就可使咱们的程序一直保持运行状态,若是把这些关了,程序也就中止了。这就尴尬了,由于咱们须要保持消费者一直处于异步监听消息过来的状态。
RabbitMq会将队列中的消息异步地推送过来,咱们须要提供一个回调函数来缓存消息直到咱们须要用到这些消息:
1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
2 String message = new String(delivery.getBody(), "UTF-8");
3 System.out.println(" [x] Received '" + message + "'");
4 };
5 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
复制代码
Rec.java完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
复制代码
在官方手册中,测试部分他们是将客户端jar和依赖jar添加到classpath路径,而后在cmd终端来运行的,我以为麻烦,所以,我这里放到IDEA中来运行,效果是同样的。
第一步:首先运行Send.java:
输出结果:
[x] Sent 'Hello World!'
复制代码
查看RabbitMq控制台:
说明消息已经发送成功。
第二步:启动消费者Recv.java:
输出结果:
[x] Received 'Hello World!'
复制代码
说明消息已经消费成功了,此时再查看控制台:
消息依然存在在队列中,可是区别是,在第一张图中Ready由1变成了0,Unacknowledged由0变成了1;第二张图中Ready也由1变成0,Unacked由0变成了1。为何会这样?按道理,消息消费了以后就应该删除掉,不然可能形成重复消费。关于这方面知识,将会在后面的章节中再介绍(Ack机制)。
上面虽然实现了功能,但在实际工做中,咱们更多的多是使用SpringBoot、SpringCloud等成熟的框架来实现。本小节就经过SpringBoot来实现以上功能。
建立工程的时候选择RabbitMq:
工程目录以下:
Provider和Consumer的配置文件相同,IP请替换成你本身的:
1 #RabbitMq
2 spring.rabbitmq.host=192.168.xx.xx
3 spring.rabbitmq.username=rabbitmq
4 spring.rabbitmq.password=123456
5
6 hello_world.queue=hello
复制代码
为方便让系统启动时就往队列发送消息,因此写了一个SenderRunner类:
1 @Component
2 public class SenderRunner implements ApplicationRunner {
3
4 @Autowired
5 private Send send;
6
7 @Override
8 public void run(ApplicationArguments args) throws Exception {
9 send.doSender("Hello RabbitMq");
10 }
11 }
复制代码
Send.java
1 @Component
2 public class Send {
3
4 @Value("${hello_world.queue}")
5 private String queueName;
6
7 @Autowired
8 private AmqpTemplate amqpTemplate;
9
10 public void doSender(String msg) {
11
12 amqpTemplate.convertAndSend(queueName,msg);
13 System.out.println("发送消息:" + msg);
14 }
15 }
复制代码
启动类:
1 @SpringBootApplication
2 public class ProviderApplication {
3 public static void main(String[] args) {
4 SpringApplication.run(ProviderApplication.class, args);
5 }
6 }
复制代码
Recv.java
@Component
public class Recv {
@RabbitListener(queues = "${hello_world.queue}")
public void receive(String msg) {
System.out.println("接收到消息:" + msg);
}
}
复制代码
启动Provider:
查看控制台:
启动Consumer:
可见,SpringBoot为咱们作了不少封装,隐藏了不少底层的细节,使用起来简单多了。(未完待续......)