ActiveMQ

ActiveMQ入门、ActiveMQ的使用
  • ActiveMQ入门概念
  1. 什么是JMS

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

常见的MOM(消息中间件)包括:ActiveMQ、RocketMQ、RabbitMQ等

  1. 什么是ActiveMQ

ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然扮演着重要角色,可以说ActiveMQ在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断地提升版本,80%以上的业务我们用ActiveMq都能满足,当然后续如天猫淘宝这种大型的电商网站,尤其是双十一这种特殊事件,ActiveMQ需要进行很复杂的优化源码以及架构设计才能完成,我们只会会学习一个更强大的分布式消息中间件,RocketMQ,可以说ActiveMQ是和谐,是基础,所以也必须要掌握好。


  1. activemq的安装
  • 上传到centos7中
  • 解压缩
  • 运行:./activemq start
  • 关闭:./activemq stop
  • 查看状态:./activemq status
  • 访问:http://10.31.152.30:8161/admin
    • 用户名:admin
    • 密码:admin

  • 需求:ActiveMQ的消息形式
  1. 点对点

p2p的过程则理解起来更加简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路,它的通信场景如下图所示

image

  1. 发布/订阅

发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个。在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、潇湘晨报等。那么这些个我们订阅的报纸,就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能和我订阅了相同的报纸。那么,在这里,相当于我们在同一个topic里注册了。对于一份报纸发行方来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:

image


  • 需求:JMS消息格式

JMS总共提供了6个消息接口,分别为Message,TextMessage,StreamMessage,MapMessage,ObjectMessage以及ByteMessage,每种类型的消息都由3部分组成:消息头,消息属性和消息体

  • 需求:完成点对点模式发送与接收
public class QueueTest{
	@Test
	public void testProducer() throws Exception{
		//第一步:创建ConnectionFactory对象,需要制定服务器IP和端口号
		//brokerURL服务器的IP和端口号
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.31.152.30:61616");
		//第二步:创建Connection对象
		Connection connetion = connectionFactory.createConnection();
		//第三步:开启连接
		connection.start();
		//第四步:使用连接对象创建会话对象
		//第一个参数:是否开启事务,true为开启,第二个参数忽略
		//第二个参数:当第一个参数为false才有意义。消息应答模式:1、自动应答 2、手动应答,一般为自动应答即可
		Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		//第五步:使用会话对象创建Destination对象(topic,queue),这里创建一个queue对象
		//参数: 队列名称
		Queue queue = session.createQueue("test-queue");
		//第六步:使用会话对象创建生产者对象
		MessageProducer producer = session.createProducer(queue);
		//第七步:创建TextMessage对象
		TextMessage textMessage = session.createTextMessage("hello activemq");
		//第八步:使用producer对象发送消息
		producer.send(textMessage);
		//第九步:关闭资源
		producer.close();
		session.close();
		connection.close();
	}

	@Test
	public void testConsumer() throws Exception{
		//第一步:创建ConnectionFactory对象,需要制定服务器IP和端口号
		//brokerURL服务器的IP和端口号
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.31.152.30:61616");
		//第二步:创建Connection对象
		Connection connetion = connectionFactory.createConnection();
		//第三步:开启连接
		connection.start();
		//第四步:使用连接对象创建会话对象
		//第一个参数:是否开启事务,true为开启,第二个参数忽略
		//第二个参数:当第一个参数为false才有意义。消息应答模式:1、自动应答 2、手动应答,一般为自动应答即可
		Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		//第五步:使用会话对象创建Destination对象(topic,queue),这里创建一个queue对象
		//参数: 队列名称
		Queue queue = session.createQueue("test-queue");
		//第六步:使用会话对象创建消费者对象
		MessageConsumer consumer = session.createConsumer(queue);
		//第七步:接收消息
		consumer.setMessageListener(new MessageListener(){
			public void onMessage(Message message){
				try{
					TextMessage textMessage = (TextMessage)message;
					String text = textMessage.getText();
					//第八步:打印消息
					System.out.println(text);
				}catch(JMSException e){
					e.printStackTrace();
				}
			}
		});
		System.in.read();
		
		//第九步:关闭资源
		consumer.close();
		session.close();
		connection.close();
	}
}


  • 需求:发布商品,并将商品指定信息存入索引库

  • 使用activemq解耦合,即不让后台管理系统和solr耦合度太高

  • 发布者要做的事情:

    1. 发布商品
    2. 发布消息
  • 订阅者要做的事情:

    1. 接收消息,需要创建MessageListener接口的实现类
    2. 取消息,取商品ID
    3. 根据商品ID查询数据库
    4. 创建SolrInputDocument对象
    5. 使用SolrServer对象写入索引库
  • 详细的步骤

  • 在tt-common工程中添加依赖

<!-- 消息队列相关 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>
  • 并且可将tt-manager工程中spring依赖转移到tt-common中,并且将如下依赖加入
<!--消息服务中间件-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
  • 在tt-manager-web中添加spring-activemq.xml
  • 在tt-manager-web的ItemAction修改saveItem方法
@Autowired
	private JmsTemplate jmsTemplate;
	@Resource
	private Destination topicDestination;
@RequestMapping(value = "/item",method = RequestMethod.POST)
     @ResponseBody
	public MessageResult saveItem(TbItem item, String desc, String itemParams) {
		Long itemId = itemService.saveItem(item, desc, itemParams);
		
		//发送商品添加消息
		jmsTemplate.send(topicDestination, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage textMessage = session.createTextMessage(itemId + "");
				return textMessage;
			}
		});
		
		MessageResult ms = new MessageResult();
		ms.setSuccess(true);
		ms.setMessage("新增1个商品成功");
		return ms;
	}
  • 完成com.dhc.ttshop.message.ItemAddMessageListener.java
/**
 * 监听商品添加消息,接收消息后,将对应的商品信息同步到索引库
 */
public class ItemAddMessageListener implements MessageListener {

	@Autowired
	private TbItemMapperCustom itemMapperCustom;
	@Autowired
	private SolrServer solrServer;

	@Override
	public void onMessage(Message message) {

		try {
			//从消息中取商品id
			TextMessage textMessage = (TextMessage) message;
			String text = textMessage.getText();
			Long itemId = new Long(text);
			//等待事务提交
			//Thread.sleep(1000);
			//根据商品id查询商品信息
			TbSearchItemCustom searchItem = itemMapperCustom.getSearchItemById(itemId);
			//创建一个文档对象
			SolrInputDocument document = new SolrInputDocument();
			//向文档对象中添加域
			document.addField("id", searchItem.getId());
			document.addField("item_title", searchItem.getTitle());
			document.addField("item_sell_point", searchItem.getSellPoint());
			document.addField("item_price", searchItem.getPrice());
			document.addField("item_image", searchItem.getImage());
			document.addField("item_category_name", searchItem.getCatName());
			//把文档写入索引库
			solrServer.add(document);
			//提交
			solrServer.commit();
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
  • 将监听器配置到spring容器中spring-activemq.xml
<!--配置消费者-->
    <bean id="itemAddMessageListener" class="com.dhc.ttshop.message.ItemAddMessageListener"/>
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="itemAddMessageListener"/>
    </bean>
  • 对应dao层添加接口,并且在xml中添加内容
<select id="getSearchItemById" parameterType="long" resultType="com.dhc.ttshop.pojo.vo.TbSearchItemCustom">
    SELECT
    i.id,
    i.title,
    i.sell_point as sellPoint,
    i.price,
    i.image,
    c.name as catName
    FROM
    tb_item i
    LEFT JOIN tb_item_cat c ON i.cid = c.id
    WHERE
    i.status = 1
    AND i.id=#{itemid}
  </select>
  • 启动tt-manager-web工程添加商品,启动tt-search-web全文搜索“手机”查看新增商品是否在查询列表中,有的话,则表示成功。