消息队列的使用场景和使用技巧

1、消息队列的使用场景

一、应用解耦

假设商品和结算和支付是不一样的系统,两个系统之间的通信能够经过消息队列完成,不须要强制性的接口关联。值得注意的是消息队列中间件通常都支持同步和异步操做。java

 

二、高并发的异步处理

一个http请求里面若是有多线程的异步操做的话,在高并发环境下回产生大量的等待队列,形成后果是大量的线程被占用,内存,cup,数据库等都会处于高负荷状态。redis

经过消息队列中间件将异步处理都发送到消息队列中,而后经过推或者拉的方式执行异步处理的内容,减小了线程的占用,也不会在请求系统中产生大量等待队列,由于实际生产环境消息队列中间件都会独立一台或者多台服务器。算法

三、流量削锋

主要用于高并发业务场景好比秒杀。spring

将用户请求放入消息队列中,若是消息队列的大小超过了设置的最大值,则直接抛弃改请求。数据库

四、日志系统

不少系统多会在aop层保存日志到mongo,若是是同步操做会严重影响请求响应时间,即便经过线程来达到异步操做的目的也会暂用网站的大量资源,经过mq保存日志,用独立的日志系统进行日志处理会是一个好的办法,若是设计的好的话日志系统甚至能够兼容多个平台。apache

五、信息变动通知

假设商品信息均保存到了redis中,若是商家后台对商品信息进行了修改,可经过发送消息的方式告知商品信息已变动,消费端只须要经过监听去修改响应redis中商品信息接口。json

六、即便通信场景

经过消息队列实现聊天。缓存

七、数据同步

在实际业务中数据库的部分字段多是冗余字段,或者说冗余字段的值是其它表几个字段的值的和,此种场景数据同步可经过mq来实现数据同步,以避免相关字段的保存修改操做接口显得不三不四服务器

 

2、消息队列的使用技巧(举例用activemq)

一、预防消息丢失

1)对于重要业务最好在redis里面保存一份copysession

2)消息回执,制定检查机制,肯定消息 已经被执行

3)定时器,查漏补缺,特别是数据同步,缓存设置等场景

 

二、使用队列应该先肯定是使用一对一模式仍是一对多模式

部分业务多是但愿多个平台不一样的项目多能干收到消息,应使用发布订阅模式。

有些业务涉及并发,可能须要要求一对一关系,须要控制。

三、消息队列的推模式是经过监听器来完成的,监听器一次能够监听多个通道

以java举例

<bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字,多个用逗号隔开 -->  
        <constructor-arg index="0" value="chanel1,channel2,channel3,channel4" />  
    </bean>  
    <!-- 消息监听容器 -->  
    <bean id="listenerContainer"    
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
        <property name="connectionFactory" ref="mqFactory" />    
        <property name="destination" ref="myDestination"/>    
        <property name="messageListener" ref="messageDelegateListenerImpl" />    
        <property name="pubSubDomain" value="true" /> 
         <property name="pubSubNoLocal" value="false" /> 
        
        
    </bean>

 

四、发送textMessage

发送消息最好发送textMessage,若是是对象将对象转换为json字符串便可,由于textMessage能够直接在控制台发送,方便本身调试和测试人员进行测试。

/**
	 * 发布消息
	 * 
	 * @param channel
	 * @param message
	 * @date 2016年7月19日
	 */
	public static void sendTopic(String channel, Object message) {
		String messageStr = "";
		if (message.getClass().isAssignableFrom(String.class)) {
			messageStr = (String) message;
		} else {
			messageStr = JsonUtil.objectToJsonStr(message);
		}
		final String finalMessage = messageStr;
		try {
			final Topic destination = publishJmsTemplate.getConnectionFactory().createConnection().createSession(false,Session.AUTO_ACKNOWLEDGE).createTopic(channel);
			publishJmsTemplate.send(destination, new MessageCreator() {
				public Message createMessage(Session session) throws JMSException {
					log.info("topic name is" + destination.toString() + ",publish content is:\n" + finalMessage);
					TextMessage textMessage = session.createTextMessage(finalMessage);
					textMessage.setJMSExpiration(TimeUnit.DAYS.toSeconds(1));//发布的消息保留一天
					return session.createTextMessage(finalMessage);//text message 在mq控制台也能够发,便于测试和维护
				}
			});
		} catch (Exception e) {
			log.error("--------------添加生产队列对象失败" + e.toString(), e);
		}
	}

如图,activemq控制台仅容许发送textMessage

五、可经过设置多个通道来提升消息消费的效率

方案是编写算法将消息路由到设置好的不一样渠道,须要注意的是多渠道消费消息可能出现并发问题,注意控制,能够将同一类型的消息或者相同的消息发送到同一个渠道,经过算法去控制并发的问题。

相关文章
相关标签/搜索