博主以前的一个高并发需求:Java并发(三):实例引出并发应用场景中所提到的,后来通过初步测试发现多线程并不能彻底知足需求,特别是性能上的需求,或者说多线程不是比较好的解决方案,真实需求是:将商品库存(第三方数据库上)"及时"通知第三方的网购平台,达到同步商品余量信息的目的,本地是存儲了相应的阈值,在第三方数据库上的库存一旦少于库存,咱们就认为这件商品已经售罄,由于要防止线上线下同一时间段销售引发的库存紧张,甚至订单已经发出但库存实际不足的状况...以前多线程定时访问库存并同步数据显然很是低效,主管老哥推荐我使用消息队列来解决问题,顿时一脸懵,消息队列是啥??html
消息队列的基本概念:java
消息队列(Message queue)是一种进程间通讯或同一进程的不一样线程间的通讯方式,软件的贮列用来处理一系列的输入,一般是来自用户。消息队列提供了异步的通讯协议,每个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不须要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。 ——维基百科spring
博主使用的消息队列中间件是ActiveMQ,为何用它呢?数据库
以上的总结比较官方,归纳来讲,ActiveMQ的优点在于它是Java语言开发,在基于Spring的项目上容易内嵌,很大程度的减小耦合,提供可靠的任务异步处理.apache
ActiveMQ的通讯模式:服务器
1.点对点(queue)session
2.发布/订阅模式(topic)多线程
如何实现?并发
业务需求是用发布-订阅模式完成,我负责消费者部分的代码,一开始是这样实现的,五步走:dom
public class RoundRobinConfig1 { private Logger logger = LoggerFactory.getLogger(getClass()); @Resource private InventoryService inventoryService; @Scheduled(cron = "0 53 * * * ?")//每2分钟调度一次任务 public void operation(){ ConnectionFactory connectionFactory; // 链接工厂 Connection connection = null; // 链接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer consumer; //建立消费者 // 实例化链接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection=connectionFactory.createConnection(); // 经过链接工厂获取链接 connection.start(); // 启动链接 /** * 这里的最好使用Boolean.FALSE,若是是用true则必须commit才能生效,且http://127.0.0.1:8161/admin管理页面才会更新消息队列的变化状况。 */ session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session // destination=session.createQueue("FirstQueue1"); // 建立消息队列 destination=session.createTopic("firstTopic"); consumer=session.createConsumer(destination); consumer.setMessageListener(new MyListener()); // 注册消息监听 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
具体业务逻辑写在listener里,你们使用时别忘了引入maven依赖
<!-- ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.1</version> </dependency>
而后就进行初步测试,喜闻乐见地遇到问题了:
博主通过一通资料的查阅,依旧没有搞懂问题所在,最后问同事要来了生产者的代码,发现了问题可能出在这里:
StompJmsConnectionFactory factory = new StompJmsConnectionFactory();
生产者用了这个链接工厂获取链接,随即百度了一下Stomp,了解到这实际上是一种消息格式协议,另外还有AMQP,OPENWIRE,MQTT等,几种消息协议的概述能够戳我,我便换成了StompJmsConnection对象来获取链接,结果成功获取到消息体:
因为须要让订阅消息队列的程序一直运行,我采起官方推荐的死循环方式处理,而且使其在模块启动时运行,后来考虑了一下,万一死循环出现异常,那整个模块不就宕了吗,因而我给模块建立了一个子进程用来轮询消息队列,这样子进程就算挂了,整个模块也不受影响了:
import com.google.gson.Gson; import com.ycyz.framework.task.domain.Inventorycache; import com.ycyz.framework.task.service.InventorycacheService; import org.fusesource.hawtbuf.Buffer; import org.fusesource.stomp.jms.StompJmsConnectionFactory; import org.fusesource.stomp.jms.StompJmsDestination; import org.fusesource.stomp.jms.message.StompJmsBytesMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.jms.*; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author YHW * @ClassName: AutoRunner * @Description: * @date 2019/1/7 8:27 */ @Order(value = 1) @Component public class AutoRunner implements ApplicationRunner { private Logger logger = LoggerFactory.getLogger(getClass()); Map map = new HashMap(16); String result = null; String user = env("ACTIVEMQ_USER", ""); String password = env("ACTIVEMQ_PASSWORD", ""); String host = env("ACTIVEMQ_HOST", "域名"); String destination = "/topic/bn.stock.prod"; int port = Integer.parseInt(env("ACTIVEMQ_PORT", "端口号")); Destination dest = new StompJmsDestination(destination); @Resource private InventorycacheService inventorycacheService; Gson gson = new Gson(); StompJmsConnectionFactory factory = new StompJmsConnectionFactory(); @Override public void run(ApplicationArguments args) throws Exception{ logger.info("开始运行了..."); Thread thread = new Thread(){ @Override public void run() { MessageConsumer consumer = null; Connection connection = null; long start = 0L; long count = 0L; try { factory.setBrokerURI("tcp://" + host + ":" + port); connection = factory.createConnection(user, password); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(dest); start = System.currentTimeMillis(); count = 1; System.out.println("Waiting for messages..."); while (true) { System.out.println("轮询消息队列.."); Message message = null; try { message = consumer.receive(); if (message instanceof StompJmsBytesMessage) { StompJmsBytesMessage sm = (StompJmsBytesMessage) message; Buffer buffer = sm.getContent(); byte[] a = buffer.getData(); result = new String(a);if (result.contains("SHUTDOWN")) { long diff = System.currentTimeMillis() - start; System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0))); break; } //result是获取到的消息字符串,这里开始处理它 } }catch(Exception e) { e.printStackTrace(); continue; } } connection.close(); }catch(JMSException e) { e.printStackTrace(); } } }; thread.start(); } private static String env(String key, String defaultValue) { String rc = System.getenv(key); if( rc== null ){ return defaultValue; } return rc; } private static void flagTrigger(Inventorycache inventorycache){ if(new Integer(1).equals(inventorycache.getFlag())){ inventorycache.setFlag(0); }else{ inventorycache.setFlag(1); } } private static String getResult(String theWholeMessage){ int startFlag = 0; int endFlag = 0; for (int i = 0; i < theWholeMessage.length(); i++) { if (theWholeMessage.charAt(i) == '{') { startFlag = i; } else if (theWholeMessage.charAt(i) == '}') { endFlag = i; } } return theWholeMessage.substring(startFlag, endFlag + 1); } }
这样就初步完成,生产者只管往队列里"塞"待处理消息,消费者只管"拿"消息来处理,作到了有效的应用程序解耦.
固然我也不肯定还有没有更好的方案,博主才疏学浅,懂得太少,但愿有看到的大牛可以不吝赐教,谢谢了