rabbitmq 一个java 开源的消息中间件,能够用来对应用之间的调用解耦,好比说,我要对登录事件进行多种操做(记录日志、更新某些信息)。这些应用依赖登录事件,但登录自己并不对这些应用有依赖,能够经过消息中间件的方式进行解耦。java
由对登录事件感兴趣的应用系统订阅消息,对消息处理,完成业务需求,同时对各系统解耦。spring
rabbitmq 的介绍以及教程网上有,须要的能够百度,这里仅上两端带注释的代码json
//生产者app
//建立链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.22.111");
factory.setUsername("guest");
factory.setPassword("guest");
//建立链接
Connection connection = factory.newConnection();
//建立channel 全部的消息操做基于channel
Channel channel = connection.createChannel(10);
//声明一个exchange topic 类型
channel.exchangeDeclare("crm.topic", "topic",true,false,false,null);
//声明一个队列
channel.queueDeclare("app.login.user", true, false, false, null);
//将queue 与exchange 绑定
channel.queueBind("app.login.user", "crm.topic", "app.login.user");
//发送消息,指定与exchange & routingKey
channel.basicPublish("crm.topic", "app.login.user", null, "sxxxxxxxxxx".getBytes());
channel.close();
connection.close();异步
消费者线程
//建立链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.22.111");
factory.setUsername("guest");
factory.setPassword("guest");
//建立链接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(10);日志//声明一个队列
channel.queueDeclare("crmcommon.login.user", true, false, false, null);
//将queue 与exchange 绑定
channel.queueBind("crmcommon.login.user", "crm.topic", "crmcommon.login.user");
//消费
GetResponse r = channel.basicGet("crmcommon.login.user", true);
System.out.println(new String(r.getBody()));
channel.close();
connection.close(); 中间件
以上是一个简单的生产者,一个简单的消费者。教程
下面是使用spring整合的配置rabbitmq
<!-- 登录事件 -->
<!-- 链接工厂 bean -->
<rabbit:connection-factory id="connectionFactory" host="${amqp.rabbit.host}"
username="${amqp.rabbit.username}"
password="${amqp.rabbit.password}"
/>
<!-- RabbitAdmin 用于管理exchange,queue -->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
<!-- 声明一个队列 -->
<rabbit:queue name="crmcommon.login.user" id="loginQuene" ></rabbit:queue>
<!-- 声明一个exchange -->
<rabbit:topic-exchange id="crmTopic" name="crm.topic" auto-delete="false" durable="true" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="crmcommon.login.#" queue="crmcommon.login.user" ></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange><!-- 消息转换器 pojo ==> json 提供了 JsonMessageConverter SimpleMessageConverter
Jackson2JsonMessageConverter SerializerMessageConverter 这几种转换器 -->
<bean id="messageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />
<!-- 操做的模板,和jdbc 同样spring 提供了一个封装的模板类 -->
<rabbit:template id="rabbitTamplate" exchange="crm.topic" routing-key="crmcommon.login.user"
connection-factory="connectionFactory"
message-converter="messageConverter"></rabbit:template>
<!--异步执行线程池,对jdk 线程池的封装,暴露部分参数 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="2"></property>
<property name="maxPoolSize" value="5"></property>
</bean>
<!-- 消费者 须要实现 ChannelAwareMessageListener 或 MessageListener -->
<bean id="listener" class="com.xx.xx.xx.rabbit.LoginListener" />
<!-- listener 容器 能够绑定多个消费者,一个消费者能够绑定多个queue -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" message-converter="messageConverter" task-executor="taskExecutor" >
<rabbit:listener ref="listener" queues="loginQuene"/>
</rabbit:listener-container>
能够看到spring 配置也是要配置上面代码的基本元素,spring 同时提供了一下封装类,对消息的转换,操做模板等。