消息队列,Active MQ

随手笔记:

消息队列应用在分布式系统中,它的作用主要有:解耦、异步处理请求、流量削峰等

在系统常见的高并发的情况下容易发生线程堵塞,来不及同步处理,比如大量的insert,update操作同时到达数据库,会导致数据库的压力过大,请求堆积,通过使用消息队列,异步处理请求,缓解压力。

消息队列的中间件好多,本文只记录学习active mq

active mq是apache出品

主要特点:

1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4 商业服务器上

5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过JDBC和journal提供高速的消息持久化

7. 从设计上保证了高性能的集群,客户端-服务器,点对点

8. 支持Ajax

9. 支持与Axis的整合

10. 可以很容易得调用内嵌JMS provider,进行测试

官网下载

进入http://activemq.apache.org/下载ActiveMQ 解压在bin目录win32/64的目录下运行activemq.bat文件,可视化窗口http://localhost:8161/admin/index.jsp

JMS消息发送模式(PTP,PUB/SUB)

JMS接口

ConnectionFactory接口(连接工厂)

用户用来创建JMS提供者的连接的被管对象。JMS客户通过可以指的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,jms客户才能够查找到她们,根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory(brokenURL);//创建连接工厂

Connection接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接受队列和主题到目标。

Connection connection = connectionFactory.creationConnection();//创建连接通过工厂

connection.start();//启动连接;

Session接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。再提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

Session session = connenction.createSession(bfalse,Session.AUTO_ACKNOWLEDGE);//使用connection对象创建session对象,参数1表示是否支持事务,当true事,第二个参数忽略,第一个参数为false时,第二个参数才有意义,消息的应答模式:手动应答,自动应答,一般都为自动应答。

Destination接口(目标)

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。jms管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

Queue queue = session.createQueue("队列的名称");

Topic topic = session.createTopic("主题名称");//使用session对象创建Destination对象(topic、queue)

MessageConsumer接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步的或异步接受队列和主题类型的消息

MessageConsumer consumer =  session.createConsumer(queue);//创建一个consumer对象 消费者对象

消费者有两种消费方法::

1、同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

     实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。
 

consumer有两种消费方法:

1.同步消费,通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

while(true){

       TextMessage message = consumer.receive(10000);//设置接收者接收消息的时间,便于测试

if (message !=null) {
                       System.out.println(message);
                   }else {
                      //超时结束
                      break;
                   }

}

2、异步消费//topic一样

客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

     实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。

// 第七步:接收消息。
         consumer.setMessageListener(new MessageListener() {
             
              @Override
              public void onMessage(Message message) {
                   try {
                       TextMessage textMessage = (TextMessage) message;
                       String text = null;
                       //取消息的内容
                       text = textMessage.getText();
                       // 第八步:打印消息。
                       System.out.println(text);
                   } catch (JMSException e) {
                       e.printStackTrace();
                   }
              }
         });
         //等待键盘输入
         System.in.read();
 

MessageProducer接口(消息生产者)

由会话创建的对象,用于发送消息带目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标

MessageProducer producer=session.createProducer(queue);//使用session创建一个producer对象;

MessageProducer producer=session.createProducer(topic);//使用session创建一个producer对象;
Message接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序送到另一个应用程序。一个消息分为三个主要部分:

消息头(必须):包含用于识别和为消息寻找路由的操作设置。

一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息,对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

TextMessage message = session.createTextMessage("hello activeMq,this is my first test.");//使用session创建textmessage对象

producer.send(message)//使用消息生产者对象发送消息

connection.close();

producer.close();

session.close();//关闭资源