人生终将是场单人旅途,孤独以前是迷茫,孤独事后是成长。html
先给你们说声抱歉,最近一周都没有发文,有一些比较要紧重要的事须要处理。java
今天正好得空,原本说准备写SpringIOC
相关的东西,可是发现想要梳理一遍仍是须要不少时间,因此我打算慢慢写,先把MQ给写了,再慢慢写其余相关的,毕竟偏理论的东西一遍要比较难写,像MQ这种偏实战的你们能够clone代码去玩一玩,仍是比较方便的。git
同时MQ也是Java进阶没必要可少的技术栈之一,因此Java开发从业者对它是必需要了解的。程序员
如今市面上有三种消息队列比较火分别是:RabbitMQ
,RocketMQ
和Kafka
。github
今天要讲的消息队列中我会以RabbitMQ
做为案例来入门,由于SpringBoot的amqp中默认只集成了RabbitMQ
,用它来说会方便许多,且RabbitMQ
的性能和稳定性都很不错,是一款通过时间考验的开源组件。面试
祝有好收获。spring
消息队列(MQ)全称为Message Queue,是一种应用程序对应用程序的通讯方法。数组
翻译一下就是:在应用之间放一个消息组件,而后应用双方经过这个消息组件进行通讯。浏览器
好端端的为啥要在中间放个组件呢?
小系统实际上是用不到消息队列的,通常分布式系统才会引入消息队列,由于分布式系统须要抗住高并发,须要多系统解耦,更须要对用户比较友好的响应速度,而消息队列的特性能够自然解耦,方便异步更能起到一个顶住高并发的削峰做用,完美解决上面的三个问题。
然万物抱阳负阴,系统之间忽然加了个中间件,提升系统复杂度的同时也增长了不少问题:
这些都是使用消息队列过程当中须要思考须要考虑的地方,消息队列能给你带来很大的便利,也能给你带来一些对应的麻烦。
上面说了消息队列带来的好处以及问题,而这些不在咱们今天这篇的讨论范围以内,我打算以后再写这些,咱们今天要作的是搭建出一个消息队列环境,让你们感觉一下基础的发消息与消费消息,更高级的问题会放在之后讨论。
RabbitMQ是一个消息组件,是一个erlang开发的AMQP(Advanced Message Queue)的开源实现。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ采用了AMQP协议,至于这协议怎么怎么样,咱们关心的是RabbitMQ
结构如何且怎么用。
仍是那句话,学东西须要先观其大貌,咱们要用RabbitMQ首先要知道它总体是怎么样,这样才有利于咱们接下来的学习。
咱们先来看看我刚画的架构图,由于RabbitMQ实现了AMQP协议,因此这些概念也是AMQP中共有的。
Broker
: 中间件自己。接收和分发消息的应用,这里指的就是RabbitMQ Server。
Virtual host
: 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,相似于网络中的namespace概念。当多个不一样的用户使用同一个RabbitMQ server提供的服务时,能够划分出多个vhost,每一个用户在本身的vhost建立exchange/queue等。
Connection
: 链接。publisher/consumer和broker之间的TCP链接。断开链接的操做只会在client端进行,Broker不会断开链接,除非出现网络故障或broker服务出现问题。
Channel
: 渠道。若是每一次访问RabbitMQ都创建一个Connection,在消息量大的时候创建TCP Connection的开销会比较大且效率也较低。Channel是在connection内部创建的逻辑链接,若是应用程序支持多线程,一般每一个thread建立单独的channel进行通信,AMQP method包含了channel id帮助客户端和message broker识别channel,因此channel之间是彻底隔离的。Channel做为轻量级的Connection极大减小了操做系统创建TCP connection的开销。
Exchange
: 路由。根据分发规则,匹配查询表中的routing key,分发消息到queue中去。
Queue
: 消息的队列。消息最终被送到这里等待消费,一个message能够被同时拷贝到多个queue中。
Binding
: 绑定。exchange和queue之间的虚拟链接,binding中能够包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
看完了这些概念,我再给你们梳理一遍其流程:
当咱们的生产者端往Broker(RabbitMQ)
中发送了一条消息,Broker
会根据其消息的标识送往不一样的Virtual host
,而后Exchange
会根据消息的路由key
和交换器类型将消息分发到本身所属的Queue
中去。
而后消费者端会经过Connection
中的Channel
获取刚刚推送的消息,拉取消息进行消费。
Tip:某个Exchange
有哪些属于本身的Queue
,是由Binding
绑定关系决定的。
上面讲了RabbitMQ
大概的结构图和一个消息的运行流程,讲完了理论,这里咱们就准备实操一下吧,先进行RabbitMQ安装。
官网下载地址:www.rabbitmq.com/download.ht…
因为我尚未属于本身MAC电脑,因此这里的演示就按照Windows的来了,不过你们都是程序员,安装个东西总归是难不倒你们的吧😂
Windows下载地址:www.rabbitmq.com/install-win…
进去以后能够直接找到Direct Downloads,下载相关EXE程序进行安装就能够了。
因为RabbitMQ
是由erlang语言编写的,因此安装以前咱们还须要安装erlang环境,你下载RabbitMQ
以后直接点击安装,若是没有相关环境,安装程序会提示你,而后会让你的浏览器打开erlang的下载页面,在这个页面上根据本身的系统类型点击下载安装便可,安装完毕后再去安装RabbitMQ
。
这二者的安装都只须要一直NEXT
下一步就能够了。
安装完成以后能够按一下Windows
键看到效果以下:
Tip:其中Rabbit-Command后面会用到,是RabbitMQ的命令行操做台。
安装完RabbitMQ
咱们须要对咱们的开发环境也导入RabbitMQ
相关的JAR包。
为了方便起见,咱们能够直接使用Spring-boot-start
的方式导入,这里面也会包含全部咱们须要用到的RabbitMQ
相关的JAR包。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
复制代码
直接引入spring-boot-starter-amqp
便可。
搭建好环境以后,咱们就能够上手了。
考虑到这是一个入门文章,读者不少可能没有接触过RabbitMQ
,直接使用自动配置的方式可能会令你们很迷惑,由于自动配置会屏蔽不少细节,致使你们只看到了被封装后的样子,不利于你们理解。
因此在本节Hello World
这里,我会直接使用最原始的链接方式就行演示,让你们看到最原始的链接的样子。
Tip:这种方式演示的代码我都在放在prototype
包下面。
先来看看生产者代码,也就是咱们push消息的代码:
public static final String QUEUE_NAME = "erduo";
// 建立链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 链接到本地server
connectionFactory.setHost("127.0.0.1");
// 经过链接工厂建立链接
Connection connection = connectionFactory.newConnection();
// 经过链接建立通道
Channel channel = connection.createChannel();
// 建立一个名为耳朵的队列,该队列非持久(RabbitMQ重启后会消失)、非独占(非仅用于此连接)、非自动删除(服务器将再也不使用的队列删除)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello, 我是耳朵。" + LocalDateTime.now().toString();
// 发布消息
// 四个参数为:指定路由器,指定key,指定参数,和二进制数据内容
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息结束,发送内容为:" + msg);
channel.close();
connection.close();
复制代码
代码我都给了注释,可是我仍是要给你们讲解一遍,梳理一下。
先经过RabbitMQ
中的ConnectionFactory
配置一下将要链接的server-host,而后建立一个新链接,再经过此链接建立通道(Channel
),经过这个通道建立队列和发送消息。
这里看上去仍是很好理解的,我须要把建立队列和发送消息这里再拎出来讲一下。
建立队列
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
复制代码
建立队列的方法里面有五个参数,第一个是参数是队列的名称,日后的三个参数表明不一样的配置,最后一个参数是额外参数。
durable:表明是否将此队列持久化。
exclusive:表明是否独占,若是设置为独占队列则此队列仅对首次声明它的链接可见,并在链接断开时自动删除。
autoDelete:表明断开链接后是否自动删除此队列。
arguments:表明其余额外参数。
这些参数中durable常常会用到,它表明了咱们能够对队列作持久化,以保证RabbitMQ
宕机恢复后此队列也能够自行恢复。
发送消息
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
复制代码
发送消息的方法里是四个参数,第一个是必须的指定exchange,上面的示例代码中咱们传入了一个空字符串,这表明咱们交由默认的匿名exchange去帮咱们路由消息。
第二个参数是路由key,exchange会根据此key对消息进行路由转发,第三个参数是额外参数,讲消息持久化时会用到一下,最后一个参数就是咱们要发送的数据了,须要将咱们的数据转成字节数组的方式传入。
测试
讲完了这些API以后,咱们能够测试一下咱们的代码了,run一下以后,会在控制台打出以下:
这样以后咱们就把消息发送到了RabbitMQ
中去,此时能够打开RabbitMQ控制台(前文安装时提到过)去使用命令rabbitmqctl.bat list_queues
去查看消息队列如今的状况:
能够看到有一条message
在里面,这就表明咱们的消息已经发送成功了,接下来咱们能够编写一个消费者对里面的message
进行消费了。
消费者代码和生产者的差很少,都须要创建链接创建通道:
// 建立链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 链接到本地server
connectionFactory.setHost("127.0.0.1");
// 经过链接工厂建立链接
Connection connection = connectionFactory.newConnection();
// 经过链接建立通道
Channel channel = connection.createChannel();
// 建立消费者,阻塞接收消息
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------------------------------------");
System.out.println("consumerTag : " + consumerTag);
System.out.println("exchangeName : " + envelope.getExchange());
System.out.println("routingKey : " + envelope.getRoutingKey());
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("消息内容 : " + msg);
}
};
// 启动消费者消费指定队列
channel.basicConsume(Producer.QUEUE_NAME, consumer);
// channel.close();
// connection.close();
复制代码
创建完通道以后,咱们须要建立一个消费者对象,而后用这个消费者对象去消费指定队列中的消息。
这个示例中咱们就是新建了一个consumer
,而后用它去消费队列-erduo
中的消息。
最后两句代码我给注释掉了,由于一旦把链接也关闭了,那咱们的消费者就不能保持消费状态了,因此要开着链接,监听此队列。
ok,运行这段程序,而后咱们的消费者会去队列-erduo
拿到里面的消息,效果以下:
consumerTag:是这个消息的标识。
exchangeName:是这个消息所发送exchange的名字,咱们先前传入的是空字符串,因此这里也是空字符串。
exchangeName:是这个消息所发送路由key。
这样咱们的程序就处在一个监听的状态下,你再次调用生产者发送消息消费者就会实时的在控制上打印消息内容。
上面咱们演示了生产者和消费者,咱们生产者发送一条消息,消费者消费一条信息,这个时候咱们的RabbitMQ
应该有多少消息?
理论上来讲发送一条,消费一条,如今里面应该是0才对,可是如今的状况并非:
消息队列里面仍是有1条信息,咱们重启一下消费者,又打印了一遍咱们消费过的那条消息,经过消息上面的时间咱们能够看出来仍是当时咱们发送的那条信息,也就是说咱们消费者消费过了以后这条信息并无被删除。
这种情况出现的缘由是由于RabbitMQ
消息接收确认机制,也就是说一条信息被消费者接收到了以后,须要进行一次确认操做,这条消息才会被删除。
RabbitMQ
中默认消费确认是手动的,也能够将其设置为自动删除,自动删除模式消费者接收到消息以后就会自动删除这条消息,若是消息处理过程当中发生了异常,这条消息就等于没被处理完可是也被删除掉了,因此这里咱们会一直使用手动确认模式。
消息接受确认(ACK)的代码很简单,只要在原来消费者的代码里加上一句就能够了:
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------------------------------------");
System.out.println("consumerTag : " + consumerTag);
System.out.println("exchangeName : " + envelope.getExchange());
System.out.println("routingKey : " + envelope.getRoutingKey());
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("消息内容 : " + msg);
// 消息确认
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("消息已确认");
}
};
复制代码
咱们将代码改为如此以后,能够再run一次消费者,能够看看效果:
再来看看RabbitMQ
中的队列状况:
从图中咱们能够看出消息消费后已经成功被删除了,其实大胆猜一猜,自动删除应该是在咱们的代码还没执行以前就帮咱们返回了确认,因此这就致使了消息丢失的可能性。
咱们采用手动确认的方式以后,能够先将逻辑处理完毕以后(可能出现异常的地方能够try-catch
起来),把手动确认的代码放到最后一行,这样若是出现异常状况致使这条消息没有被确认,那么这条消息会在以后被从新消费一遍。
今天的内容就到这里,下一篇将会咱们将会撇弃传统的手动创建链接的方式进行发消息收消息,而转用Spring帮咱们定义好的注解和Spring提供的RabbitTemplate,更方便的收发消息。
消息队列呢,其实用法都是同样的,只是各个开源消息队列的侧重点稍有不一样,咱们应该根据咱们本身的项目需求来决定咱们应该选取什么样的消息队列来为咱们的项目服务,这个项目选型的工做通常都是开发组长帮大家作了,通常是轮不到咱们来作的,可是面试的时候可能会考察相关知识,因此这几种消息队列咱们都应该有所涉猎。
好了,以上就是本期的所有内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍大家的每一个点赞都是我创做的最大动力。
我是耳朵,一个一直想作知识输出的伪文艺程序员,咱们下期见。