1、消息队列概述java
消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。web
消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不论是谁发布的。这样发布者和使用者都不用知道对方的存在。spring
消息队列中间件是分布式系统中重要的组件,主要解决异步消息,流量削峰,应用耦合等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺乏的中间件。目前使用较多的消息队列产品有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。数据库
生活中的例子apache
老式餐厅点餐后需呆在座位上等餐,中途不能离开去干别的事,若是离开去干别的事,餐好了,点餐人殊不知道。浏览器
新式餐厅点餐后,餐厅会提供一个“电子盘”给顾客,顾客能够不用在店里等餐,能够去附近逛逛,买买东西,等餐好了,手上的“电子盘”就会响,通知顾客能够回去就餐了。springboot
对比以上两种形式,第二种情形就像消息队列同样,点完餐之后就能够去处理别的事情,不用一直在餐厅等着。网络
2、消息队列的做用session
上面说了消息队列主要解决了异步处理,流量削峰,应用耦合等三个方面的问题。架构
异步处理
场景说明:用户注册后,系统要发送注册邮件和注册短信。传统的方式有两种,串行模式和并行方式 。
串行模式:将注册信息存入数据库成功后,先发送注册邮件再发送注册短信,以上三个步骤都完成后,将成功的信息返回给客户端。
并行模式:将注册信息存入数据库成功后,发送邮件的同时发送注册短信,以上三个任务都完成后返回给客户端,与串行模式的差异是并行模式能够提升处理的时间。
假设每一个业务结点的处理时间为50ms,不考虑网络开销,则串行模式的时间为150ms,并行模式的时间为100ms。
若是引入消息队列,可以大大缩短响应时间,以下:
用户注册信息写入数据库后,再将发送邮件和短信写入消息队列,而后直接返回注册结果,总共耗时55m,是并行的一半左右,是串行的三分之一左右,大大提升了系统的处理能力。
应用解耦
场景说明:用户下单后,订单系统须要通知库存系统。传统的作法是,订单系统调用库存系统的接口,如图所示:
传统模式的缺点:
引入消息队列的方案以下:
流量削峰
场景说明:业务系统处理能力远远大于支付渠道处理能力,假如不控制流量把所有请求往支付渠道发送,支付渠道可能会挂掉,致使整个业务不能成功。
这时引入消息队列,控制流量,让请求有序的进入支付渠道
日志处理
日志处理是指将消息队列用在日志处理中,好比 Kafka 的应用,解决大量日志传输的问题。架构简化以下:
3、Active MQ
下载
http://activemq.apache.org/components/classic/download/
安装
直接解压,而后移动到指定目录便可。
>tar zxvf apache-activemq-5.15.10-bin.tar.gz >mv ./apache-activemq-5.15.10 /usr/local
启动
>/usr/local/activemq-5.15.10/bin/activemq start # 检查启动状态 [root@cbooy bin]# jps 3168 Jps 2268 activemq.jar # activemq启动的默认端口号 61616 [root@cbooy bin]# lsof -i:61616 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2268 root 132u IPv6 15719 0t0 TCP *:61616 (LISTEN)
其余基本命令
> activemq restart # 重启 > activemq stop # 关闭 > activemq start > /activemq_home/logs/activemq.log # 落地相关信息,打印日志
指定配置文件的启动
./bin/activemq start xbean:/usr/local/activemq-5.15.10/conf/activemq.xml
后台图形化界面支持
4、Java操做ActiveMQ
依赖 jar 包
dependencies { compile('org.apache.activemq:activemq-all:5.15.9') compile('org.apache.activemq:activemq-pool:5.15.9') }
第一种模式:Queue
生产流程
public class Producer { // activemq服务的地址,默认通讯端口为61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定义队列的名称 private static final String QUEUE_NAME = "test-Queue"; public static void main(String[] args) { MessageProducer producer = null; Session session = null; Connection connection = null; try { // 建立链接工厂对象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 从工厂中创建一个链接并开启(Connection) connection = connectionFactory.createConnection(); connection.start(); // 从链接中创建一个会话(Session) session = connection.createSession(false, 1); // 基于会话创建队列(Queue) Queue queue = session.createQueue(QUEUE_NAME); // 基于会话建立生产者(Producer) producer = session.createProducer(queue); for (int i = 0; i < 10; i++) { // 在会话的基础上建立一条消息(Message) TextMessage textMessage = session.createTextMessage("test-mq:" + i); // 生产者将消息发出 producer.send(textMessage); } } catch (Exception ex) { throw new IllegalStateException(ex); // 资源关闭 } finally { try { if (null != producer) { producer.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != session) { session.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != connection) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
执行以上代码后,咱们能够在管理页面上看到以下状况:
消费流程
public class Consumer { // activemq服务地址,默认通讯端口为61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定义队列的名称 private static final String QUEUE_NAME = "test-Queue"; public static void main(String[] args) { MessageConsumer consumer = null; Session session = null; Connection connection = null; try { // 建立链接工厂对象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 从工厂中创建一个链接并开启(Connection) connection = connectionFactory.createConnection(); connection.start(); // 从链接中创建一个会话(Session) session = connection.createSession(false, 1); // 基于会话创建队列(Queue) Queue queue = session.createQueue(QUEUE_NAME); // 基于会话建立消费者(Consumer) consumer = session.createConsumer(queue); // 接收消息的第一种方式,阻塞式接收 // Message message = consumer.receive(); // System.out.println("consumer recive message = " + message); // 接收消息的第二种方式,使用监听器 consumer.setMessageListener(msg -> { TextMessage textMessage = (TextMessage) msg; try { System.out.println("textMessage = " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }); } catch (Exception ex) { throw new IllegalStateException(ex); } } }
执行以上代码后,咱们能够在管理页面上看到以下状况:
咱们此次先运行两个 Consumer,因为 Consumer 种没有关闭资源,因此会一直保持和 ActiveMQ的链接。
而后再运行 Producer,咱们来看看现象:
控制台打印的信息中,Consumer1 消费的信息都是偶数的,Consumer2 消费的信息都是奇数的,一条消息只能被一个Consumer消费。
第二种模式:Topic
生产流程
public class Producer { // activemq服务地址,默认通讯端口为61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定义队列的名称 private static final String TOPIC_NAME = "test-Topic"; public static void main(String[] args) { MessageProducer producer = null; Session session = null; Connection connection = null; try { // 建立链接工厂对象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 从工厂中创建一个链接并开启(Connection) connection = connectionFactory.createConnection(); connection.start(); // 从链接中创建一个会话(Session) session = connection.createSession(false, 1); // 基于会话创建目的地(Topic) Topic topic = session.createTopic(TOPIC_NAME); // 基于会话建立生产者(Producer) producer = session.createProducer(topic); for (int i = 0; i < 10; i++) { // 在会话的基础上建立一条消息(Message) TextMessage textMessage = session.createTextMessage("test-topic:" + i); // 生产者将消息发出 producer.send(textMessage); } } catch (Exception ex) { throw new IllegalStateException(ex); // 资源关闭 } finally { try { if (null != producer) { producer.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != session) { session.close(); } } catch (JMSException e) { e.printStackTrace(); } try { if (null != connection) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
消费流程
public class Consumer1 { // activemq服务的地址,默认通讯端口为61616 private static final String URL = "tcp://192.168.182.128:61616"; // 定义队列的名称 private static final String TOPIC_NAME = "test-Topic"; public static void main(String[] args) { MessageConsumer consumer = null; Session session = null; Connection connection = null; try { // 建立链接工厂对象 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 从工厂中创建一个链接并开启(Connection) connection = connectionFactory.createConnection(); connection.start(); // 从链接中创建一个会话(Session) session = connection.createSession(false, 1); // 基于会话创建目的地(Topic) Topic topic = session.createTopic(TOPIC_NAME); // 基于会话建立消费者(Consumer) consumer = session.createConsumer(topic); // 接收消息的第一种方式,阻塞式接收 // Message message = consumer.receive(); // System.out.println("consumer recive message = " + message); // 接收消息的第二种方式,使用监听器 consumer.setMessageListener(msg -> { TextMessage textMessage = (TextMessage) msg; try { System.out.println("textMessage = " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }); } catch (Exception ex) { throw new IllegalStateException(ex); } } }
Queue 模式和 Topic 模式,代码十分类似,一个是建立Queue,而另一个是建立Topic。
如今咱们来运行三个 Consumer,再运行 Producer,来看看现象
控制台打印的信息中,三个Consumer 都消费了全部消息,一条消息只能被多个 Consumer消费。
5、SpringBoot整合ActiveMQ
Queue模式
Producer端:
一、引入依赖
dependencies { implementation('org.springframework.boot:spring-boot-starter-web') implementation('org.springframework.boot:spring-boot-starter-aop') testImplementation('org.springframework.boot:spring-boot-starter-test') // 导入activemq启动器依赖 implementation('org.springframework.boot:spring-boot-starter-activemq') }
二、新建 application.yaml 配置文件并进行基本配置
server: port: 8888 servlet: context-path: /queue-producer spring: activemq: broker-url: tcp://192.168.182.128:61616
三、建立配置类
@EnableJms @Configuration public class ProducerConfig { @Bean public Queue createQueue(){ return new ActiveMQQueue("springboot-queue"); } }
四、建立 Producer 类
@Component public class QueueProducer { @Autowired private Queue queue; @Autowired private JmsMessagingTemplate jmsTemplate; public String sendMsg(String msg) { jmsTemplate.convertAndSend(queue, msg); return "send success"; } }
五、建立 Controller 接收消息
@Slf4j @RestController public class ProducerController { @Autowired private QueueProducer producer; @RequestMapping("/producer") public String produce(String msg) { log.info("spring boot produce msg={}", msg); return producer.sendMsg(msg); } }
六、建立启动类
@SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class); } }
Consumer端:
一、引入依赖
和 Producer 端同样
二、新建 application.yaml 配置文件并进行基本配置
server: port: 9999 spring: activemq: broker-url: tcp://192.168.182.128:61616
三、建立 Consumer 类
@Slf4j @Component public class QueueConsumer { @JmsListener(destination = "springboot-queue") public void recive(String msg) { log.info("spring boot queue consumer receive msg={}", msg); } }
四、建立启动类
@EnableJms @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class); } }
验证:
分别把 Producer端和 Consumer端都启动起来,而后在浏览器中发送 Get请求,Producer端接收请求并将消息发给 ActiveMQ服务端,而后 Consumer端接收到 ActiveMQ的消息。
Topic模式
topci模式的实现和queue模式基本同样,只是有一处不太同样, Producer端和 Consumer端的配置类都须要多配置一个 ContainerFactory,以下:
@Bean public JmsListenerContainerFactory topicListenerContainerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // topic类型消息必须设置为true,false则表示是queue类型 factory.setPubSubDomain(true); return factory; }
同时在 @JmsListener注解中,须要加上上面这个方法,以下:
@Slf4j @Component public class TopicConsumer { @JmsListener(destination = "springboot-topic",containerFactory = "topicListenerContainerFactory") public void recive(String msg){ log.info("spring boot topic consumer recive msg={} ",msg); } }