activemq的使用方法

ActiveMQ是Apache的一款开源消息总线,主要用来做消息的分发。

首先需要下载MQ,并进行启动。

然后在控制台创建队列,初始用户名和密码为admin/admin。

然后就可以编写生产者、消费者进行测试了。由于ActiveMQ支持Spring,所以有两种不同的写法:

方法一:创建factory、connection、session、destination、producer、consumer

方法二:通过配置文件进行创建(未尝试)。

最初对其作用的理解有一些偏差。实际上,在发送端引入MQ的jar包,向指定的MQ服务器发送信息,MQ会自动将其添加到消息队列中,用控制台可以比较清晰地看到队列情况:http://localhost:8161/admin/

在接收端循环扫描要接收的队列,当读取到信息时进行接收处理。

 

需要注意的是,MQ支持持久化,可将消息持久化到本地文件或数据库。

另外一个需要注意的地方是,创建会话session时,如果第一个参数为true(即开启事务),则需要调用session.commit()向服务器提交,消息的发送和接收才会被确认;否则服务器认为没有成功接收。引用一下其他同学的话:

createSession(paramA,paramB);

paramA 取值有 : true or false 表示是否支持事务

paramB 取值有:Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,SESSION_TRANSACTED

createSession(paramA,paramB);

paramA是设置事务的,paramB设置acknowledgment mode

paramA设置为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。

paramA设置为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED 。

Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。

Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法,JMS服务器才会删除消息。

DUPS_OK_ACKNOWLEDGE为允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收,而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。

附代码
接收端:

package com.receiver;


import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.Properties;

import javax.jms.*;

import org.apache.activemq.*;

/**
 * Polling JMS consumer that reads text messages from a single ActiveMQ queue.
 *
 * <p>The constructor wires up factory, connection, session, destination and
 * consumer from the classpath resource {@code ../config/connection.properties}
 * (keys {@code name}, {@code password}, {@code brokerURL}, {@code queueName}).
 * Setup failures are reported on stderr and leave the JMS fields {@code null};
 * the constructor itself never throws.
 */
public class MessageReceiver implements IMessageReceiver {

    public ActiveMQConnectionFactory connectionFactory = null;
    public Connection connection = null;
    public Session session = null;
    public Destination destination = null;
    public MessageConsumer consumer = null;

    /**
     * Loads the connection settings and opens a started connection with a
     * non-transacted, auto-acknowledge session and a consumer on the
     * configured queue.
     */
    public MessageReceiver() {
        // try-with-resources guarantees the properties stream is released.
        try (InputStream inProperties =
                MessageReceiver.class.getResourceAsStream("../config/connection.properties")) {
            if (inProperties == null) {
                // Fail with a descriptive message instead of an NPE inside Properties.load.
                throw new FileNotFoundException(
                        "classpath resource ../config/connection.properties not found");
            }
            Properties properties = new Properties();
            properties.load(inProperties);

            // Create the factory.
            connectionFactory = new ActiveMQConnectionFactory(
                    properties.getProperty("name"),
                    properties.getProperty("password"),
                    properties.getProperty("brokerURL"));
            // Create and start the connection.
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session; the acknowledge mode is AUTO_ACKNOWLEDGE.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination queue (original note: it has to be configured in the admin console).
            destination = session.createQueue(properties.getProperty("queueName"));
            // Consumer bound to that queue.
            consumer = session.createConsumer(destination);
        } catch (Exception e) {
            e.printStackTrace();
            abortInitialization();
        }
    }

    /** Closes a partially opened connection so a failed setup does not leak it. */
    private void abortInitialization() {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
        consumer = null;
        destination = null;
        session = null;
        connection = null;
    }

    /**
     * Polls {@code consumer} in a loop, echoing each received text message to
     * stdout and to {@code D:/test.txt}, pausing one second between polls.
     *
     * <p>The loop ends only when the calling thread is interrupted; the
     * interrupt flag is restored before returning. JMS and file errors are
     * printed and polling continues after the usual pause. A message that is
     * not a {@code TextMessage} causes a {@code ClassCastException}.
     *
     * @param consumer consumer to poll; must not be {@code null}
     */
    public void ReceiveMessage(MessageConsumer consumer) {
        int i = 0;
        while (true) {
            try {
                // RECEIVE_TIME is not declared in this class; presumably inherited
                // from IMessageReceiver - confirm.
                TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIME);
                if (message != null) {
                    String line = "queue1 " + message.getText() + "   " + i;
                    System.out.println(line);
                    writeToFile(line);
                }
            } catch (JMSException | IOException e) {
                e.printStackTrace();
            }
            // Sleeping outside the try block keeps a persistently failing
            // receive() from turning into a busy loop.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag and stop so callers can shut the receiver down.
                Thread.currentThread().interrupt();
                return;
            }
            i++;
        }
    }

    /**
     * Writes {@code line} to {@code D:/test.txt}, replacing the previous content.
     * NOTE(review): the file is truncated on every call, so it only ever holds
     * the latest message; open in append mode if a running log is intended.
     */
    private void writeToFile(String line) throws IOException {
        try (FileOutputStream out = new FileOutputStream("D:/test.txt");
                PrintStream p = new PrintStream(out)) {
            p.println(line);
        }
    }

    /**
     * Closes the given connection, printing any JMS error instead of propagating it.
     *
     * @param connection connection to close; {@code null} is ignored
     */
    public void CloseConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public MessageConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(MessageConsumer consumer) {
        this.consumer = consumer;
    }

}
package com.receiver;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/** Command-line entry point that runs a {@link MessageReceiver} until it stops. */
public class ReceiveMain {

    /**
     * Creates a receiver, polls its consumer, and closes the connection when
     * polling ends for any reason.
     *
     * @param args unused
     * @throws JMSException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws JMSException {
        MessageReceiver messageReceiver = new MessageReceiver();
        try {
            messageReceiver.ReceiveMessage(messageReceiver.getConsumer());
        } finally {
            // Runs even when ReceiveMessage exits with an exception, so the
            // broker connection is not left open.
            messageReceiver.CloseConnection(messageReceiver.getConnection());
        }
    }

}

发送端:

package com.sender;


import java.io.InputStream;
import java.util.Properties;

import javax.jms.*;

import org.apache.activemq.*;

/**
 * JMS producer that sends non-persistent text messages to a single ActiveMQ queue.
 *
 * <p>The constructor wires up factory, connection, session, destination and
 * producer from the classpath resource {@code ../config/connection.properties}
 * (keys {@code name}, {@code password}, {@code brokerURL}, {@code queueName}).
 * Setup failures are reported on stderr and leave the JMS fields {@code null};
 * the constructor itself never throws.
 */
public class MessageSender implements IMessageSender {

    public ActiveMQConnectionFactory connectionFactory = null;
    public Connection connection = null;
    public Session session = null;
    public Destination destination = null;
    public MessageProducer producer = null;

    /**
     * Loads the connection settings and opens a started connection with a
     * non-transacted, auto-acknowledge session and a non-persistent producer
     * on the configured queue.
     */
    public MessageSender() {
        // try-with-resources guarantees the properties stream is released.
        try (InputStream inProperties =
                MessageSender.class.getResourceAsStream("../config/connection.properties")) {
            if (inProperties == null) {
                // Fail with a descriptive message instead of an NPE inside Properties.load.
                // Fully qualified because this file does not import the type.
                throw new java.io.FileNotFoundException(
                        "classpath resource ../config/connection.properties not found");
            }
            Properties properties = new Properties();
            properties.load(inProperties);

            // Create the factory.
            connectionFactory = new ActiveMQConnectionFactory(
                    properties.getProperty("name"),
                    properties.getProperty("password"),
                    properties.getProperty("brokerURL"));
            // Create and start the connection.
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session; the acknowledge mode is AUTO_ACKNOWLEDGE.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination queue (original note: it has to be configured in the admin console).
            destination = session.createQueue(properties.getProperty("queueName"));
            // Producer bound to that queue.
            producer = session.createProducer(destination);
            // Messages are sent without persistence.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        } catch (Exception e) {
            e.printStackTrace();
            abortInitialization();
        }
    }

    /** Closes a partially opened connection so a failed setup does not leak it. */
    private void abortInitialization() {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
        producer = null;
        destination = null;
        session = null;
        connection = null;
    }

    /**
     * Builds the text message {@code "hello world!   " + i} on the given session.
     *
     * @param session session used to create the message
     * @param i       sequence number appended to the text
     * @return the new message, or {@code null} if the session reported a JMS error
     */
    @Override
    public TextMessage CreateMessage(Session session, int i) {
        String strMessage = "hello world!   " + i;
        TextMessage message = null;
        try {
            message = session.createTextMessage(strMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return message;
    }

    /**
     * Sends {@code message} through {@code producer}; a JMS error is printed,
     * not propagated, so the caller cannot tell whether the send succeeded.
     *
     * @param message  message to send
     * @param producer producer to send it with
     */
    public void SendMessage(TextMessage message, MessageProducer producer) {
        try {
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the given connection, printing any JMS error instead of propagating it.
     *
     * @param connection connection to close; {@code null} is ignored
     */
    public void CloseConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public MessageProducer getProducer() {
        return producer;
    }

    public void setProducer(MessageProducer producer) {
        this.producer = producer;
    }
}
package com.sender;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/** Command-line entry point that sends ten numbered text messages, one per second. */
public class SendMain {

    /**
     * Sends messages 0..9 through a {@link MessageSender}, echoing a status line
     * to stdout and to {@code D:/test.txt} after each one, and always closes
     * the connection afterwards.
     *
     * @param args unused
     * @throws JMSException if querying or committing the session fails
     */
    public static void main(String[] args) throws JMSException {
        MessageSender messageSender = new MessageSender();
        try {
            for (int i = 0; i < 10; i++) {
                TextMessage textMessage =
                        messageSender.CreateMessage(messageSender.getSession(), i);
                messageSender.SendMessage(textMessage, messageSender.getProducer());

                String status = "send message sucess!  :  " + i;
                System.out.println(status);
                // NOTE(review): the file is truncated on every iteration, so it only
                // holds the latest status line; open in append mode if a log is intended.
                try (FileOutputStream out = new FileOutputStream("D:/test.txt");
                        PrintStream p = new PrintStream(out)) {
                    p.println(status);
                } catch (IOException e) {
                    e.printStackTrace();
                }

                // commit() is only legal on a transacted session; on a non-transacted
                // one it throws javax.jms.IllegalStateException and would abort the
                // loop after the first message.
                if (messageSender.getSession().getTransacted()) {
                    messageSender.getSession().commit();
                }

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the flag and stop sending; the finally block still closes.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            messageSender.CloseConnection(messageSender.getConnection());
        }
    }

}
相关文章
相关标签/搜索