第一步:下载erlang
http://www.erlang.org/downloads/19.3html
第二步:下载rabbitMQ
http://www.rabbitmq.com/download.htmlspring
rabbitMQ安装完成后,打开rabbitMQ控制台 输入:rabbitmq-plugins enable rabbitmq_management
打开网址 http://127.0.0.1:15672/ 默认帐号 guest/guestapi
项目配置:
1、pom文件的配置:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>浏览器
或者服务器
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.5.RELEASE</version>
</dependency>网络
mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672mybatis
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <rabbit:connection-factory id="connectionFactory" username="${mq.username}" password="${mq.password}" host="${mq.host}" port="${mq.port}" virtual-host="/" /> <!-- 定义rabbit template 用于数据的接收和发送 --> <rabbit:template id="amqTemplate" connection-factory="connectionFactory" exchange="Q_PAY_PPMS_RECON"></rabbit:template> <!-- 经过指定下面的admin信息,当前productor中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--定义queue 说明:durable:是否持久化 exclusive: 仅建立者可使用的私有队列,断开后自动删除 auto_delete: 当全部消费客户端链接断开后,是否自动删除队列--> <rabbit:queue name="chase1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="chase2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="chase3" durable="true" auto-delete="false" exclusive="false" /> <!--topic 模式:发送端不是按固定的routing key发送消息,而是按字符串“匹配”发送,接收端一样如此。 --> <rabbit:topic-exchange name="mq.asdfExChange" durable="true" auto-delete="false"> <rabbit:bindings> <!-- 配置多个消费者 根据不一样业务类型选择对应消费者 pattern="*.*.test1" --> <rabbit:binding queue="chase1" pattern="mq.*.send"></rabbit:binding> <rabbit:binding queue="chase2" pattern="mq.*.send"></rabbit:binding> <rabbit:binding queue="chase3" pattern="mq.*.send"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- fanout 模式:客户端中只要是与该路由绑定在一块儿的队列都会收到相关消息,这相似广播,发送端无论队列是谁,都由客户端本身去绑定,谁须要数据谁去绑定本身的相应队列 --> <rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange"> <rabbit:bindings> <rabbit:binding queue="chase1"/> <rabbit:binding queue="chase2"/> <rabbit:binding queue="chase3"/> </rabbit:bindings> </rabbit:fanout-exchange> <!--定义direct-exchange direct 消息转换队列 绑定key,意思就是消息与一个特定的路由键匹配,会转发。rabbit:binding:设置消息queue匹配的key --> <rabbit:direct-exchange name="mq.qwerExChange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="chase1" key="mq.qwer.send" ></rabbit:binding> <rabbit:binding queue="chase2" key="mq.qwer.send2" ></rabbit:binding> <rabbit:binding queue="chase3" key="mq.qwer.send3" ></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="asdfConsumer" class="com.rabbitmq.Consumor"></bean> <bean id="asdfConsumer2" class="com.rabbitmq.Consumor2"></bean> <bean id="qwerConsumer3" class="com.rabbitmq.Consumor3"></bean> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 acknowledeg = "manual",意为表示该消费者的ack方式为手动--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="chase1" ref="asdfConsumer"/> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" > <rabbit:listener queues="chase2" ref="asdfConsumer2"/> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" > <rabbit:listener queues="chase3" ref="qwerConsumer3"/> </rabbit:listener-container> </beans>
4、spring主配置文件中引入刚刚添加的配置文件测试
<!-- 加载配置文件 --> <context:property-placeholder location="classpath:rabbitMQ.properties" /> <import resource="spring-rabbitmq.xml"></import>
5、生产者生产消息spa
在代码中注入:
@Autowired
private AmqpTemplate amqpTemplate;
以下能够将消息发送到交换器(mq.asdfExChange)中,交换器的工做模式为topic模式,根据工做模式将消息发送到队列chase1,chase2,chase3:code
amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdfExChange.send", msg);
6、消费者消费消息
定义在spring-mybatis.xml中配置的消费者:com.rabbitmq.Consumor和com.rabbitmq.Consumor2和com.rabbitmq.Consumor3
这里只定义一个Consumor供参考:
public class Consumor implements ChannelAwareMessageListener { /** * message 消息实体 * Channel 当前通道 */ public void onMessage(Message message, Channel channel) throws Exception { System.out.println("消费者接收到信息"); String msg = new String(message.getBody(), "utf-8"); //消息的标识,false只确认当前一个消息收到,true确认全部consumer得到的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //ack返回false,并从新回到队列,api里面解释得很清楚 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //true拒绝消息 false确认接受到消息 //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); System.out.println("消费者消费掉了消息:" + msg); } }
7、消息确认机制
首先介绍一下消息确认的几种类型:
AcknowledgeMode.NONE:自动确认
AcknowledgeMode.AUTO:根据状况确认
AcknowledgeMode.MANUAL:手动确认
消息经过 ACK 确认是否被正确接收,每一个 Message 都要被确认(acknowledged),能够自动去ACK或手动ACK(在spring-mybatis.xml中配置acknowledge="manual")
1.自动确认会在消息发送给消费者后当即确认,但存在丢失消息的可能,若是消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就至关于丢失了消息
2.根据状况确认会在消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直处处理成功。不会丢失消息,即使服务挂掉,没有处理完成的消息会重回队列,可是异常会让消息不断重试
3.若是消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也一样形成了实际意义的消息丢失
4.若是手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确承认以在业务失败后进行一些操做,若是消息未被 ACK 则会发送到下一个消费者(下一个消费者指的是什么,没有下一个消费者会怎样)
5.若是某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,由于 RabbitMQ 认为该服务的处理能力有限,标记为uncheck状态
6.手动确认方法 channel.basicAck(deliveryTag, multiple)
deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会创建起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 deliveryTag,
它表明了 RabbitMQ 向该 Channel 投递的这条消息的惟一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
multiple:为了减小网络流量,手动确承认以被批处理,当该参数为 true 时,则能够一次性确认 deliveryTag 小于等于传入值的全部消息
7.否定消息方法 channel.basicNack(deliveryTag, multiple, requeue)
multiple:是否批量.true:将一次性拒绝全部小于deliveryTag的消息。
requeue:被拒绝的是否从新入队列
8.拒绝消息方法 channel.basicReject(deliveryTag, requeue)
requeue:被拒绝的是否从新入队列
问题1:刚开始运行一切OK,后来我把<rabbit:topic-exchange>下的chase2和chase3删除只留下chase1,结果发现发给chase1的信息,仍是同时发给了chase2和chase3
1.确认配置文件是否正常
2.浏览器打开 http://127.0.0.1:15672 找到设置的交换器,好比我设置的交换器 mq.asdfExChange ,发现下面绑定channel列表里依然是3个分别是chase1,chase2,chase3
原来虽然在配置文件更新了,可是在rabbitMQ里没有进行同步更新
问题2:rabbitMQ后台管理中的ready、unchecked、total分别是什么意思
ready是准备发送的消息 unchecked是消费了未确认 total=ready+unchecked
问题3:消费者消费消息时候出错如何处理
通过测试发现,若是异常出如今消息确认以后,不影响后面的消息推送。
若是异常出如今确认消息以前,会致使消息没有被确认,打开rabbitMQ控制台,发现uncheck状态的消息数增长。
能够经过在消费者逻辑里try..catch..处理,确保消息确承认以正常返回给rabbitMQ
文章参考: https://www.jianshu.com/p/2c5eebfd0e95 https://www.cnblogs.com/piaolingzxh/p/5448927.html