消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下扩展进程间的通讯。apache
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。centos
对于消息的传递有两种类型:浏览器
一种是点对点的,即一个生产者和一个消费者一一对应;服务器
另外一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,能够由多个消费者进行接收。 session
1.官方网站下载:http://activemq.apache.org/tcp
2.centos上安装ActiveMQ
分布式
1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器ide
2)解压测试
tar zxvf apache-activemq-5.12.0-bin.tar.gz网站
3)为apache-activemq-5.12.0目录赋权
chmod 777 apache-activemq-5.12.0
4)进入apache-activemq-5.12.0\bin目录,赋与执行权限
chmod 755 activemq
3.启动
./activemq start
启动成功出现以下信息
4.打开浏览器输入服务器地址
http://192.168.56.102:8161
便可访问管理界面
点击Manage ActiveMQ broker登陆,登陆名和密码均为admin
5.点对点消息列表Queues
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 能够理解为是消费这消费掉的数量。
1.1消息生产者
1)建立Maven工程,引入依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
2)建立类QueueProducer main方法代码以下:
//1.建立链接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取链接 Connection connection = connectionFactory.createConnection(); //3.启动链接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立队列对象 Queue queue = session.createQueue("test-queue"); //6.建立消息生产者 MessageProducer producer = session.createProducer(queue); //7.建立消息 TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ世界"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close();
上述代码中第4步建立session 的两个参数:
第2个参数 消息的确认模式
运行后经过ActiveMQ管理界面查询
1.2 消息消费者
建立类QueueConsumer ,main方法代码以下:
//1.建立链接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.获取链接 Connection connection = connectionFactory.createConnection(); //3.启动链接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立队列对象 Queue queue = session.createQueue("test-queue"); //6.建立消息消费 MessageConsumer consumer = session.createConsumer(queue); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close();
执行后看到控制台输出:欢迎来到神奇的ActiveMq世界
1.3 运行测试
同时开启2个以上的消费者,再次运行生产者,观察每一个消费者控制台的输出,会发现只有一个消费者会接收到消息。
2.1消息生产者
建立类TopicProducer ,main方法代码以下:
//1.建立链接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.获取链接 Connection connection = connectionFactory.createConnection(); //3.启动链接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主题对象 Topic topic = session.createTopic("test-topic"); //6.建立消息生产者 MessageProducer producer = session.createProducer(topic); //7.建立消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的ActiveMq世界"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close();
2.2消息消费者
建立类TopicConsumer ,main方法代码以下:
//1.建立链接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.获取链接 Connection connection = connectionFactory.createConnection(); //3.启动链接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.建立消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close();
2.3 运行测试
同时开启2个以上的消费者,再次运行生产者,观察每一个消费者控制台的输出,会发现每一个消费者会接收到消息。