工做队列中,任务避免当即执行对资源相关的操做,由于耗时较长,须要等待任务完成才能执行下一步动做web
咱们将任务封装成一个消息,并发送到队列中。并发
后台运行的执行者将发送到队列中,若是有多个执行者,任务将会分配到不一样的执行者执行tcp
这个用在web应用解决一个短http请求窗口处理复杂处理逻辑很是有用fetch
使用任务队列可以并发执行任务,默认地,rabbitmq将发送每一个消息到下一个消费者,每一个消费者都会平均分配消息,这种分发消息的方式为round-robinspa
完成任务须要一段时间,假设在这段时间内线程崩溃,目前一旦将消息发送给消费者,这条消息就打上了删除的标示,一旦中止这个线程就丢失了正在处理的消息,咱们也会丢失掉全部发到这个工做者的消息,这些消息都没有被处理。线程
咱们须要的是,当一个工做者在不健康的状态,咱们能够把任务从新分配给另外一个工做者。blog
rabbitmq支持消息的确认,消费者告诉rabbitmq特定的消息已经收到,处理,rabbitmq能够删除它rabbitmq
若是消费者(channel关闭,链接断开,tcp链接断开)没有确认消息,rabbitmq确认消息没有彻底处理,将它从新放入队列,若是有其余在线的消费者,将会把消息处理的任务分配给其余在线的消费者,消息确认默认是开着的,没有确认的消息会再次被处理队列
咱们已经学会了如何确保消费者服务失败后,任务不会丢失,可是若是rabbitmq服务失败后,任务也会丢失资源
当rabbitmq退出或者崩溃后,它将会丢失队列和消息
两方面能够保证不会丢失,咱们应该将队列和消息设置为持久的
首先将queue设置为持久的
命令自己是对的,可是咱们已经使用了一个非持久的同名队列
rabbitmq不容许从新定义参数不一样的相同队列,所以会报错
其次咱们能够设置消息也是持久的 MessageProperties.PERSISTENT_TEXT_PLAIN
公平分发
平均分配,并不能保证每个消费者都可以公平的处理消息,可能有些很忙,有些却在等待消息
设置参数prefetchCount=1,这样的设置保证了一个消费者在接收到
消息后,在消费完成以前不会接收新的消息,会把这个消息发送给
尚未接收到消息的那个消费者
若是消息队列满了,而且全部的消费者都有任务在处理中,那么
可能须要添加更多的消费者,或者使用其余的策略对消息进行分发
相关代码以下
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String[] name={"a","b","c"};
String message = String.join(" ", name);
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] send '" + message + "'");
} catch (Exception e) {
throw new RuntimeException();
}
}
}
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("received >>" + message);
try {
doWork(message);
} finally {
System.out.println("[x] done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
});
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if ('.' == ch) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}