1.下载安装包:curl -O https://archive.apache.org/dist/activemq/5.14.0/apache-activemq-5.14.0-bin.tar.gzjava
2.解压:tar -zxvf apache-activemq-5.14.0-bin.tar.gzapache
3.重命名:mv apache-activemq-5.14.0-bin.tar.gz activemqcentos
4.进入bin目录:cd .../activemq/bin ,运行权限命令:chmod 755 activemq ,启动activemq: ./activemq服务器
5.查询端口:61616 和8161 命令:netstat -lntp (centos 7若找不到命令,运行命令安装:sudo yum install net-tools ,sudo yum provides ifconfig ,都装一下)session
java模拟场景,代码以下:curl
导入包:activemq-all-5.8.0.jartcp
Bean:ide
public class MqBean implements Serializable{ private Integer age; private String name; public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一个参数是是不是事务型消息,设置为true,第二个参数无效 //第二个参数是 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。异常也会确认消息,应该是在执行以前确认的 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。能够在失败的 //时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的链接不断开,其余的消费者也不会接受(正常状况下队列模式不存在其余消费者) //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。在须要考虑资源使用时,这种模式很是有效。 //待测试 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-queue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //优先级不能影响先进先出。。。那这个用处到底是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ bean.setName("小黄"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (JMSException e) { e.printStackTrace(); } }
注:在上面的代码中,确认模式有三种,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直没明白有什么区别。由于没法测试。不过大概也明白了一些。其实主要是MQ处理消息的流程决定的:测试
public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的链接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 构造从工厂获得链接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接 //这个最好仍是有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createTopic("test-topic"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //优先级不能影响先进先出。。。那这个用处到底是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ Thread.sleep(1000); bean.setName("小黄"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的链接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 构造从工厂获得链接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接 //这个最好仍是有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createTopic("test-topic"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }
以上的消息发送后,若是没有接收到,能够登陆本身的MQ管理页面: http://192.168.3.159:8161/admin/ ,默认账号密码都是admin,查看队列中的消息this
Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。能够理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 能够理解为是消费这消费掉的数量