消息中间件之ActiveMQ的Demo测试

一,消息中间件的简单介绍:java

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成mysql

 

二.JMSsql

 

JMSJava Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,而且经过提供标准的产生、发送、接收消息的接口简化企业应用的开发。数据库

 

       JMS自己只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它相似于 JDBC(java Database Connectivity):这里,JDBC 是能够用来访问许多不一样关系数据库 API,而 JMS 则提供一样与厂商无关的访问方法,以访问消息收发服务。apache

 

  JMS 定义了五种不一样的消息正文格式:浏览器

 

    · TextMessage--一个字符串对象session

 

    · MapMessage--一套名称-值对maven

 

    · ObjectMessage--一个序列化的 Java 对象tcp

 

    · BytesMessage--一个字节的数据流分布式

 

    · StreamMessage -- Java 原始值的数据流

 

 

 

  JMS消息传递类型

    点对点模式:即一个生产者和一个消费者一一对应;

 

    发布/订阅模式:即一个生产者产生消息并进行发送后,能够由多个消费者进行接收

--------------------------------------------------------

ActiveMQ官方网站下载:http://activemq.apache.org/

ActiveMQ安装完成并启动完成以后,在浏览器访问http://IP地址:8161/ 便可进入ActiveMQ管理页面  u/p:admin/admin

 

 

点对点模式Demo

 

建立Maven工程(jar)

pom.xml引入依赖

 

<dependencies>
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
    </dependency>
</dependencies>

<build>
  <plugins>
    <!-- java编译插件 -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.2</version>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
      <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
  </plugins>
</build>

 

 

生产者:

 

 

public class QueueProducer {

 

public static void main(String[] args) throws JMSException {

 

//1.建立链接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");

//2.获取链接
Connection connection = connectionFactory.createConnection();

//3.启动链接
connection.start();

//4.获取Session(参数1为是否启动事务,第二个参数:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//5.建立队列对象
Queue queue = session.createQueue("test-queue");

//6.建立消息生产者
MessageProducer producer = session.createProducer(queue);

//7.建立消息

TextMessage textMessage = session.createTextMessage("AvtiveDemo测试");

//8.发送消息
producer.send(textMessage);

//9.关闭资源

producer.close();


}

 

}

 

 

消费者:

 


public class QueueConsumer {

 

public static void main(String[] args) throws JMSException, IOException {

 

//1.建立链接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.获取链接
Connection connection = connectionFactory.createConnection();
//3.启动链接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立队列对象
Queue queue = session.createQueue("test-queue");
//6.建立消息消费
MessageConsumer consumer = session.createConsumer(queue);

//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
  System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}

 

}

 

消费者:

public class QueueConsumer {

public static void main(String[] args) throws JMSException, IOException {

//1.建立链接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.获取链接
Connection connection = connectionFactory.createConnection();
//3.启动链接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立队列对象
Queue queue = session.createQueue("test-queue");
//6.建立消息消费
MessageConsumer consumer = session.createConsumer(queue);

//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}

}

 

 

 发布/订阅模式

生产者

public static void main(String[] args) throws JMSException {

//1.建立链接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.获取链接
Connection connection = connectionFactory.createConnection();
//3.启动链接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立主题对象
Topic topic = session.createTopic("test-topic");
//6.建立消息生产者
MessageProducer producer = session.createProducer(topic);
//7.建立消息
TextMessage textMessage = session.createTextMessage("topic测试");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}

 

消费者:消费者能够建多个运行接收信息

public static void main(String[] args) throws JMSException, IOException {

//1.建立链接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616"); //2.获取链接 Connection connection = connectionFactory.createConnection(); //3.启动链接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.建立消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); }

相关文章
相关标签/搜索