JMS学习(三)----- ActiveMQ简单的HelloWorld实例

源码下载:http://git.oschina.net/zhengweishan/JMS_Study_Demohtml

开发环境

我使用的是ActiveMQ 5.13.3 Release的Windows版,官网最新版是ActiveMQ 5.13.4 Release,你们能够自行下载,下载地址java

须要注意的是,开发时候,要将apache-activemq-5.13.3-bin.zip解压缩后里面的activemq-all-5.13.3.jar包加入到classpath下面,这个包包含了全部jms接口api的实现。git

项目截图:apache

ActiviteMQ消息有3中形式

JMS 公共 ----------点对点域 ----------发布/订阅域api

ConnectionFactory ---------- QueueConnectionFactory ---------- TopicConnectionFactory服务器

Connection ---------- QueueConnection ---------- TopicConnectionsession

Destination ---------- Queue ---------- Topiceclipse

Session ---------- QueueSession ---------- TopicSessionspa

MessageProducer ---------- QueueSender ---------- TopicPublisher.net

MessageConsumer ---------- QueueReceiver ---------- TopicSubscriber

(1)、点对点方式(point-to-point)

点对点的消息发送方式主要创建在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端能够在任什么时候刻发送信息到Queue,而不须要知道接收客户端是否是在运行

(2)、发布/订阅 方式(publish/subscriber Messaging)

发布/订阅方式用于多接收客户端的方式.做为发布订阅的方式,可能存在多个接收客户端,而且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他建立之后发送客户端发送的信息。做为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

ActiviteMQ接收和发送消息基本流程

发送消息的基本步骤:

(1)、建立链接使用的工厂类JMS ConnectionFactory

(2)、使用管理对象JMS ConnectionFactory创建链接Connection,并启动

(3)、使用链接Connection 创建会话Session

(4)、使用会话Session和管理对象Destination建立消息生产者MessageSender

(5)、使用消息生产者MessageSender发送消息

消息接收者从JMS接受消息的步骤

(1)、建立链接使用的工厂类JMS ConnectionFactory

(2)、使用管理对象JMS ConnectionFactory创建链接Connection,并启动

(3)、使用链接Connection 创建会话Session

(4)、使用会话Session和管理对象Destination建立消息接收者MessageReceiver

(5)、使用消息接收者MessageReceiver接受消息,须要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,须要定义onMessage事件方法。

使用JMS方式发送接收消息

package com.active.mq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MQConnectionFactory {
	
	private  static final  String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认链接用户名
    private  static final  String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认链接密码
    private  static final  String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认链接地址
    
    private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);//链接工厂
    
    /**
     * 经过链接工厂获取链接
     * [@return](http://my.oschina.net/u/556800)
     */
    public static Connection getConnection(){
    	Connection connection = null;
    	try {
			connection = connectionFactory.createConnection();
		} catch (JMSException e) {
			e.printStackTrace();
		}
    	return connection;
    }
}

package com.active.mq.demo;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JMSConsumer {
	

    public static void main(String[] args) {
        Connection connection = null;//链接
        
        Session session = null;//会话 接受或者发送消息的线程
        
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        try {
            //经过链接工厂获取链接
            connection = MQConnectionFactory.getConnection();
            //启动链接
            connection.start();
            //建立session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一个链接HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //建立消息消费者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if(textMessage != null){
                    System.out.println("收到的消息:" + textMessage.getText());
                }else {
                    break;
                }
            }
            //提交回话
            session.commit();

        } catch (JMSException e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(session !=null){
            	try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
        }

    }
}


package com.active.mq.demo;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JMSProducer {

    //发送的消息数量
    private static final int SENDNUM = 10;
    
	public static void main(String[] args) {

        //链接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session = null;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
       

        try {
            //经过链接工厂获取链接
            connection = MQConnectionFactory.getConnection();
            //启动链接
            connection.start();
            //建立session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //建立一个名称为HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //建立消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);
            //提交回话
            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(session !=null){
            	try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
        }
	}
	
	/**
     * 发送消息
     * [@param](http://my.oschina.net/u/2303379) session
     * [@param](http://my.oschina.net/u/2303379) messageProducer  消息生产者
     * [@throws](http://my.oschina.net/throws) Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            //建立一条文本消息 
            TextMessage message = session.createTextMessage("发送JMS消息第" + (i + 1) + "条");
            System.out.println("发送消息:Activemq 发送JMS消息" + (i + 1));
            //经过消息生产者发出消息 
            messageProducer.send(message);
        }

    }
}

Queue队列方式发送点对点消息数据

在获取工厂类中加入以下代码:

private static QueueConnectionFactory queueConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

/**
 * 经过链接工厂获取链接(Queue方式)
 * [@return](http://my.oschina.net/u/556800)
 */
public static QueueConnection getQueueConnection(){
	QueueConnection connection = null;
	try {
		connection = queueConnectionFactory.createQueueConnection();
	} catch (JMSException e) {
		e.printStackTrace();
	}
	return connection;
}


//消息生产者
package com.active.mq.demo;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

public class QueueProducer {
	 private static final int SEND_NUM = 10;


	public static void main(String[] args) {
		QueueConnection queueConnection = null;
		QueueSession queueSession = null;
		try {
			// 经过工厂建立一个链接
			queueConnection = MQConnectionFactory.getQueueConnection();
            // 启动链接
			queueConnection.start();
            // 建立一个session会话
			queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 建立一个消息队列
            Queue queue = queueSession.createQueue("QueueMsgDemo");
            // 建立消息发送者
            QueueSender sender = queueSession.createSender(queue);
            // 设置持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(queueSession, sender);
            // 提交会话
            queueSession.commit();
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
            // 关闭释放资源
            if (queueSession != null) {
            	try {
					queueSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
            if (queueConnection != null) {
            	try {
					queueConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
        }
	}
	 
	 
	 public static void sendMessage(QueueSession session, QueueSender sender) throws Exception {
	        for (int i = 0; i < SEND_NUM; i++) {
	            String message = "发送queue消息第" + (i + 1) + "条";
	            //建立一个Map集合信息
	            MapMessage map = session.createMapMessage();
	            map.setString("text", message);
	            map.setLong("time", System.currentTimeMillis());
	            System.out.println("ActiveMQ 发送queue消息:"+(i + 1));
	            sender.send(map);
	        }
	    }
}

//消费者
package com.active.mq.demo;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;

public class QueueConsumer {

	public static void main(String[] args) {
		QueueConnection queueConnection = null;
		QueueSession queueSession = null;
		try {
			// 经过工厂建立一个链接
			queueConnection = MQConnectionFactory.getQueueConnection();
			// 启动链接
			queueConnection.start();
			// 建立一个session会话
			queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			// 建立一个消息队列
			Queue queue = queueSession.createQueue("QueueMsgDemo");
			// 建立消息接收者
			QueueReceiver receiver = queueSession.createReceiver(queue);

			receiver.setMessageListener(new MessageListener() {
				public void onMessage(Message msg) {
					if (msg != null) {
						MapMessage map = (MapMessage) msg;
						try {
							System.out.println(map.getLong("time") + "接收到消息#" + map.getString("text"));
						} catch (JMSException e) {
							e.printStackTrace();
						}
					}
				}
			});
			// 休眠100ms再关闭
			Thread.sleep(1000 * 100);

			// 提交会话
			queueSession.commit();

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 关闭释放资源
			if (queueSession != null) {
				try {
					queueSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (queueConnection != null) {
				try {
					queueConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

Topic主题发布和订阅消息

在获取工厂类中加入以下代码:

private static TopicConnectionFactory topicConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

/**
 * 经过链接工厂获取链接(Topic方式)
 * @return
 */
public static TopicConnection getTopicConnection(){
	TopicConnection topicConnection = null;
	try {
		topicConnection = topicConnectionFactory.createTopicConnection();
	} catch (JMSException e) {
		e.printStackTrace();
	}
	return topicConnection;
}

//生产者
package com.active.mq.demo;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

public class TopicProducer {
	private static final int SEND_NUM = 10;

	public static void main(String[] args) {
		  TopicConnection connection = null;
	      TopicSession session = null;
	        try {
	            // 经过工厂建立一个链接
	            connection = MQConnectionFactory.getTopicConnection();
	            // 启动链接
	            connection.start();
	            // 建立一个session会话
	            session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
	            // 建立一个消息队列
	            Topic topic = session.createTopic("TopicDemo");
	            // 建立消息发送者
	            TopicPublisher publisher = session.createPublisher(topic);
	            // 设置持久化模式
	            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	            sendMessage(session, publisher);
	            // 提交会话
	            session.commit();
	            
	        } catch (Exception e) {
	            e.printStackTrace();
	        } finally {
	            // 关闭释放资源
	            if (session != null) {
	                try {
						session.close();
					} catch (JMSException e) {
						e.printStackTrace();
					}
	            }
	            if (connection != null) {
	                try {
						connection.close();
					} catch (JMSException e) {
						e.printStackTrace();
					}
	            }
	        }
	}
	
	public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送Topic消息第" + (i + 1) + "条";
            
            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println("ActiveMQ 发送Topic消息:"+(i + 1));
            publisher.send(map);
        }
    }
}

//消费者
package com.active.mq.demo;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

public class TopicConsumer {
	
	public static void main(String[] args) {
		TopicConnection connection = null;
	    TopicSession session = null;
	    try {
	        // 经过工厂建立一个链接
	        connection = MQConnectionFactory.getTopicConnection();
	        // 启动链接
	        connection.start();
	        // 建立一个session会话
	        session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
	        // 建立一个消息队列
	        Topic topic = session.createTopic("TopicDemo");
	        // 建立消息消费者
	        TopicSubscriber subscriber = session.createSubscriber(topic);
	        
	        subscriber.setMessageListener(new MessageListener() { 
	            public void onMessage(Message msg) { 
	                if (msg != null) {
	                    MapMessage map = (MapMessage) msg;
	                    try {
	                        System.out.println(map.getLong("time") + "Topic接收消息#" + map.getString("text"));
	                    } catch (JMSException e) {
	                        e.printStackTrace();
	                    }
	                }
	            } 
	        }); 
	        // 休眠100ms再关闭
	        Thread.sleep(1000 * 100); 
	        // 提交会话
	        session.commit();
	        
	    } catch (Exception e) {
	        e.printStackTrace();
	    } finally {
	        // 关闭释放资源
	        if (session != null) {
	            try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
	        }
	        if (connection != null) {
	            try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
	        }
	    }
	}
	
}

运行

以使用JMS方式发送接收消息为例说明 一、首先,启动ActiveMQ 二、运行发送者,eclipse控制台输出,以下图:

三、查看ActiveMQ服务器,Queues内容以下:

咱们能够看到建立了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,咱们也能够经过Browse查看是哪些消息,若是这些队列中的消息,被删除,消费者则没法消费。

四、运行一下消费者,eclipse控制台打印消息,以下:

五、咱们在查看一下ActiveMQ服务器,Queues内容以下:

咱们能够看到HelloWorld的消息队列发生变化,多一个消息者,队列中的10条消息被消费了,点击Browse查看,已经为空了。 点击Active Consumers,咱们能够看到这个消费者的详细信息。

实例到此就结束了,你们能够本身多看点ActiveMQ服务器的内容,进一步熟悉ActiveMQ。

相关文章
相关标签/搜索