转 https://blog.csdn.net/leixiaotao_java/article/details/78924863java
一、maven依赖
-
-
<groupId>commons-lang</groupId>
-
<artifactId>commons-lang</artifactId>
-
-
-
-
-
<groupId>com.rabbitmq</groupId>
-
<artifactId>amqp-client</artifactId>
-
<version>
3.4.1</version>
-
二、RabbitMQ重要方法介绍(基本经常使用的)
2.一、建立链接
-
-
ConnectionFactory cf =
new ConnectionFactory();
-
-
-
-
-
-
-
-
cf.setPort(AMQP.PROTOCOL.PORT);
-
-
connection = cf.newConnection();
-
-
channel = connection.createChannel();
-
-
-
-
2.二、声明队列
-
-
-
-
-
-
-
-
-
-
-
channel.queueDeclare(
"testQueue", true, false, false, null);
此方法通常由Producer调用建立消息队列。若是由Consumer建立队列,有可能Producer发布消息的时候Queue尚未被建立好,会形成消息丢失的状况。web
2.三、声明Exchange
-
-
-
-
-
-
-
-
-
-
-
channel.exchangeDeclare(
"leitao","topic", true,false,null);
2.四、将queue和Exchange进行绑定(Binding)
-
-
-
-
-
-
-
-
-
channel.queueBind(
"testQueue", "leitao", "testRoutingKey");
2.五、发布消息
-
-
-
-
-
-
-
-
-
-
-
channel.basicPublish(
"",queueName,true,false,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
当exchange的值为空字符串或者是amq.direct时,此时的交换器类型默认是direct类型,能够不用单独声明Exchange,也不用单独进行Binding,系统默认将queue名称做为RoutingKey进行了绑定。服务器
两个传入参数的含义网络
mandatory异步
当mandatory标志位设置为true时,若是exchange根据自身类型和消息routeKey没法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。maven
immediateide
当immediate标志位设置为true时,若是exchange在将消息路由到queue(s)时发现对于的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的全部queue(一个或者多个)都没有消费者时,该消息会经过basic.return方法返还给生产者。函数
归纳来讲,mandatory标志告诉服务器至少将该消息route到一个队列中,不然将消息返还给生产者;immediate标志告诉服务器若是该消息关联的queue上有消费者,则立刻将消息投递给它,若是全部queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。工具
注意:在RabbitMQ3.0之后的版本里,去掉了immediate参数的支持,发送带immediate=true标记的publish会返回以下错误:性能
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error;protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)。
为何取消支持:immediate标记会影响镜像队列性能,增长代码复杂性,并建议采用“TTL”和“DLX”等方式替代。
2.6、接收消息
-
-
-
-
-
-
-
channel.basicQos(
10,false);
-
-
channel.basicQos(
15,true);
-
-
-
-
-
-
-
-
-
-
-
channel.basicConsume(queueName,
false, Consumer);
2.七、Consumer处理消息
-
-
-
-
-
-
-
-
-
void handleDelivery(String consumerTag,
-
-
AMQP.BasicProperties properties,
-
-
三、Producer消息确认机制
3.一、什么是生产者消息确认机制?
没有消息确认模式时,生产者不知道消息是否是已经到达了Broker服务器,这对于一些业务严谨的系统来讲将是灾难性的。消息确认模式能够采用AMQP协议层面提供的事务机制实现(此文没有这种实现方式),可是会下降RabbitMQ的吞吐量。RabbitMQ自身提供了一种更加高效的实现方式:confirm模式。
消息生产者经过调用Channel.confirmSelect()方法将Channel信道设置成confirm模式。一旦信道被设置成confirm模式,该信道上的全部消息都会被指派一个惟一的ID(从1开始),一旦消息被对应的Exchange接收,Broker就会发送一个确认给生产者(其中deliveryTag就是此惟一的ID),这样消息生产者就知道消息已经成功到达Broker。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息。
在channel 被设置成 confirm 模式以后,全部被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。可是没有对消息被 confirm 的快慢作任何保证,而且同一条消息不会既被 confirm又被nack 。
3.二、开启confirm模式
如上所说生产者经过调用Channel.confirmSelect()方法将Channel信道设置成confirm模式。
注意:已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。
3.三、普通confirm模式
普通confirm模式是串行的,即每次发送了一次消息,生产者都要等待Broker的确认消息,而后根据确认标记权衡消息重发仍是继续发下一条。因为是串行的,在效率上是比较低下的。
(1)重点方法
-
-
-
-
-
-
-
boolean waitForConfirms() throws InterruptedException;
(2)部分使用代码以下:
-
-
-
-
-
-
channel.basicPublish(
"",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
-
-
if(channel.waitForConfirms())
-
System.out.println(
"send success!");
-
-
System.out.println(
"send error!");
-
-
3.四、批量confirm模式
批量confirm模式是异步的方式,效率要比普通confirm模式高许多,可是此种方式也会形成线程阻塞,想要进行失败重发就必需要捕获异常。网络上还有采用waitForConfirms()实现批量confirm模式的,可是只要一条失败了,就必须把这批次的消息通通再重发一次,很是的消耗性能,所以此文不予考虑。
(1)重点代码
-
-
-
-
-
-
-
void waitForConfirmsOrDie() throws IOException, InterruptedException;
(2)部分代码以下:
-
-
-
-
-
-
channel.basicPublish(
"",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
-
-
-
channel.waitForConfirmsOrDie();
3.五、ConfirmListener监听器模式
RabbitMQ提供了一个ConfirmListener接口专门用来进行确认监听,咱们能够实现ConfirmListener接口来建立本身的消息确认监听。ConfirmListener接口中包含两个回调方法:
-
-
-
-
void handleAck(long deliveryTag, boolean multiple) throws IOException;
-
-
-
-
-
void handleNack(long deliveryTag, boolean multiple) throws IOException;
其中deliveryTag是Broker给每条消息指定的惟一ID(从1开始);multiple表示是否接收全部的应答消息,好比multiple=true时,发送100条消息成功事后,咱们并不会收到100次handleAck方法调用。
(1)重要方法
-
-
channel.addConfirmListener(
new MyConfirmListener());
(2)部分使用代码以下:
-
-
-
-
channel.addConfirmListener(
new MyConfirmListener());
-
-
channel.addReturnListener(
new MyReturnListener());
-
-
-
-
channel.basicPublish(
"",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.
-
-
-
-
-
public class MyConfirmListener implements ConfirmListener{
-
-
-
-
-
-
-
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
-
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
-
-
-
-
-
-
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)失败!服务器broker丢失了消息");
-
-
-
-
-
-
-
-
-
public class MyReturnListener implements ReturnListener {
-
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
-
BasicProperties properties, byte[] body) throws IOException {
-
System.out.println(
"消息发送到队列失败:回复失败编码:"+replyCode+";回复失败文本:"+replyText+";失败消息对象:"+SerializationUtils.deserialize(body));
-
-
四、Consumer消息确认机制
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在注册消费者时,能够指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(或磁盘,若是是持久化消息的话)中移去消息。不然,RabbitMQ会在队列中消息被消费后当即删除它。
当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分红了两部分:一部分是等待投递给消费者的消息(web管理界面上的Ready状态);一部分是已经投递给消费者,可是尚未收到消费者ack信号的消息(web管理界面上的Unacked状态)。若是服务器端一直没有收到消费者的ack信号,而且消费此消息的消费者已经断开链接,则服务器端会安排该消息从新进入队列,等待投递给下一个消费者(也可能仍是原来的那个消费者)。
(1)重要方法
-
-
-
-
-
-
-
-
-
-
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
-
-
-
-
-
-
-
-
-
-
void basicAck(long deliveryTag, boolean multiple) throws IOException;
-
-
-
-
-
-
-
-
-
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
-
-
-
-
-
-
-
void basicReject(long deliveryTag, boolean requeue) throws IOException;
(2)部分使用代码以下:
-
-
channel.basicConsume(queueName,
false, this);
-
-
-
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
-
-
-
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
-
-
-
-
int tagId = (Integer) map.get("tagId");
-
-
-
System.out.println(
"接收并处理消息:"+tagId);
-
-
channel.basicAck(envelope.getDeliveryTag(),
false);
-
-
-
channel.basicReject(envelope.getDeliveryTag(),
true);
-
-
五、Demo项目总体代码
此demo就是向RabbitMQ服务器上面发送20个消息,消息体是map,里面装的是tagId=数字。而后注册了两个消费者,分别处理奇数和偶数。
5.一、链接工具类
-
-
-
-
public class ConnectionUtil {
-
-
-
-
-
-
public ConnectionUtil(String queueName) throws IOException {
-
this.queueName = queueName;
-
-
ConnectionFactory cf =
new ConnectionFactory();
-
-
-
-
-
-
-
cf.setPort(AMQP.PROTOCOL.PORT);
-
-
connection = cf.newConnection();
-
-
channel = connection.createChannel();
-
-
-
-
-
-
-
-
-
-
-
channel.queueDeclare(queueName,
true, false, false, null);
-
-
-
public void close() throws IOException{
-
-
-
-
5.二、具体生产者
-
-
-
-
public class MessageProducer {
-
-
private ConnectionUtil connectionUtil;
-
-
public MessageProducer(ConnectionUtil connectionUtil){
-
this.connectionUtil=connectionUtil;
-
-
-
-
-
public void sendMessage(Serializable object) throws IOException{
-
-
-
-
-
-
-
-
-
-
connectionUtil.channel.basicPublish(
"", connectionUtil.queueName, true, MessageProperties.TEXT_PLAIN, SerializationUtils.serialize(object));
-
System.out.println(
"MessageProducer发送了一条消息:"+object);
-
-
5.三、公共消费者父类
-
-
-
-
public class MessageConsumer implements Consumer {
-
-
protected String consumerTag;
-
-
protected ConnectionUtil connectionUtil;
-
-
public MessageConsumer(ConnectionUtil connectionUtil){
-
this.connectionUtil=connectionUtil;
-
-
-
public void basicConsume(){
-
-
-
-
-
-
-
-
connectionUtil.channel.basicQos(
10,false);
-
connectionUtil.channel.basicQos(
15,true);
-
-
-
-
-
-
-
-
-
-
connectionUtil.channel.basicConsume(connectionUtil.queueName,
false, this);
-
}
catch (IOException e) {
-
-
-
-
-
-
-
-
public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
-
-
-
-
-
-
-
public void handleConsumeOk(String consumerTag) {
-
this.consumerTag=consumerTag;
-
System.out.println(
"消费者:"+consumerTag+",注册成功!");
-
-
-
-
-
-
-
public void handleCancelOk(String consumerTag) {
-
System.out.println(consumerTag+
" 手动取消消费者注册成功!");
-
-
-
-
-
-
public void handleCancel(String consumerTag) throws IOException {
-
System.out.println(
"由于外部缘由消费者:"+consumerTag+" 取消注册!");
-
-
-
-
-
-
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
-
System.out.println(
"通道或基础链接被关闭");
-
-
-
-
-
-
-
-
-
-
public void handleRecoverOk(String consumerTag) {
-
-
-
5.四、具体的消费者
-
-
-
-
public class EvenConsumer extends MessageConsumer {
-
-
public EvenConsumer(ConnectionUtil connectionUtil) {
-
-
-
-
-
public void handleConsumeOk(String consumerTag) {
-
this.consumerTag=consumerTag;
-
System.out.println(
"EvenConsumer消费者:"+consumerTag+",注册成功!");
-
-
-
-
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
-
-
-
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
-
int tagId = (Integer) map.get("tagId");
-
-
-
System.out.println(
"EvenConsumer接收并处理消息:"+tagId);
-
-
connectionUtil.channel.basicAck(envelope.getDeliveryTag(),
false);
-
-
-
connectionUtil.channel.basicReject(envelope.getDeliveryTag(),
true);
-
-
-
-
-
-
-
public class OddConsumer extends MessageConsumer {
-
-
public OddConsumer(ConnectionUtil connectionUtil) {
-
-
-
-
-
public void handleConsumeOk(String consumerTag) {
-
this.consumerTag=consumerTag;
-
System.out.println(
"OddConsumer消费者:"+consumerTag+",注册成功!");
-
-
-
-
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
-
-
-
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
-
int tagId = (Integer) map.get("tagId");
-
-
-
System.out.println(
"OddConsumer接收并处理消息:"+tagId);
-
-
connectionUtil.channel.basicAck(envelope.getDeliveryTag(),
false);
-
-
-
connectionUtil.channel.basicReject(envelope.getDeliveryTag(),
true);
-
-
-
5.五、监听器
-
-
-
-
public class MyConfirmListener implements ConfirmListener{
-
-
-
-
-
-
-
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
-
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
-
-
-
-
-
-
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)失败!服务器broker丢失了消息");
-
-
-
-
-
-
-
-
-
public class MyReturnListener implements ReturnListener {
-
-
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
-
BasicProperties properties, byte[] body) throws IOException {
-
System.out.println(
"消息发送到队列失败:回复失败编码:"+replyCode+";回复失败文本:"+replyText+";失败消息对象:"+SerializationUtils.deserialize(body));
-
-
5.六、客户端
-
-
-
public static void main(String[] args) {
-
-
-
-
-
-
-
-
-
-
}
catch (IOException e) {
-
-
}
catch (InterruptedException e) {
-
-
-
-
-
public void publishMessage() throws IOException, InterruptedException{
-
ConnectionUtil connectionUtil=
new ConnectionUtil("testqueue");
-
MessageProducer producer=
new MessageProducer(connectionUtil);
-
connectionUtil.channel.confirmSelect();
-
-
connectionUtil.channel.addConfirmListener(
new MyConfirmListener());
-
connectionUtil.channel.addReturnListener(
new MyReturnListener());
-
-
-
HashMap<String, Object> map=
new HashMap<String, Object>();
-
-
producer.sendMessage(map);
-
-
-
-
-
public void addConsumer() throws IOException{
-
ConnectionUtil connectionUtil=
new ConnectionUtil("testqueue");
-
OddConsumer odd=
new OddConsumer(connectionUtil);
-
-
EvenConsumer even=
new EvenConsumer(connectionUtil);
-
-
-
-
5.七、测试结果
-
MessageProducer发送了一条消息:{tagId=
1}
-
MessageProducer发送了一条消息:{tagId=
2}
-
MessageProducer发送了一条消息:{tagId=
3}
-
Exchange接收消息:
1(deliveryTag)成功!multiple=false
-
Exchange接收消息:
2(deliveryTag)成功!multiple=false
-
MessageProducer发送了一条消息:{tagId=
4}
-
Exchange接收消息:
3(deliveryTag)成功!multiple=false
-
MessageProducer发送了一条消息:{tagId=
5}
-
Exchange接收消息:
4(deliveryTag)成功!multiple=false
-
MessageProducer发送了一条消息:{tagId=
6}
-
Exchange接收消息:
5(deliveryTag)成功!multiple=false
-
MessageProducer发送了一条消息:{tagId=
7}
-
Exchange接收消息:
6(deliveryTag)成功!multiple=false
-
MessageProducer发送了一条消息:{tagId=
8}
-
Exchange接收消息:
7(deliveryTag)成功!multiple=false
-
Exchange接收消息:
8(deliveryTag)成功!multiple=false
-
MessageProducer发送了一条消息:{tagId=
9}
-
Exchange接收消息:
9(deliveryTag)成功!multiple=false
-
MessageProducer发送了一条消息:{tagId=
10}
-
Exchange接收消息:
10(deliveryTag)成功!multiple=false
-
OddConsumer消费者:amq.ctag-z8s8LaSgYvo02jktCZrCYA,注册成功!
-
-
-
-
-
-
EvenConsumer消费者:amq.ctag-LpN6Q5VvNY3wCof2lXqS4A,注册成功!
-
-
-
-
-
六、Demo完整源码下载地址
Java使用RabbitMQ完整项目源码.rar