平时工作中一直使用到activeMQ,但是一直没有机会进行总结,现在由于公司业务需求,需要将消息中间键由actveMQ变成RocketMQ,现在对消息中间键进行简单的整理.
MOM(Message-oriented middleware)就是面向消息中间件,是用于以分布式应用或系统中的异步,松耦合,可靠,可扩展和安全通信的一类软件.MOM总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合.
目前在生产环境常用的消息队列有:
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。常用对比如下:
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范(JMS规范)的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
点对点(P2P) : 即每产生一个消息并发送,只有一个消息消费者和其一一对应.
点对点模式主要建立在一个队列上边,当连接到一个队列的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息首先进入队列中,如果有接收端在监听,则会发送给接收端,否则保存在activeMQ服务器,知道发现接收端接收消息.点对点可以有多个发送端和多个接收端,但是一个消息只能被一个接收端消费.
发布订阅(Pub/Sub): 即每产生一个消息并发送,可以有多个消费者对消息进行接收和消费.
发布/订阅模式,同样可以有多个发送端和多个接收端,但是接收端如果没有监听消息,那么activeMQ默认不会保存信息,将会认为消息已经发送出去了. 发送端发送的消息如果接收端不在线,收不到消息,即使以后连接上线也不会接收到已经发送的消息, 发布订阅模式中,activeMQ不对保存消息,如果需要保存消息要进一步进行设置.
第一步: 创建ConnectionFactory,需要指定服务端ip以及端口号.
第二步:使用ConncetionFactory对象创建一个Connection对象.
第三步:开启连接,调用activeMQ的start方法.
第四步:使用Connection对象创建一个session对象.
第五步:使用session创建一个Destination对象(topic,queue),此外创建一个Queue对象.
第六步: 使用Session 创建一个Producer对象.
第七步:创建一个Message对象,创建一个TextMessage对象.
第八步: 使用Productor对象发送消息.
第九步: 关闭资源.
public void testQueueProducer() throws Exception { // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 //brokerURL服务器的ip及端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); // 第二步:使用ConnectionFactory对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接,调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。 //参数:队列的名称。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象创建一个Producer对象。 MessageProducer producer = session.createProducer(queue); // 第七步:创建一个Message对象,创建一个TextMessage对象。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,我要发送第一个信息了......"); // 第八步:使用Producer对象发送消息。 producer.send(textMessage); // 第九步:关闭资源。 producer.close(); session.close(); connection.close(); }
第一步:创建一个ConncetionFactory对象.
第二步:Connection 对象创建一个Connection对象.
第三步:Connection对象调用start方法开启连接.
第四步:使用 Connection对象创建一个Session对象.
第五步: 使用Connection对象创建一个Destination对象,和发送端保持一致,其中队列Queue的名字也要保持一致.
第六步:使用session创建一个Consumer对象.
第七步:接收消息.
第八步:打印消息.
第九步:关闭资源.
public void testQueueConsumer() throws Exception { // 第一步:创建一个ConnectionFactory对象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); // 第二步:从ConnectionFactory对象中获得一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启连接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象创建一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象创建一个Consumer对象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待键盘输入 //这里因为是DEMO的原因,所以需要在这里等待键盘输入,也就是进行阻塞,但是在spring框架中使用activeMQ,因为有框架持有. System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); }
ActiveMQ:
非常成熟,功能强大,在业内大量的公司以及项目中都有应用
偶尔会有较低概率丢失消息而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用.
RabbitMQ:
erlang语言开发,性能极其好,延时很低;
吞吐量到万级,MQ功能比较完备
而且开源提供的管理界面非常棒,用起来很好用
社区相对比较活跃,几乎每个月都发布几个版本分
在国内一些互联网公司近几年用rabbitmq也比较多一些
但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。
优点:
缺点:
优点:
缺点:
优点: