Java操作RabbitMQ简单队列

一、创建工具类

package com.kobe.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Helper that builds connections to the RabbitMQ broker used by the
 * producer and consumer examples.
 */
public class ConnectionUtils {

    /** Host the broker is reached on. */
    private static final String HOST = "127.0.0.1";

    /** Port the broker is reached on. */
    private static final int PORT = 5672;

    /** Virtual host selected for the connection. */
    private static final String VIRTUAL_HOST = "/vhost_kobe";

    /** User name presented to the broker. */
    private static final String USERNAME = "kobe";

    /** Password presented to the broker. */
    private static final String PASSWORD = "123";

    /**
     * Configures a fresh {@link ConnectionFactory} with the settings above
     * and opens a new connection from it.
     *
     * @return a newly created connection
     * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
     * @throws IOException      propagated from {@code ConnectionFactory.newConnection()}
     */
    public static Connection getConnection() throws TimeoutException,IOException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
        connectionFactory.setVirtualHost(VIRTUAL_HOST);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        Connection connection = connectionFactory.newConnection();
        return connection;
    }

}

二、创建生产者

package com.kobe.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Producer example: declares {@code simple_queue} and publishes one
 * timestamped text message to it, then releases the channel and connection.
 */
public class SendSms {
    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Publishes a single message and prints it to standard output.
     * Any failure is printed as a stack trace; the channel and connection
     * are closed in every case in which they were opened.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtils.getConnection();
            channel = connection.createChannel();
            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String msg = "hello rabbitmq : " + System.currentTimeMillis();
            // Encode explicitly as UTF-8 rather than the platform default charset,
            // so the bytes match what the consumer decodes.
            byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // Empty exchange name with the queue name as routing key.
            channel.basicPublish("",QUEUE_NAME,null,payload);
            System.out.println("send msg to rabbitmq:" + msg );
        } catch (Exception e ) {
            e.printStackTrace();
        } finally {
            // Close each resource independently: either may still be null if
            // setup failed, and a failure closing the channel must not
            // prevent the connection from being closed.
            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

三、创建消费者

package com.kobe.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer example: declares {@code simple_queue} and registers a consumer
 * that prints every delivered message body to standard output.
 */
public class ReceiveSms {

    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Opens a connection, registers the consumer and returns. On success the
     * connection is intentionally left open so that deliveries keep arriving;
     * if setup fails, the stack trace is printed and the connection is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        try {
            connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            DefaultConsumer consumer =  new DefaultConsumer(channel){
                // Invoked whenever a message is delivered from the queue.
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("receive msg :" + msg);
                }
            };
            // Start listening on the queue; the second argument enables auto-acknowledge.
            channel.basicConsume(QUEUE_NAME,true,consumer);

        } catch (Exception e ) {
            e.printStackTrace();
            // Setup failed, so no consumer is listening: release the
            // connection instead of leaving it open with nothing using it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }

}

四、运行生产者,往队列里存数据

输出结果:send msg to rabbitmq:hello rabbitmq : 1534087498613

五、查看RabbitMQ Management

可以看到数据已经存入队列

六、运行消费者进行消息监听

输出结果:receive msg :hello rabbitmq : 1534087498613

七、再次运行生产者

输出结果:send msg to rabbitmq:hello rabbitmq : 1534087638186

消费者监听到之后打印出 receive msg :hello rabbitmq : 1534087638186

八、查看RabbitMQ Management 

相关文章
相关标签/搜索