模型:java
获取 MQ 链接ide
public static Connection getConnection() throws IOException, TimeoutException { // 定义一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务地址 factory.setHost("127.0.0.1"); // AMQP 5672 factory.setPort(5672); // vhost factory.setVirtualHost("/vhost_ljf"); // 用户名 factory.setUsername("ljf"); // 密码 factory.setPassword("123456"); return factory.newConnection(); }
生产者生产消息fetch
public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取一个链接 Connection connection = ConnectionUtils.getConnection(); // 从链接中获取一个通道 Channel channel = connection.createChannel(); // 建立队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello simple!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("--send msg: " + msg); channel.close(); connection.close(); } }
消费者接收消息spa
public class Recv { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取链接 Connection connection = ConnectionUtils.getConnection(); // 建立通道 Channel channel = connection.createChannel(); // 队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取到达的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("recv: " + msg); } }; // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); } }
简单队列的不足code
耦合性高,生产者一一对应消费者(若是我想有多个消费者消费队列中的消息,这时候就不行了);blog
队列名变动,这时候得同时变动。rabbitmq
模型队列
为何会出现工做队列?ip
simple 队列是一一对应的,并且咱们实际开发,生产者发送消息是绝不费力的,而消费者通常是要跟业务相结合的,消费者接收到消息以后就须要处理,可能须要花费时间,这时候队列就会积压了不少消息。内存
生产者
/** * |----C1 * P----Queue----| * |----C2 */ public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{ // 获取链接 Connection connection = ConnectionUtils.getConnection(); // 获取 channel Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "hello" + i; System.out.println("[WQ] send: " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i*20); } channel.close(); connection.close(); } }
消费者
public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取链接 Connection connection = ConnectionUtils.getConnection(); // 获取 channel Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { // 消息到达 触发方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg: " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done."); } } }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取链接 Connection connection = ConnectionUtils.getConnection(); // 获取 channel Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { // 消息到达 触发方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[2] Recv msg: " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done."); } } }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
现象
先运行消费者1和消费者2,再运行生产者
消费者1 和 消费者2 处理的消息数量是同样多的。
消费者1:偶数
消费者2:奇数
这种方式叫作轮询分发(round-robin),结果就是无论谁忙谁悠闲,都不会多给一个消息。
生产者
public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{ // 获取链接 Connection connection = ConnectionUtils.getConnection(); // 获取 channel Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 每一个消费者:发送确认消息以前,消息队列不发送下一个消息到消费者,一次只处理一个消息 */ int prefetchCount = 1; channel.basicQos(prefetchCount); for (int i = 0; i < 50; i++) { String msg = "hello" + i; System.out.println("[WQ] send: " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i*5); } channel.close(); connection.close(); } }
消费者
public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取链接 Connection connection = ConnectionUtils.getConnection(); // 获取 channel final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 保证一次只发送一个 // 定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { // 消息到达 触发方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg: " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done."); // 手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; // 自动应答 false channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取链接 Connection connection = ConnectionUtils.getConnection(); // 获取 channel final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 保证一次只发送一个 // 定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { // 消息到达 触发方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[2] Recv msg: " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done."); // 手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; // 自动应答 false channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
现象
消费者2 处理的消息比 消费者1 多,能者多劳。
boolean autoAck = false; // 自动应答 false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
boolean autoAck = true;
(自动确认模式)一旦 rabbitmq 将消息分发给消费者,就会从内存中删除;
这种状况下,若是杀死正在执行的消费者,就会丢失正在处理的消息。
boolean autoAck = false;
(手动模式)若是一个消费者挂掉,就会交付给其余消费者;
rabbitmq 支持消息应答,消费者发送一个消息应答,告诉 rabbitmq 这个消息我已经处理完成,能够删掉,而后 rabbitmq 就删除内存中的消息。
消息应答默认是打开的,即为 false;
若是 rabbitmq 挂了,消息任然会丢失。
// 声明队列 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
注意:rabbitmq 不容许从新定义(不一样参数)一个已存在的队列