生活不止眼前的苟且,还有永远读不懂的诗和到不了的远方。html
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。全部主要的编程语言均有与代理接口通信的客户端库。java
RabbitMQ
下载地址,本系列咱们采用docker
的方式来安装RabbitMQ
,RabbitMQ
的docker
镜像地址。关于如何docker
的安装和使用可参考如下连接;git
启动RabbitMQ
github
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq --hostname=rabbitmqhostone daocloud.io/library/rabbitmq:3.7.14-management-alpine
docker
安装完成以后可经过http://localhost:15672/#/
访问图形界面(用户名密码均为admin
)编程
Server
简单来讲就是消息队列服务器实体Exchange
消息交换机,它指定消息按什么规则,路由到哪一个队列Queue
消息队列载体,每一个消息都会被投入到一个或多个队列Binding
: 绑定,它的做用就是把exchange
和queue
按照路由规则绑定起来Routing Key
: 路由关键字,exchange
根据这个关键字进行消息投递VHost
: 虚拟主机,一个broker
里能够开设多个vhost
,用做不一样用户的权限分离。Producer
: 消息生产者,就是投递消息的程序Consumer
: 消息消费者,就是接受消息的程序Channel
: 消息通道,在客户端的每一个链接里,可创建多个channel
,每一个channel
表明一个会话任务注意:由Exchange
、Queue
、RoutingKey
三个才能决定一个从Exchange
到Queue
的惟一的线路。服务器
关于RabbitMQ
消息模型架构
Direct
交换机:彻底根据key
进行投递。Topic
交换机:在key
进行模式匹配后进行投递。Fanout
交换机:它采起广播模式,消息进来时,将会被投递到与改交换机绑定的全部队列中。package com.niocoder.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Procuder {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 2. 经过连接工厂建立链接
Connection connection = factory.newConnection();
// 3. 经过connection 建立一个channel
Channel channel = connection.createChannel();
// 4. 经过channel发送数据
for (int i = 0; i < 3; i++) {
String msg = "Hello World";
/** * * @param exchange the exchange to publish the message to * * @param routingKey the routing key * * @param props other properties for the message - routing headers etc * * @param body the message body */
channel.basicPublish("", "hello", null, msg.getBytes());
}
// 5. 关闭链接
channel.close();
connection.close();
}
}
复制代码
package com.niocoder.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 建立ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 2. 经过连接工厂建立链接
Connection connection = factory.newConnection();
// 3. 经过connection 建立一个channel
Channel channel = connection.createChannel();
// 4. 建立一个队列
String queueName = "hello";
/** * * @param queue the name of the queue * * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * * @param arguments other properties (construction arguments) for the queue */
channel.queueDeclare(queueName, true, false, false, null);
// 5. 建立消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 6. 设置channel
channel.basicConsume(queueName,true,consumer);
while (true){
// 7. 获取消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
}
复制代码
先启动消费者,再启动生产者框架