ActiveMQ官网下载地址:http://activemq.apache.org/download.htmlhtml
ActiveMQ 提供了Windows 和Linux、Unix 等几个版本,楼主这里选择了Linux 版本下进行开发。linux
下载完安装包,解压以后的目录:web
从它的目录来讲,仍是很简单的: apache
进入到ActiveMQ 安装目录的Bin 目录,linux 下输入 ./activemq start 启动activeMQ 服务。后端
输入命令以后,会提示咱们建立了一个进程IP 号,这时候说明服务已经成功启动了。浏览器
ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。
admin:http://127.0.0.1:8161/admin/tomcat
咱们在浏览器打开连接以后输入帐号密码(这里和tomcat 服务器相似)安全
默认帐号:admin服务器
密码:adminsession
到这里为止,ActiveMQ 服务端就启动完毕了。
ActiveMQ 在linux 下的终止命令是 ./activemq stop
项目目录结构:
上述在官网下载ActiveMq 的时候,咱们能够在目录下看到一个jar包:
这个jar 包就是咱们须要在项目中进行开发中使用到的相关依赖。
public class Producter { //ActiveMq 的默认用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMq 的默认登陆密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的连接地址 private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; AtomicInteger count = new AtomicInteger(0); //连接工厂 ConnectionFactory connectionFactory; //连接对象 Connection connection; //事务管理 Session session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); public void init(){ try { //建立一个连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); //从工厂中建立一个连接 connection = connectionFactory.createConnection(); //开启连接 connection.start(); //建立一个事务(这里经过参数能够设置事务的级别) session = connection.createSession(true,Session.SESSION_TRANSACTED); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage(String disname){ try { //建立一个消息队列 Queue queue = session.createQueue(disname); //消息生产者 MessageProducer messageProducer = null; if(threadLocal.get()!=null){ messageProducer = threadLocal.get(); }else{ messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } while(true){ Thread.sleep(1000); int num = count.getAndIncrement(); //建立一条消息 TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:我是大帅哥,我如今正在生产东西!,count:"+num); System.out.println(Thread.currentThread().getName()+ "productor:我是大帅哥,我如今正在生产东西!,count:"+num); //发送消息 messageProducer.send(msg); //提交事务 session.commit(); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class Comsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory; Connection connection; Session session; ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>(); AtomicInteger count = new AtomicInteger(); public void init(){ try { connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } } public void getMessage(String disname){ try { Queue queue = session.createQueue(disname); MessageConsumer consumer = null; if(threadLocal.get()!=null){ consumer = threadLocal.get(); }else{ consumer = session.createConsumer(queue); threadLocal.set(consumer); } while(true){ Thread.sleep(1000); TextMessage msg = (TextMessage) consumer.receive(); if(msg!=null) { msg.acknowledge(); System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class TestMq { public static void main(String[] args){ Producter producter = new Producter(); producter.init(); TestMq testMq = new TestMq(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //Thread 1 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 2 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 3 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 4 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 5 new Thread(testMq.new ProductorMq(producter)).start(); } private class ProductorMq implements Runnable{ Producter producter; public ProductorMq(Producter producter){ this.producter = producter; } @Override public void run() { while(true){ try { producter.sendMessage("Jaycekon-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
运行结果:
INFO | Successfully connected to tcp://localhost:61616 Thread-6productor:我是大帅哥,我如今正在生产东西!,count:0 Thread-4productor:我是大帅哥,我如今正在生产东西!,count:1 Thread-2productor:我是大帅哥,我如今正在生产东西!,count:3 Thread-5productor:我是大帅哥,我如今正在生产东西!,count:2 Thread-3productor:我是大帅哥,我如今正在生产东西!,count:4 Thread-6productor:我是大帅哥,我如今正在生产东西!,count:5 Thread-3productor:我是大帅哥,我如今正在生产东西!,count:6 Thread-5productor:我是大帅哥,我如今正在生产东西!,count:7 Thread-2productor:我是大帅哥,我如今正在生产东西!,count:8 Thread-4productor:我是大帅哥,我如今正在生产东西!,count:9 Thread-6productor:我是大帅哥,我如今正在生产东西!,count:10 Thread-3productor:我是大帅哥,我如今正在生产东西!,count:11 Thread-5productor:我是大帅哥,我如今正在生产东西!,count:12 Thread-2productor:我是大帅哥,我如今正在生产东西!,count:13 Thread-4productor:我是大帅哥,我如今正在生产东西!,count:14 Thread-6productor:我是大帅哥,我如今正在生产东西!,count:15 Thread-3productor:我是大帅哥,我如今正在生产东西!,count:16 Thread-5productor:我是大帅哥,我如今正在生产东西!,count:17 Thread-2productor:我是大帅哥,我如今正在生产东西!,count:18 Thread-4productor:我是大帅哥,我如今正在生产东西!,count:19
public class TestConsumer { public static void main(String[] args){ Comsumer comsumer = new Comsumer(); comsumer.init(); TestConsumer testConsumer = new TestConsumer(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); } private class ConsumerMq implements Runnable{ Comsumer comsumer; public ConsumerMq(Comsumer comsumer){ this.comsumer = comsumer; } @Override public void run() { while(true){ try { comsumer.getMessage("Jaycekon-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
INFO | Successfully connected to tcp:
//localhost:61616
Thread-2: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我如今正在生产东西!,count:4--->0
Thread-3: Consumer:我是消费者,我正在消费MsgThread-4productor:我是大帅哥,我如今正在生产东西!,count:36--->1
Thread-4: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我如今正在生产东西!,count:38--->2
Thread-5: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我如今正在生产东西!,count:37--->3
Thread-2: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我如今正在生产东西!,count:2--->4
Thread-3: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我如今正在生产东西!,count:40--->5
Thread-4: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我如今正在生产东西!,count:42--->6
Thread-5: Consumer:我是消费者,我正在消费MsgThread-4productor:我是大帅哥,我如今正在生产东西!,count:41--->7
Thread-2: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我如今正在生产东西!,count:1--->8
Thread-3: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我如今正在生产东西!,count:44--->9
Thread-4: Consumer:我是消费者,我正在消费MsgThread-4productor:我是大帅哥,我如今正在生产东西!,count:46--->10
Thread-5: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我如今正在生产东西!,count:45--->11
Thread-2: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我如今正在生产东西!,count:3--->12
Thread-3: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我如今正在生产东西!,count:48--->13
Thread-4: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我如今正在生产东西!,count:50--->14
Thread-5: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我如今正在生产东西!,count:49--->15
Thread-4: Consumer:我是消费者,我正在消费MsgThread-2productor:我是大帅哥,我如今正在生产东西!,count:54--->16
Thread-2: Consumer:我是消费者,我正在消费MsgThread-5productor:我是大帅哥,我如今正在生产东西!,count:6--->17
Thread-3: Consumer:我是消费者,我正在消费MsgThread-6productor:我是大帅哥,我如今正在生产东西!,count:52--->18
Thread-5: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我如今正在生产东西!,count:53--->19
Thread-4: Consumer:我是消费者,我正在消费MsgThread-3productor:我是大帅哥,我如今正在生产东西!,count:58--->20
|
查看运行结果,咱们能够作ActiveMQ 服务端:http://127.0.0.1:8161/admin/ 里面的Queues 中查看咱们生产的消息。