rabbitmq分布式工做队列work queues

任务队列解耦

工做队列中,任务避免当即执行对资源相关的操做,由于耗时较长,须要等待任务完成才能执行下一步动做web

咱们将任务封装成一个消息,并发送到队列中。并发

后台运行的执行者将发送到队列中,若是有多个执行者,任务将会分配到不一样的执行者执行tcp

这个用在web应用解决一个短http请求窗口处理复杂处理逻辑很是有用fetch

 

Round-robin 分发

使用任务队列可以并发执行任务,默认地,rabbitmq将发送每一个消息到下一个消费者,每一个消费者都会平均分配消息,这种分发消息的方式为round-robinspa

message acknowledgment 消息确认

 

完成任务须要一段时间,假设在这段时间内线程崩溃,目前一旦将消息发送给消费者,这条消息就打上了删除的标示,一旦中止这个线程就丢失了正在处理的消息,咱们也会丢失掉全部发到这个工做者的消息,这些消息都没有被处理。线程

咱们须要的是,当一个工做者在不健康的状态,咱们能够把任务从新分配给另外一个工做者。blog

rabbitmq支持消息的确认,消费者告诉rabbitmq特定的消息已经收到,处理,rabbitmq能够删除它rabbitmq

 

若是消费者(channel关闭,链接断开,tcp链接断开)没有确认消息,rabbitmq确认消息没有彻底处理,将它从新放入队列,若是有其余在线的消费者,将会把消息处理的任务分配给其余在线的消费者,消息确认默认是开着的,没有确认的消息会再次被处理队列

 

消息持久化

 

咱们已经学会了如何确保消费者服务失败后,任务不会丢失,可是若是rabbitmq服务失败后,任务也会丢失资源

当rabbitmq退出或者崩溃后,它将会丢失队列和消息

两方面能够保证不会丢失,咱们应该将队列和消息设置为持久的

首先将queue设置为持久的

命令自己是对的,可是咱们已经使用了一个非持久的同名队列

rabbitmq不容许从新定义参数不一样的相同队列,所以会报错

其次咱们能够设置消息也是持久的 MessageProperties.PERSISTENT_TEXT_PLAIN

 

 

公平分发

 

平均分配,并不能保证每个消费者都可以公平的处理消息,可能有些很忙,有些却在等待消息

设置参数prefetchCount=1,这样的设置保证了一个消费者在接收到

消息后,在消费完成以前不会接收新的消息,会把这个消息发送给

尚未接收到消息的那个消费者

 

若是消息队列满了,而且全部的消费者都有任务在处理中,那么

可能须要添加更多的消费者,或者使用其余的策略对消息进行分发

相关代码以下

 

 

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

 

public static void main(String[] args) {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String[] name={"a","b","c"};

String message = String.join(" ", name);

channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

System.out.println(" [x] send '" + message + "'");

} catch (Exception e) {

throw new RuntimeException();

}

}

}

 

 

 

 

 

public class Worker {

 

 

private static final String TASK_QUEUE_NAME = "task_queue";

 

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

final Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

channel.basicQos(1);

DeliverCallback deliverCallback = ((consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println("received >>" + message);

try {

doWork(message);

} finally {

System.out.println("[x] done");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

});

channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {

});

 

 

}

 

private static void doWork(String task) {

for (char ch : task.toCharArray()) {

if ('.' == ch) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

}

}

}

相关文章
相关标签/搜索