应领导要求 小弟今天也对RabbitMQ的业务逻辑流程简单的了解了一下。从win环境搭建RabbitMQ服务,到代码测试(单机部署、单生产者、单消费者),一套流程下来,感受都挺正常。但想着好记性不如烂笔头,因此仍是要尽量的将学到的东西作个笔记。html
单机部署就不介绍了,可参考博客http://www.cnblogs.com/LipeiNet/p/5977028.html,java
这里我要记录一下单机的多消费者。网络
首先是生产者:ide
package mytask; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.sun.corba.se.impl.orbutil.threadpool.TimeoutException; import java.io.IOException; /** * @Description:多消费者测试 * @Date: create in 2018-07-13 15:59 * @Author:Reynold-白 */ public class NewTask { private static final String TASK_QUEUE_NAME="task_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("localhost"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); //分发信息 for (int i=0;i<10;i++){ String message="Hello RabbitMQ"+i; channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("NewTask send '"+message+"'"); } channel.close(); connection.close(); } }
消费者1:测试
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class LoanConsumer { public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); /** * 第一个参数:队列名字, * 第二个参数:队列是否可持久化即重启后该队列是否依然存在, * 第三个参数:该队列是否时独占的即链接上来时它占用整个网络链接, * 第四个参数:是否自动销毁即当这个队列再也不被使用的时候即没有消费者对接上来时自动删除, * 第五个参数:其余参数如TTL(队列存活时间)等。 */ channel.queueDeclare(LoanQueuesConstant.LOAN_QUEUE, true, false, false, null); System.out.println("LoanConsumer Waiting for messages"); //每次从队列获取的数量,其实 是MQ推送给消费者的。 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Loan Received '" + message + "'"); try { // throw new Exception(); doWork(message); }catch (Exception e){ channel.abort(); }finally { System.out.println("Consumer Done"); //前面的参数表示该数据在队列中的索引位置,第二个参数 channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; //消息消费完成确认 channel.basicConsume(LoanQueuesConstant.LOAN_QUEUE, autoAck, consumer); } private static void doWork(String task) throws Exception{ try { Thread.sleep(1000); // 暂停1秒钟模拟服务消费时间 AutoIvsConsumer.autoInvest(10); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
消费者2 与消费者1相同。htm
有一个关键的配置:blog
boolean autoAck=false; //消息消费完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
这段表示,当消费者消费了这条信息后,会主动给队列推送响应。成功后队列会从列表中吧该条数据清除。表示成功处理。而 autoAck=false 亲测,能够实现当一台消费者down机后,队列没有收到响应,会回收这条数据,并交给其余消费者处理。索引
运行结果:rabbitmq
结果能够看到当work1消费02号时异常停机,而work2继续消费02号信息。队列