一,消息中间件的简单介绍:java
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成mysql
二.JMSsql
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,而且经过提供标准的产生、发送、接收消息的接口简化企业应用的开发。数据库
JMS自己只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它相似于 JDBC(java Database Connectivity):这里,JDBC 是能够用来访问许多不一样关系数据库的 API,而 JMS 则提供一样与厂商无关的访问方法,以访问消息收发服务。apache
JMS 定义了五种不一样的消息正文格式:浏览器
· TextMessage--一个字符串对象session
· MapMessage--一套名称-值对maven
· ObjectMessage--一个序列化的 Java 对象tcp
· BytesMessage--一个字节的数据流分布式
· StreamMessage -- Java 原始值的数据流
JMS消息传递类型
点对点模式:即一个生产者和一个消费者一一对应;
发布/订阅模式:即一个生产者产生消息并进行发送后,能够由多个消费者进行接收
--------------------------------------------------------
ActiveMQ官方网站下载:http://activemq.apache.org/
ActiveMQ安装完成并启动完成以后,在浏览器访问http://IP地址:8161/ 便可进入ActiveMQ管理页面 u/p:admin/admin
点对点模式Demo
建立Maven工程(jar)
pom.xml引入依赖
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
生产者:
public class QueueProducer {
public static void main(String[] args) throws JMSException {
//1.建立链接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.获取链接
Connection connection = connectionFactory.createConnection();
//3.启动链接
connection.start();
//4.获取Session(参数1为是否启动事务,第二个参数:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立队列对象
Queue queue = session.createQueue("test-queue");
//6.建立消息生产者
MessageProducer producer = session.createProducer(queue);
//7.建立消息
TextMessage textMessage = session.createTextMessage("AvtiveDemo测试");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
}
}
消费者:
public class QueueConsumer {
public static void main(String[] args) throws JMSException, IOException {
//1.建立链接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.获取链接
Connection connection = connectionFactory.createConnection();
//3.启动链接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立队列对象
Queue queue = session.createQueue("test-queue");
//6.建立消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
消费者:
public class QueueConsumer {
public static void main(String[] args) throws JMSException, IOException {
//1.建立链接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.获取链接
Connection connection = connectionFactory.createConnection();
//3.启动链接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立队列对象
Queue queue = session.createQueue("test-queue");
//6.建立消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
发布/订阅模式
生产者
public static void main(String[] args) throws JMSException {
//1.建立链接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.获取链接
Connection connection = connectionFactory.createConnection();
//3.启动链接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立主题对象
Topic topic = session.createTopic("test-topic");
//6.建立消息生产者
MessageProducer producer = session.createProducer(topic);
//7.建立消息
TextMessage textMessage = session.createTextMessage("topic测试");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
消费者:消费者能够建多个运行接收信息
public static void main(String[] args) throws JMSException, IOException {
//1.建立链接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616"); //2.获取链接 Connection connection = connectionFactory.createConnection(); //3.启动链接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.建立消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); }