消息中间件ActiveMQ(二)HelloWorld入门实例

1、JMS消息发送模式

  • 在点对点或队列模型

一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息。生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收。

  • 发布者/订阅者模型:

支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息.在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

2、JMS应用程序接口

  • ConnectionFactory 接口(连接工厂)

用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

  • Connection 接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

  • Destination 接口(目标)

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

  • MessageConsumer 接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

  • MessageProducer 接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

  • Message 接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:

消息头(必须):包含用于识别和为消息寻找路由的操作设置。

一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

  • Session 接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

3、消息队列

把ActiveMQ依赖的jar包添加到工程中:activemq-all-5.12.0.jar

使用maven工程,则添加jar包的依赖:

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.11.2</version>
</dependency>

3.1、Producer:

public class QueueSender {

	public static void main(String[] args) {
		//创建一个连接工厂
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//从工厂对象中获得连接
			Connection connection = connectionFactory.createConnection();
			//开启连接
			connection.start();
			/*
			connection.createSession(paramA, paramB)
			A)paramA设置为true时:
			paramB的值忽略, acknowledgment mode被jms服务器设置 SESSION_TRANSACTED 。
			当一个事务被提交的时候,消息确认就会自动发生。
			B) paramA设置为false时:
			Session.AUTO_ACKNOWLEDGE为自动确认,当客户成功的从receive方法返回的时候,或者从
			MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
			Session.CLIENT_ACKNOWLEDGE 为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的
			acknowledge方法。jms服务器才会删除消息。(默认是批量确认)
			*/
			//开启一个回话,第一个参数指定不使用事务,第二个参数指定客户端接收消息的确认方式
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//创建一目的地Queue或者是Topic
			Queue queue = session.createQueue("mytestqueue");
			//创建一个生产者
			MessageProducer producer = session.createProducer(queue);
			//创建message
			TextMessage message = new ActiveMQTextMessage();
			message.setText("hello");
			//发送消息
			producer.send(message);
			//关闭
			producer.close();
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}

3.2、Consumer:

消费者有两种消费方法:

1、同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。

3.2.1、同步消费:

public class QueueConsumer {

	public static void main(String[] args) {
		//创建一连接工厂
		ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//创建一个连接
			Connection connection = connectionFactory.createConnection();
			//打开连接
			connection.start();
			//创建一个回话
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//创建一个目的地Destination 
			Queue queue = session.createQueue("mytestqueue");
			//创建一个消费者
			MessageConsumer consumer = session.createConsumer(queue);
			while(true) {
				//设置接收者接收消息的时间,为了便于测试,这里定为100s
				Message message = consumer.receive(100000);
				if (message != null) {
					System.out.println(message);
				} else {
					//超时结束
					break;
				}
				
			}
			consumer.close();
			session.close();
			connection.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
}

3.2.2、异步消费:

public class QueueConsumer {

	public static void main(String[] args) {
		//创建一连接工厂
		ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//创建一个连接
			Connection connection = connectionFactory.createConnection();
			//打开连接
			connection.start();
			//创建一个回话
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//创建一个目的地Destination 
			Queue queue = session.createQueue("mytestqueue");
			//创建一个消费者
			MessageConsumer consumer = session.createConsumer(queue);
			consumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					if (message instanceof TextMessage) {
						String text = "";
						try {
							text = ((TextMessage)message).getText();
						} catch (JMSException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
						System.out.println(text);
					}
					
				}
			});
			System.in.read();
			//关闭
			consumer.close();
			session.close();
			connection.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
}

4、发布者/订阅者

4.1、Producer:

public class TopicProducer {

	public static void main(String[] args) {
		//创建连接工厂
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//创建连接
			Connection connection = connectionFactory.createConnection();
			//开启连接
			connection.start();
			//创建一个回话
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//创建一个Destination,queue或者Topic
			Topic topic = session.createTopic("mytopic");
			//创建一个生成者
			MessageProducer producer = session.createProducer(topic);
			//创建一个消息
			TextMessage textMessage = new ActiveMQTextMessage();
			textMessage.setText("hello my topic");
			//发送消息
			producer.send(textMessage);
			//关闭
			producer.close();
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

4.2、Consumer:

public class TopicConsumer {

	public static void main(String[] args) {
		//创建连接工厂
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//创建连接
			Connection connection = connectionFactory.createConnection();
			connection.start();
			//创建一个会话
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//创建一个目标
			Destination destination = session.createTopic("mytopic");
			//创建一个消费者
			MessageConsumer consumer = session.createConsumer(destination);
			//接收消息
			consumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					
					System.out.println(message);
					
				}
			});
			//暂停
			System.in.read();
			//关闭
			consumer.close();
			session.close();
			connection.close();
			
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}