首先介绍 JMShtml
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。咱们能够简单的理解:两个应用程序之间须要进行通讯,咱们使用一个JMS服务,进行中间的转发,经过JMS 的使用,咱们能够解除两个程序之间的耦合。JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它相似于JDBC(Java Database Connectivity)。java
概念
JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
JMS生产者(Message Producer)
JMS消费者(Message Consumer)
JMS消息
JMS队列
JMS主题spring
JMS消息一般有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)apache
接着介绍 activeMQ浏览器
ActiveMQ 是Apache出品,最流行的. 功能强大的即时通信和集成模式的开源服务器springboot
特色:
1)支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各类跨语言客户端和协议
2)支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
3) 彻底支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
4) Spring支持,ActiveMQ能够轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
5) 支持在流行的J2EE服务器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中进行测试
6) 使用JDBC和高性能日志支持很是快速的持久化
...
服务器
下载地址:http://activemq.apache.org/activemq-5153-release.htmlapp
下载解压,进入 bin 文件夹 ,若是咱们是32位的机器,就双击win32目录下的activemq.bat,若是是64位机器,则双击win64目录下的activemq.bat异步
若是没有异常,表明启动成功,常见启动失败缘由是端口占用的问题tcp
咱们能够关闭占用的端口,或者修改 activeMQ 的端口。打开 conf/activemq.xml,修改被占用的端口。
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5671?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
浏览器打开 http://localhost:8161 进入 控制台,输入 默认的用户名和密码 admin/admin
打开 queue 页面
Name:队列名称。
Number Of Pending Messages:等待消费的消息个数。
Number Of Consumers:当前链接的消费者数目
Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减。
Messages Dequeued:已经消费的消息数量
建立 队列,输入队列名称,点击 create 按钮
-----------------------------------------分割线-------------------------------------------------------------------------
如今咱们整合 springboot 和 activeMQ
第一步、引入 pom 依赖
<!-- 整合消息队列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 若是配置线程池则加入 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
第二步、配置文件 application.properties 修改
#整合jms测试,安装在别的机器,防火墙和端口号记得开放 spring.activemq.broker-url=tcp://127.0.0.1:61616 #集群配置 #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) spring.activemq.user=admin spring.activemq.password=admin #下列配置要增长依赖 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100
第三步、启动类
加入注解
@EnableJms
加入 bean
@Bean ConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(); } @Bean JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setPriority(999); return jmsTemplate; } @Bean(value="jmsMessagingTemplate") JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) { JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(jmsTemplate); return messagingTemplate; }
第四步、消息发布者
public interface ProducerService { /** * 功能描述:指定消息队列,还有消息 * @param destination * @param message */ public void sendMessage(Destination destination, final String message); }
@Service public class ProducerServiceImpl implements ProducerService{ @Autowired private JmsMessagingTemplate jmsTemplate; //用来发送消息到broker的对象 //发送消息,destination是发送到的队列,message是待发送的消息 @Override public void sendMessage(Destination destination, String message) { jmsTemplate.convertAndSend(destination, message); } }
第五步、消息消费者
@Component public class OrderConsumer { // 接收 order.queue 队列的消息 @JmsListener(destination="order.queue") public void receiveQueue(String text){ System.out.println("OrderConsumer收到的报文为:"+text); } }
第六步、测试类进行测试
@RestController @RequestMapping("/queue") public class QueueController { @Autowired private ProducerService producerService; /** * 点对点消息发送 ,指定 队列 * @param msg * @return */ @RequestMapping("/order") public Object order(String msg){ Destination destination = new ActiveMQQueue("order.queue"); producerService.sendMessage(destination, msg); return "ok"; } }
打开 activeMQ 控制台,能够看到 队列中的消息数量 发生了 变化
以上是 activeMQ 的点对点消息队列。点对点并非只A发送的消息只能指定B接收,而是只A发送的任意一条消息只能由一我的接收处理,也就是每条消息只能被消费一次。
JMS 的另外一种模式 发布/订阅模式。A发送的消息能够被全部监听A的对象的接收,就比如学校的广播,全部的学生均可以收听校园广播信息。
正常状况下,activeMQ 只支持一种消息模式,这里作出配置修改,让其可以同时支持 两种 模式
启动类中 加入 bean
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; }
@Bean public Topic topic(){ return new ActiveMQTopic("topic.queue"); }
Topic 发布/订阅 使用 topic.queue 队列
新建订阅者,@JmsListener若是不指定独立的containerFactory的话是只能消费queue消息
@Component public class TopicSub { @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic") public void receive1(String text){ System.out.println("topic.queue 消费者:receive1="+text); } @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic") public void receive2(String text){ System.out.println("topic.queue 消费者:receive2="+text); } @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic") public void receive3(String text){ System.out.println("topic.queue 消费者:receive3="+text); } }
修改 信息发送者 ProducerServiceImpl.java,增长代码
@Autowired private Topic topic; @Override public void publish(String msg) { this.jmsTemplate.convertAndSend(this.topic, msg); }
修改测试类 QueueController.java ,增长代码
/** * 发布、订阅消息 * @param msg * @return */ @RequestMapping("/publish") public Object publish(String msg){ producerService.publish(msg); return "ok"; }
启动项目,访问 /queue/order ,打印出一条数据
OrderConsumer收到的报文为:订单信息
访问 /queue/publish ,打印出三条数据
topic.queue 消费者:receive3=发布、订阅 topic.queue 消费者:receive1=发布、订阅 topic.queue 消费者:receive2=发布、订阅