#情景引入
小白:起床起床起床起床。。。。快起床~
我:怎么了又,大惊小怪,吓到我了。
小白:我有事有事想找你,十万火急呢~~
我:你能有什么事?反正我不信。。那你说说看~~
小白:就是我有两个小表弟,叫大白和二白,他们如今天天睡觉以前都要分别和我聊天,让我给他们讲故事,若是不讲他们就不睡觉。可是,若是一个个的跟他们轮流来讲的话,我就须要天天说两遍,并且我还要找准他们的时间点,这个有时候我有事情都没法实现这个问题,他们就会很生气。。。
我:这不是挺好的嘛,小孩子就是爱听故事的呀。。。
小白:我也愿意讲,可是时间这个不是很好控制,有没有相似,好比我能够以前就描述好了,而后定点给他们两个一块儿发消息,而能够抛开时间和其余因素的影响呢?
我:这个嘛,很简单呀,你可让他们关注你的一个公众号,这样你再定时的推送给他们故事不就能够了嘛。。或者,你能够拉他们进你的一个群这样,就方便了呀~
小白:这样是能够,可是若是之后还有小表妹要听我讲,我就要如此反复的作。。感谢好麻烦好麻烦。。。
我:emmm,我理解你的意思,你就想实现一种不少人都可以进行相似一种消息推送的方式嘛。。。
小白:对的对的。。就是这样一种,,,我记得咱们在技术方面好像也有一种相似的技术,这个叫作什么去了呢?
我:这就是消息中间件,一种生产者和消费者的关系。
小白:我也想学我也想学,,你快给我讲讲,给我讲讲呗。。
我:真拿你没办法,好吧。。。下面我就给你讲一下这方面的知识。
#情景分析
其实,小白的这个问题,是一种比较广泛的问题。既然咱们做为技术人员,固然咱们就要从技术成分去分析如何解决了。这里面其实就是包含着一种消息中间件的技术。它也是最近技术层面用得很是很是多的,这也是很是值得咱们进行学习。。这在现在的秒杀系统,推荐系统等等,都有普遍的应用。。因此,这章我就主要来跟你们说说这方面的知识。
#基本概念的引导
本模块主要讲解关于消息中间件的相关基础知识,也是方便咱们后面的学习。
###什么是中间件?
非操做系统软件,非业务应用软件,不是直接给最终用户使用,不能直接给用户带来价值的软件,咱们就能够称为中间件(好比Dubbo,Tomcat,Jetty,Jboss都是属于的)。
###什么是消息中间件?
百度百科解释:消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下扩展进程间的通讯。
关键点:关注于数据的发送和接受,利用高效可靠的异步消息机制传递机制集成分布式系统。
先简单的用下面这个图说明:
###为何要使用消息中间件
举几个例子,我想你就会明白了。(其实使用消息中间件主要就是为了解耦合和异步两个做用)
1:微博,都用过吧。那么,当咱们新关注一个用户,那么系统会相应的推送消息给咱们,而且还作了不少关于咱们关注的处理。这就是消息中间件的异步。
2:秒杀系统。100件商品,几十万我的在抢,那这个怎么弄呢?总不能就把服务器给宕机了吧。那么就能够把用户的请求进行缓存,而后再异步处理。
3:系统A给系统B进行通讯,而系统B须要对A的消息进行相应处理以后才能给A反馈,这时候,总不能让A就傻傻等着吧。那么,这就是异步的功能。
###什么是JMS?
Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
总结起来讲就是:Java对于应用程序之间进行信息交互的API(并且是异步)。
里面有下面的概念须要理解,对后续有帮助:html
提供者:实现JMS的消息服务中间件服务器。java
客户端:发送或接受消息的应用。web
生产者/发布者:建立并发送消息的客户端。spring
消费者/订阅者:接受并处理消息的客户端。sql
消息:应用程序之间传递的数据。apache
消息模式:在客户端之间传递消息的模式,JMS主要是队列模式和主体模式。windows
队列模式特色:
(1)客户端包括生产者和消费者。
(2)队列中的一个消息只能被一个消费者使用。
(3)消费者能够随时取消息。浏览器
主体模式特色:
(1)客户端包括发布者和订阅者。
(2)主题中的消息能够被全部订阅者消费。
(3)消费者不能消费订阅以前发送的消息。
###什么是AMQP?
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样的开发语言等条件的限制。
简单点说:就是对于消息中间件所接受的消息传输层的协议(不懂传输层,那么就须要多看看计算机网络相关知识了,OSI的层次划分),只有这样才能保证客户端和消息中间件可以进行交互(换位思考:HTTP和HTTPS甚至说是TCP/IP与UDP协议都要的道理)。
emmm,比较一下JMS和AMQP的不一样吧。。缓存
JMS是定义与Java,而AMQP是一种传输层协议。tomcat
JMS是属于Java的API,而AMQP是跨语言的。
JMS消息类型只有两种(主题和队列,后续会说),而AMQP是有五种。
JMS主要就是针对Java的开发的Client,而AMQP是面向消息,队列,路由。
###什么是ActiveMQ呢?
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
简单点说:不就是为了实现我上述所想要的需求嘛。而后它就是一种实现的方式。就好比,Tomcat是什么?不就是为了实现一种client与服务器之间的交互的一种产品嘛。。因此,不须要死记概念,本身理解就好。
#ActiveMQ的安装
##环境:Windows
步骤:
(1)登陆到ActiveMQ的官网,下载安装包。http://activemq.apache.org/activemq-5154-release.html
(2)下载Zip文件
(3)解压Zip文件,目录以下
(4)启动ActiveMQ服务(注意:要右键以管理员身份进行运行)
注意:有两种方式,第一种就是相似tomcat启动,那么启动图会一直显示。
而第二种的话,就是把这个ActiveMQ注册到服务列表中,这样更方便咱们进行操做。(推荐使用这种)
(5)登陆,验证是否启动成功
(6)进入管理页面
OK,进入以后就能够看咱们的管理页面啦。。。是否是很简单呢?
##环境:Linux
步骤:(多余的我就很少说了。。。请看windows的步骤)
(1)一样须要下载对应的文件。后缀为tar.gz的这样的。其实能够直接经过下面的这个命令下载,快速一点,省得要移动到Linux(注意:若是是经过ssh链接的方式的话)。
wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz
(2)而后解压下载的文件
(3)一样进入相对应的目录,运行
./activemq start
(4)而后再访问相同的地址就能够看到啦。(具体看windows安装步骤)
#ActiveMQ的使用(基于Maven)
首先要再回头看看JMS中的一些关键接口。
<!--添加activemq的依赖--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
###情形一:队列模型的消息
3. 编写生产者代码(使用队列模型的消息)
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用于消息的建立类 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //定义ActivMQ的链接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立链接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立链接 Connection connection = activeMQConnectionFactory.createConnection(); //打开链接 connection.start(); //建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列目标 Destination destination = session.createQueue(QUEUE_NAME); //建立一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模拟100个消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("我发送message:" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我如今发的消息是:" + message.getText()); } //关闭链接 connection.close(); } }
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消费者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //定义ActivMQ的链接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立链接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立链接 Connection connection = activeMQConnectionFactory.createConnection(); //打开链接 connection.start(); //建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列目标 Destination destination = session.createQueue(QUEUE_NAME); //建立消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
查看是否进行了消费
**备注:**我上面进行的是队列模式的消息,并且进行的都是单个消费者,那若是我换成同时有两个消费者消费生产者的消息会怎么样呢?(咱们只须要运行两个消费者就能够啦。固然,要保证生产者是产生了消息的哦~~~~不然,拿什么消费呢~)
一个生产者,两个消费者的状况以下:
切记:先运行两个消费者,而后再运行生产者代码:
结果以下:
其实,这就是解释了,我以前说的,队列模式的消息,是只会被一个消费者所使用的,而不会被共享,这也就是和主题模型的差异哦~~~哈哈
###情形二:主题模型的消息
前面的步骤都同样,只是生产者和消费者的代码有点区别:
编写生产者(这个和队列模型其实很像,稍微修改就能够)
package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:48 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicProducer { //定义ActivMQ的链接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的主题名称 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //建立链接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立链接 Connection connection = activeMQConnectionFactory.createConnection(); //打开链接 connection.start(); //建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列目标 Destination destination = session.createTopic(TOPIC_NAME); //建立一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模拟100个消息 for (int i = 1; i <= 100; i++) { TextMessage message = session.createTextMessage("当前message是(主题模型):" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我如今发的消息是:" + message.getText()); } //关闭链接 connection.close(); } }
package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:50 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicConsumer { //定义ActivMQ的链接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定义发送消息的队列名称 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //建立链接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立链接 Connection connection = activeMQConnectionFactory.createConnection(); //打开链接 connection.start(); //建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列目标 Destination destination = session.createTopic(TOPIC_NAME); //建立消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hnu.scw</groupId> <artifactId>activemq</artifactId> <version>1.0-SNAPSHOT</version> <name>activemq</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <spring.version>4.2.5.RELEASE</spring.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!--添加activemq的依赖--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <!--spring整合activemq所须要的依赖--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> <exclusions> <exclusion> <artifactId>spring-context</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions> </dependency> </dependencies> <build> <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> <plugins> <plugin> <artifactId>maven-clean-plugin</artifactId> <version>3.0.0</version> </plugin> <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> <plugin> <artifactId>maven-resources-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.20.1</version> </plugin> <plugin> <artifactId>maven-jar-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-install-plugin</artifactId> <version>2.5.2</version> </plugin> <plugin> <artifactId>maven-deploy-plugin</artifactId> <version>2.8.2</version> </plugin> </plugins> </pluginManagement> </build> </project>
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd "> <context:annotation-config /> <!--Activemq的链接工厂--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> </bean> <!--spring jms为咱们提供的链接池 获取一个链接工厂--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 消息目的地 点对点的模式--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="SpringActiveMQMsg"/> </bean> <!-- jms模板 用于进行消息发送--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
package com.hnu.scw.spring; /** * @ Author :scw * @ Date :Created in 下午 12:19 2018/7/14 0014 * @ Description:生产者的接口 * @ Modified By: * @Version: $version$ */ public interface ProduceService { void sendMessage(String msg); }
package com.hnu.scw.spring; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.annotation.Resource; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 下午 2:21 2018/7/15 0015 * @ Description:生产者的实现类 * @ Modified By: * @Version: $version$ */ public class ProduceServiceImpl implements ProduceService { @Autowired private JmsTemplate jmsTemplate; @Resource(name = "queueDestination") private Destination destination; /** * 发送消息 * @param msg */ @Override public void sendMessage(final String msg) { jmsTemplate.send(destination , new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg); return textMessage; } }); System.out.println("如今发送的消息为: " + msg); } }
<!--注入咱们的生产者--> <bean class="com.hnu.scw.spring.ProduceServiceImpl"/>
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @ Author :scw * @ Date :Created in 下午 2:27 2018/7/15 0015 * @ Description:生产者的测试 * @ Modified By: * @Version: $version$ */ public class ProducerTest { public static void main(String[] args){ ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml"); ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class); //进行发送消息 for (int i = 0; i < 100 ; i++) { bean.sendMessage("test" + i); } //当消息发送完后,关闭容器 classPathXmlApplicationContext.close(); } }
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd "> <context:annotation-config /> <!--Activemq的链接工厂--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> </bean> <!--spring jms为咱们提供的链接池 获取一个链接工厂--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 消息目的地 点对点的模式--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="SpringActiveMQMsg"/> </bean> <!-- 配置消息监听器--> <bean id="consumerMessageListener" class="com.hnu.scw.spring.ComsumerMessageListener"/> <!--配置消息容器--> <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!--配置链接工厂--> <property name="connectionFactory" ref="connectionFactory"/> <!--配置监听的队列--> <property name="destination" ref="queueDestination"/> <!--配置消息监听器--> <property name="messageListener" ref="consumerMessageListener"/> </bean> </beans>
package com.hnu.scw.spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @ Author :scw * @ Date :Created in 下午 3:06 2018/7/15 0015 * @ Description:消息的监听者,用于处理消息 * @ Modified By: * @Version: $version$ */ public class ComsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受到消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @ Author :scw * @ Date :Created in 下午 3:13 2018/7/15 0015 * @ Description:消费者的测试 * @ Modified By: * @Version: $version$ */ public class ConsumerTest { public static void main(String[] args){ //启动消费者 ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml"); } }
<!-- 消息目的地 (主题模式)--> <!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!–配置队列模型的消息名称–> <constructor-arg value="SpringActiveMQMsgTopic"/> </bean>-->
将上面的代码替换以前的就能够了。。。
总结:总的来讲,基于Spring来使用消息队列仍是很是方便的,这比咱们正常进行JMS规范操做要简单不少,毕竟不少对象都是经过Spring的IOC进行容器管理了,因此,值得推荐使用哦~~~
#ActiveMQ的集群
###为何要进行集群呢?
缘由一:实现高可用:以排除单点故障所引发的服务终端。
缘由二:实现负载均衡:以提高效率为更多的客户进行服务。
###集群的方式有哪些?
方式一:客户端集群:多个客户端消费同一个队列。
方式二:Broker clusters:多个Broker之间同步消息。(实现负载均衡)
这个的实现原理主要是经过网络链接器来进行。
网络链接器:用于配置ActiveMQ服务器与服务器之间的网络通信方式,用于服务器透析消息。主要分为静态链接和动态链接。
方式三:Master Slave :实现高可用。
这种方式的话,能够联想到Mysql的主从配置和Zookeeper的负载均衡的主竞争关系master。
咱们在实际的开发中,通常都是将方式二和方式三进行集成,从而实现高可用和负载均衡。下面的话,我也就这样的配置思想来进行讲解:(经过三台服务器来模拟消息集群的实现)
其中的NodeB和NodeC就是一张Master/slave的关系。均可以成为主服务器。(只要它们某一个宕机,那么就会其他的一台就进行继续服务)
###搭建步骤(基于Windows环境,而Linux环境也是同样的操做)
三台服务器的大致功能和描述:
因为本身没有三台服务器,因此就用本身的一台电脑来模拟三台消息服务器,其实这个就是假设有三个不一样ActiveMQ消息服务器了。
<networkConnectors> <networkConnector name="local_network" uri ="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" /> </networkConnectors>
<!--修改服务端口--> <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <networkConnectors> <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> <!--并修改下面这个标签的内容 , 做为B和C的共享文件,目录就是本身以前建立的一个文件(能够回看上面的整个结构)--> <persistenceAdapter> <kahaDB directory="D:\Download\MQJiQun\shareDB"/> </persistenceAdapter>
(2)修改jetty.xml内容,修改服务器的服务端口
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8162"/> </bean>
<!--修改服务端口--> <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <networkConnectors> <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> <!--并修改下面这个标签的内容 , 做为B和C的共享文件,目录就是本身以前建立的一个文件(能够回看上面的整个结构)--> <persistenceAdapter> <kahaDB directory="D:\Download\MQJiQun\shareDB"/> </persistenceAdapter>
(2)修改jetty.xml中的内容
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8163"/> </bean>
步骤:
(1)建立Maven项目
(2)导入依赖
<!--添加activemq的依赖--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
(3)编写生产者代码
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用于消息的建立类 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //经过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机, // 那么就进行其余的服务器选择,randomize表示随机选择) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立链接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立链接 Connection connection = activeMQConnectionFactory.createConnection(); //打开链接 connection.start(); //建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列目标 Destination destination = session.createQueue(QUEUE_NAME); //建立一个生产者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模拟100个消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("当前message是:" + i); //发送消息 producer.send(message); //在本地打印消息 System.out.println("我如今发的消息是:" + message.getText()); } //关闭链接 connection.close(); } }
(4)编写消费者代码
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消费者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //经过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机, // 那么就进行其余的服务器选择,randomize表示随机选择) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定义发送消息的队列名称 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立链接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立链接 Connection connection = activeMQConnectionFactory.createConnection(); //打开链接 connection.start(); //建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列目标 Destination destination = session.createQueue(QUEUE_NAME); //建立消费者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
(5)进行查看各自的服务器的消息队列的状况。
#其余的消息中间件
其实,相似ActiveMQ这样的消息中间件,用得比较多的还有就是RabbitMQ和Kafka。它们三者各自有各自的优点。你们能够百度进行了解,我就不进行多说了。后面我会一样把这两种消息中间件的使用进行详细的讲解,欢迎你们的关注哦~总的来讲,只有适合的场景对应的消息中间件才能发挥最大的做用,没有一种是只有好处而没有坏处的~
#总结