RabbitMQ第一篇--入门

1.简介:java

RabbitMQ是一个消息中间件,主要做用是用来接收和转发消息。你能够将他想象成是邮局:git

当你在邮箱中放入了一封信,那么邮递员确定会将信送给它的接收者。而RabbitMQ就像是邮局、web

邮箱、邮递员。它负责从一处接收消息,按要发送的“地址”将消息转发到指定“地址”。数组

RabbitMQ和邮局的不一样之处也就在于它并不处理报刊和杂志,而是接收、存储并转发二进制的消息(Message)缓存

2.RabbitMQ中的经常使用术语:服务器

  • 生产者:一般负责发送消息的程序被咱们称为“消息生产者”。能够 用此图表示消息生产者:

                    

  • 队列:就像一个邮箱,它存在于RabbitMQ内部,消息只能存储在队列中。队列自己并没有任何限制,它能够存储任意多的消息。从本质来讲队列就是一个无限大小的缓冲区。多个“消息生产者”能够将多条消息放入同一个队列中,同理,多个消费者也能够尝试从同一个队列中获取消息。能够用下图表示队列:

               

  • 消费者:一般等待消息到来并处理消息的程序称为:“消费者”用下图来表示消费者:

                    

一般状况下,消息的生产者、消费者、以及消息中间件并不在同一台机器中。异步

3.使用Java客户端:maven

在这节的示例中,咱们将写两个Java文件;一个负责发送单条消息(消息生产者),ide

一个负责接收并打印消息(消息消费者)。在下图中P表明生产者,C表明消费者,测试

中间的红色格子表明队列(消息缓存的地方)

       

4.引入Java依赖包:

本例采用Maven的形式引入,依赖以下:

在pom文件中添加如下代码

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.6</version>
</dependency>

消息生产者的Java代码(Send.java):

maven引入依赖后请自行导包,代码中有注释,再也不对代码作解释。

public class Send {
    private static final String QUEUE_NAME = "hello";

    /**
     * 如下代码均为本机测试,RabbitMQ安装后默认启用guest帐户,用户名guest,密码guest
     * 可是该帐户默认仅能在localhost中使用.请注意.
     * 测试代码请根据本身的环境修改userName和password
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        //建立链接工厂,设置链接地址,用户名,密码
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        //建立链接
        Connection connection = factory.newConnection();
        //经过链接建立信道,信道的建立和销毁代价比链接要小的多
        Channel channel = connection.createChannel();
        //发送的消息体
        String message = "hello rabbitmq";
        //发送消息,消息体格式为字节数组
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        //关闭链接和通道
        channel.close();
        connection.close();
    }
}

消费者Java代码(Recv.java)

public class Recv {

    //队列名称
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //建立连接工厂,设置用户名密码
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列,发送者和生产者都作了队列声明的操做
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //建立消费者,重写消息的处理方法
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        //消费消息.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

上述两个Java文件中,发送者和消费者都作了队列的声明操做。在RabbitMQ中该操做是幂等的(只有在该队列不存在时才会建立,不会重复建立)

之因此在生产者和消费中都建立队列,是由于并不肯定会先启动哪一个服务.彻底有可能先启动消费者再启动生产者.咱们要确保消费者从该队列消费

消息时该队列已经存在.在下一章你就会看到这样的例子.

而RabbitMQ队列的建立是幂等的.因此生产者和消费者同时作队列的声明并不会引发什么问题.

消息生产者的示意图:

消息经过生产者Send.java将消息发送到名为hello的队列

            

消费者的示意图:
消费者Revc.java从名为hello的队列中取出消息执行本身的逻辑

        

RabbitMQ服务器会异步的将消息推送出来,因此在消费者中提供一个回调.并重写handleDelivery方法处理消息.
在此处仅仅是将消息打印出来.
程序到此处已经写完了,如今你能够运行项目了.步骤:
1.先运行Send.java,程序将发送一条消息到hello队列.此时你能够先不启动消费者,去MQ的web管理控制台中看看

在管理控制台的Queues下能够看到名为hello的队列中有一条准备好的消息.
此时启动消费者.会看到控制台hello队列中的ready消息为0,已经被消费者消费掉了.队列也随之被清空
Java控制台会打印出来消息内容.

示例代码地址:https://git.oschina.net/li_mingzhu/rabbitmq.git

相关文章
相关标签/搜索