RabbitMq是一个消息代理:它接收、存储、转发消息。它由3个组件构成,生产者、队列、消费者。ide
生产者:向队列中发送消息。函数
队列:存储生产者发送过来的消息,并转发给消费者。spa
消费者:接收到队列转发过来的消息,消费处理。3d
1.简单队列模型图代理
2.实现生产者code
① 声明对列名blog
private final static String QUEUE_NAME = "hello.rabbitmq";
② 建立链接---链接RabbitMQrabbitmq
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = null; try { connection = factory.newConnection(); }catch (Exception e){ e.printStackTrace(); }
③ 建立通道队列
Channel channel = connection.createChannel();
④ 通道中声明队列内存
channel.queueDeclare(QUEUE_NAME ,true,false,false,null);
⑤ 发送消息
String mssage = "hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
⑥ 关闭链接
channel.close();
connection.close();
3.实现消费者
方式一:
①-④:前4步骤与生产者彻底相同
⑤:收到消息以后的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); };
⑥:建立消费者。从指定队列中消费消息
channel.basicConsume(QUEUE_NAME, true,deliverCallback,consumerTag->{});
①-④:与生产这彻底相同。
⑤:定义消费者,该消费者内包含 handleDelivery()方法,当消息到达队列以后,就会触发这个方法。
//定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //获取到达的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println(message); //手动回执 channel.basicAck(envelope.getDeliveryTag(),false); }
};
⑥:监听队列
channel.basicConsume(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,consumer);
● 轮询模式(默认)
public void selectAll(){ //MQUtil的具体实现参照简单队列生产者中第二步骤 Connection connection = MQUtil.getConnection(); Channel channel = null; try {
//得到信道 channel = connection.createChannel();for(int i=0;i< 50;i++){ String message="消息:"+i;
//声明队列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,false,false,null); //发送消息
channel.basicPublish("",UserQueues.HELLO_RABBITMQ_SELECT_ALL,null,message.getBytes()); Thread.sleep(1000); } channel.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } }
● 公平分发模式
public void selectAll(){ Connection connection = MQUtil.getConnection(); Channel channel = null; try { channel = connection.createChannel(); //限制每次只给同一个消费者发送一条消息,收到消息确认以后再次发送第二条消息
channel.basicQos(1); for(int i=0;i< 50;i++){ String message="消息:"+i; channel.queueDeclare(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,false,false,null); channel.basicPublish("",UserQueues.HELLO_RABBITMQ_SELECT_ALL,null,message.getBytes()); Thread.sleep(1000); } channel.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } }
● 轮询模式(默认)public void selectAllUser(byte[] bytes){ Connection connection = MQUtil.getConnection(); try { Channel channel = connection.createChannel(); //定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) { //获取到达的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String message = new String(body); System.out.println(message); } }; //监听队列
boolean autoack = true;
channel.basicConsume(UserQueues.HELLO_RABBITMQ_SELECT_ALL,autoack,consumer); }catch (Exception e){ e.printStackTrace(); } }
● 公平分发模式
public void selectAllUser(byte[] bytes){ Connection connection = MQUtil.getConnection(); try { Channel channel = connection.createChannel(); channel.basicQos(1); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //获取到达的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println(message); //手动回执,处理完消息以后,回馈给队列 channel.basicAck(envelope.getDeliveryTag(),false); } }; //监听队列,第二个参数改成false,意思是关闭自动应答,采用手动应答的模式
boolean autoack = false; channel.basicConsume(UserQueues.HELLO_RABBITMQ_SELECT_ALL,autoack,consumer); }catch (Exception e){ e.printStackTrace(); } }
以上两种消费者,轮询模式:采用自动确认模式。一旦rabbitmq将消息发送到消费者,消息就会从内存中删除,此时若是消费者还没有消费完成,可是由于某种缘由,消费者挂掉,还没有消费完成的消息就会丢失。
公平分发模式:采用的手动确认模式。rabbitmq发送消息给消费者,若是此消费者发生异常,该消息将交给另外一个正常的消费者去消费,消费完成以后,消费这发送ack消息告诉rabbitmq。本身消费完成,rabbitmq从内存中删除该消息。
消息应答主要是为了解决消费者发生异常时的处理方法,可是当Rabbitmq集群发生异常时,也会形成消息的丢失。为了应对这种状况,咱们须要将RabbitMQ中的消息持久化,将消息保存在磁盘中。
设置方式:
在生产者中实现,在生命队列时,将第二个参数设置为true便可;
channel.queueDeclare(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,false,false,null);
①:将生产者的消息,发送到队列当中,供消费者消费。
②:在rabbitmq中只有队列具备存储能力,交换机是不具有存储能力的。
③:交换机的几种类型:
● Fanout(不处理路由键)
特色:路由键为空字符串,交换机将消息装发到全部与之绑定的队列中。
● Direct(处理路由键)
特色:生产者与消费者都有定义的路由键,转发器将消息转发到与之绑定的而且与生产者有相同路由键的队列当中。
●
P:生产者
X:交换机(转发器),生产者发送的消息通过交换机,转发到多个消息队列当中。
目的:向交换机中发送消息
步骤:
①:经过ConnectionFactory类,与rabbitmq获取链接。
②:经过该链接建立一个通道,接下来的工做都是在该通道内部完成。
③:在通道内声明交换机。
④:向交换机发送消息。
实现:
/** * 向交换机中发送消息 * */ public class Send { public static void main(String[] args) throws Exception{ String message = "rabbitmq的交换机"; //与mq得到链接 Connection connection = MQUtil.getConnection(); //建立一个通道 Channel channel = connection.createChannel(); //声明交换机,第二个参数表明交换机的类型 channel.exchangeDeclare(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_FANOUT,"fanout"); System.out.println(message); //向交换机中发送消息 channel.basicPublish(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_FANOUT,"",null,message.getBytes()); //断开链接 channel.close(); connection.close(); } }
目的:从队列中消费消息
步骤:
①:经过ConnectionFactory类,与rabbitmq获取链接。
②:经过该链接建立信道。
③:在信道内声明队列。
④:将队列绑定到交换机。
⑤:定义消费者。
⑥:监听队列。
实现:
public class Reve1 { public static void main(String[] args) throws Exception{ Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_EXCHANGE11,true,false,false,null); //绑定队列到交换机 channel.queueBind(UserQueues.HELLO_RABBITMQ_EXCHANGE11,MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_FANOUT,""); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body); System.out.println("队列1:"+message); channel.basicAck(envelope.getDeliveryTag(),false); } }; //监听队列 channel.basicConsume(UserQueues.HELLO_RABBITMQ_EXCHANGE11,false,consumer); Thread.sleep(2000); channel.close(); connection.close(); } }
目的: 声明Direct类型的交换机,将消息发送到交换机,向交换机发送消息时,设置路由键
实现:
public class Send { public static void main(String[] args) throws Exception{ String message="rabbitmq路由模式"; Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_DIRECT,"direct"); System.out.println(message); String routingKey="info"; //向交换机发送消息 channel.basicPublish(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_DIRECT,routingKey,null,message.getBytes()); channel.close(); connection.close(); } }
目的:从队列中消费消息
实现:
public class Rece1 { public static void main(String[] args) throws Exception{ Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_ROUTING_EXCHANGE1,true,false,false,null); String routingKey="error"; //将队列绑定到交换机 channel.queueBind(UserQueues.HELLO_RABBITMQ_ROUTING_EXCHANGE1, MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_DIRECT,routingKey); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body); System.out.println("消费者1:"+message); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(UserQueues.HELLO_RABBITMQ_ROUTING_EXCHANGE1,false,consumer); Thread.sleep(2000); channel.close(); connection.close(); } }
①:路由模式中交换机与队列之间使用的是固定的routingKey来绑定的,主题模式中可使用统配符。
②:# 匹配多个,* 匹配单个。
/** * 主题模式: * 第三种交换机:通配符匹配交换机类型 * #:匹配所有 * *:匹配单个 */ public class Send { public static void main(String[] args) throws Exception{ String message="主题模式交换机"; String routingKey="order.save"; System.out.println(message); Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_TOPIC,"topic"); //向交换机发送消息 channel.basicPublish(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_TOPIC,routingKey,null,message.getBytes()); channel.close(); connection.close(); } }
public class Reve1 { public static void main(String[] args) throws IOException,InterruptedException, TimeoutException { Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_TOPIC_EXCHANGE1,true,false,false,null); //将队列绑定到交换机 channel.queueBind(UserQueues.HELLO_RABBITMQ_TOPIC_EXCHANGE1,MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_TOPIC,"order.#"); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者1"+message); channel.basicAck(envelope.getDeliveryTag(),false); } }; //定义监听 boolean autoAck=false; channel.basicConsume(UserQueues.HELLO_RABBITMQ_TOPIC_EXCHANGE1,autoAck,consumer); Thread.sleep(2000); channel.close(); connection.close(); } }