消息中间件简单介绍java
可靠性(Reliability) | 使用了一些机制来保证可靠性,如持久、传输确认、发布确认 |
灵活性(Flexible Routing) | 在消息进入队列以前,经过Exchange来路由消息,能够将多个Exchange绑定一块儿 |
消息集群(Clustering) | 多个RabbitMQ服务器开以组成一个集群,造成一个逻辑Broker |
高可用(Highly Available Queues) | 队列能够在集群中机器上进行镜像,使得在部分节点出问题的状况下队列仍能够用 |
多中协议(Multi-protocol) | 支持多种消息队列协议,好比STOMP、MQTT等等 |
多语言客户端(Many Clients) | 几乎支持全部经常使用语言,好比java、.NET、Ruby等等 |
管理界面(Management UI) | 提供了一个易用的用户界面,使得用户能够监控和管理消息Broker的许多方面 |
跟踪机制(Tracing) | 若是消息异常,RabbitMQ提供了消息跟踪机制,使用者能够找出发生了什么 |
插件机制(Plugin System) | 提供了许多插件,从多方面进行扩展,也可编写本身的插件 |
架构图redis
主要概念spring
rabbitmq‐plugins enable rabbitmq_management
|
(4)从新启动服务浏览器
(5)打开浏览器,地址栏输入http://127.0.0.1:15672 ,便可看到管理界面的登录页服务器
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring‐rabbit</artifactId> <version>2.1.4.RELEASE</version> </dependency>
<?xml version="1.0" encoding="UTF‐8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring‐beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring‐rabbit.xsd"> <!‐‐链接工厂‐‐> <rabbit:connection‐factory id="connectionFactory"
host="127.0.0.1"
port="5672"
username="guest"
password="guest" /> <rabbit:template id="rabbitTemplate" connection‐ factory="connectionFactory" />
</beans>
1 @Test
public void test() { 2 //解析配置文件,从中获取RabbitTemplate对象 3 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml"); 4 RabbitTemplate rabbitTemplate= (RabbitTemplate)context.getBean("rabbitTemplate"); 5 //发送消息(queue.test这个队列名称须要在RabbitMQ中手动建立) 6 rabbitTemplate.convertAndSend("","queue.test","直接模式"); 7 System.out.println("消息发送成功"); 8 //关闭对象 9 ((ClassPathXmlApplicationContext) context).close(); 10 }
代码实现、消息消费者架构
1 public class MessageConsumer implements MessageListener { 2 public void onMessage(Message message) { 3 System.out.println("接收到消息:" +new String(message.getBody()) ); 4 } 5 }
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--链接工厂--> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"/> <!--队列名称--> <rabbit:queue name="queue.test" /> <!--消费者监听类class为监听类路径--> <bean id="messageConsumer" class="cn.itcast.demo.MessageConsumer"> </bean> <!--设置监听容器--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queue-names="queue.test" ref="messageConsumer"/> </rabbit:listener-container> </beans>
@Test public void test2(){ //运行此方法会自动调用消息监听类,并打印监听到的消息 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-consumer.xml"); }
运行结果app
接收到消息:直接模式
rabbitTemplate.convertAndSend("exchange.fanout_test","","分列模式走起");
修改消息、消费者配置文件(applicationContext-rabbitmq-consumer.xml)异步
<!--队列名称--> <rabbit:queue name="queue.test" /> <rabbit:queue name="queue.test1" /> <!--消费者监听类class为监听类路径--> <bean id="messageConsumer" class="cn.itcast.demo.MessageConsumer"></bean> <bean id="messageConsumer1" class="cn.itcast.demo.MessageConsumer1"> </bean> <!--设置监听容器--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queue-names="queue.test" ref="messageConsumer"/> <rabbit:listener queue-names="queue.test1" ref="messageConsumer1"/> </rabbit:listener-container>
在增长一个消息监听者类,而后运行测试代码Test2便可分布式
运行结果性能
我是Message、接收到消息:分列模式走起
我是Message一、接收到消息:分列模式走起
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--配置connection-factory,指定链接rabbit server参数 --> <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true"/> <rabbit:admin connection-factory="connectionFactory"></rabbit:admin> <!--建立交换机(过时队列的交换机)--> <rabbit:direct-exchange id="exchange.delay.order.begin" name="exchange.delay.order.begin" durable="false" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue.delay.order.begin" key="delay"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 延时队列(过时队列) --> <rabbit:queue name="queue.delay.order.begin" durable="false"> <rabbit:queue-arguments> <!-- 设置队列过时时间为1分钟 --> <entry key="x-message-ttl" value="60000" value-type="java.lang.Long"/> <entry key="x-dead-letter-exchange" value="exchange.delay.order.done"/> <entry key="x-dead-letter-routing-key" value="delay"/> </rabbit:queue-arguments> </rabbit:queue> <!--死信交换机定义--> <rabbit:direct-exchange id="exchange.delay.order.done" name="exchange.delay.order.done" durable="false" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue.delay.order.done" key="delay"/> <!-- binding key 相同为 【delay】exchange转发消息到多个队列 --> <!--<rabbit:binding queue="queue.delay.order.done.two" key="delay" />--> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:queue name="queue.delay.order.done" durable="false"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!-- 消息接收者 --> <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false"> <rabbit:listener queues="queue.delay.order.done" ref="orderMessageListener"/> </rabbit:listener-container> <!--建立一个消息监听对象--> <bean class="cn.itcast.demo.MessageConsumer" id="orderMessageListener"></bean> </beans>
建立测试类消息生产者(发送消息)
@Test public void test1() { //解析配置文件,从中获取RabbitTemplate对象 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-mq.xml"); RabbitTemplate rabbitTemplate= (RabbitTemplate)context.getBean("rabbitTemplate"); rabbitTemplate.convertAndSend("exchange.delay.order.begin","delay","延时队列模式走起"); System.out.println("消息发送成功"); //关闭对象 ((ClassPathXmlApplicationContext) context).close(); }
建立消息监听器(MessageConsumer)
public class MessageConsumer implements MessageListener { public void onMessage(Message message) { System.out.println("我是Message、接收到消息:" +new String(message.getBody()) ); } }
建立测试类消息消费者
@Test public void test2(){ //运行此方法会自动调用消息监听类,并打印监听到的消息 ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-mq.xml"); }
运行结果(因为死信队列时间设置为1分钟,因此须要一分钟后才会监听到消息)
我是Message、接收到消息:延时队列模式走起
如何解决消息重复消费
让生产者发送每条数据的时候,里面加一个全局惟一的id,相似订单id之类的东西,而后你这里消费到了以后,先根据这个id去好比redis里查一下,以前消费过吗?若是没有消费过,你就处理,而后这个id写redis。若是消费过了,那你就别处理了,保证别重复处理相同的消息便可。