官网地址
下载下来直接解压,进入bin目录,执行命令./activemq start便可启动,如下是相关目录结构说明html
默认的服务端口为61616,不过该端口能够在conf目录下的activemq.xml中进行修改,找到transportConnectors标签,修改openwire中的端口便可。
监控平台默认的端口为8161,若是要修改该端口,能够修改conf/jetty.xml文件中的jetty启动端口。默认的用户名和密码为admin/admin,user/user,若是要修改用户名和密码,则在conf/jetty-realm.properties中进行修改便可,格式为[用户名:密码,角色名],关于web管理界面部分列说明以下:java
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
复制代码
# MQ地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#集群配置
#spring.activemq.broker-url=failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617)
# 在考虑结束以前等待的时间
#spring.activemq.close-timeout=15s
# 是否启用内存模式
spring.activemq.in-memory=true
# 是否在回滚回滚消息以前中止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
spring.activemq.non-blocking-redelivery=false
# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.send-timeout=0
#默认状况下activemq提供的是queue模式,若要使用topic模式须要配置下面配置,消费端使用
spring.jms.pub-sub-domain=true
#帐号
spring.activemq.user=admin
# 密码
spring.activemq.password=admin
复制代码
@Service("producer")
public class Producer {
@Autowired
private JmsMessagingTemplate jmsTemplate;
public void sendMessage(Destination destination, final Object msg){
jmsTemplate.convertAndSend(destination, msg);
}
}
@Configuration
public class DestinationConfig {
@Bean(name = "testTopic")
public Topic getTestTopic(){
return new ActiveMQTopic("topic.test");
}
@Bean(name = "testQueue")
public Queue getTestQueue(){
return new ActiveMQQueue("queue.test");
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private Producer producer;
@Resource(name = "testTopic")
private Topic testTopic;
@Resource(name = "testQueue")
private Queue testQueue;
@Test
public void sendTopicMsg(){
producer.sendMessage(testTopic, "This is test Topic");
}
@Test
public void sendQueueMsg(){
producer.sendMessage(testQueue, "This is test Queue");
}
}
复制代码
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/** * containerFactory配置请参考第5点说明 * 该配置用来指明该destination是队列仍是主题消息 */
@Component
public class Consumer {
@JmsListener(destination = "topic.test", containerFactory = "jmsListenerContainerTopic")
public void recevieTopicMsg(String msg){
System.out.println("接收的主题消息为:"+msg);
}
@JmsListener(destination = "queue.test", containerFactory = "jmsListenerContainerQueue")
//@SendTo("otherTopic") // 表示将方法的返回值发送到另外的队列,含有这个注解的话,则方法须要返回值
public void recevieQueueMsg(String msg){
System.out.println("接收的队列消息为:"+msg);
}
}
复制代码
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
public class MqConfig {
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
beanFactory.setPubSubDomain(true);
beanFactory.setConnectionFactory(connectionFactory);
return beanFactory;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
beanFactory.setConnectionFactory(connectionFactory);
return beanFactory;
}
}
复制代码
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import javax.jms.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.ProducerCallback;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class.getName());
private static ExecutorService pool = Executors.newCachedThreadPool();
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name = "testTopic")
private Topic testTopic;
public void sendTechStatRefresh(String json) {
try {
sendMsgResultNotification(sendTopicMsg(testTopic, json), "主题消息测试", json);
} catch (Throwable throwable) {
LOGGER.error(throwable.getMessage(), throwable);
}
}
@Resource(name = "testQueue")
private Queue testQueue;
public void sendLotteryMatchScore(String msg) {
try {
sendMsgResultNotification(sendQueueMsg(testQueue, msg), "队列消息测试", msg);
} catch (Throwable throwable) {
LOGGER.error(throwable.getMessage(), throwable);
}
}
private boolean sendTopicMsg(final Topic topic, final String msg) {
if (topic == null || StringUtils.isBlank(msg)) {
throw new NullPointerException("topic is null or msg is null");
}
try {
return pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
jmsTemplate.convertAndSend(topic, msg);
return Boolean.TRUE;
}
}).get();
} catch (Throwable throwable) {
try {
LOGGER.error("send msg to topic error:topic = " + topic.getTopicName() + " msg = " + msg.toString(), throwable);
} catch (Exception e) {
LOGGER.error("发送主题消息异常", e);
}
}
return Boolean.FALSE;
}
private boolean sendQueueMsg(final Queue queue,final String msg) {
if (queue == null || StringUtils.isBlank(msg)) {
throw new NullPointerException("topic is null or msg is null");
}
try {
return pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
jmsTemplate.convertAndSend(queue, msg);
return Boolean.TRUE;
}
}).get();
} catch (Throwable throwable) {
try {
LOGGER.error("send msg to queue error:queue = " + queue.getQueueName() + " msg = " + msg.toString(),
throwable);
} catch (Exception e) {
LOGGER.error("发送队列消息异常", e);
}
}
return Boolean.FALSE;
}
private void sendMsgResultNotification(final boolean rs, final String msgType, final String data) {
if (rs) {
LOGGER.debug("推送 " + msgType + "成功 : 消息[" + data + "]");
}
}
}
复制代码
STOMP是一个简单的可互操做的协议, 被用于经过中间服务器在客户端之间进行异步消息传递。它定义了一种在客户端与服务端进行消息传递的文本格式。 STOMP是基于帧的协议,它假定底层为一个2-way的可靠流的网络协议(如TCP)。客户端和服务器通讯使用STOMP帧流通信。web
参考文档:blog.csdn.net/u012758088/…spring