RabbitMQ之交换器的三种模式

交换器(exchange)的最大作用一方面在于接收生产者的信息,另一方面在于发送消息到不同的队列,RabbitMQ中交换器被分为三类:fanout,topic,director

在这里插入图片描述

1.广播模式(fanout)

广播模式:生产的每一条消息,由所有消费者进行处理操作
666
图片参考自网络

消费者程序

生产者交换器核心代码

//信道绑定交换器
 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

poducer.class

public class MessageProducerfount {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String EXCHANGE_NAME="sjw.exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("sjw.virtual");
        //获取连接
        Connection connection=factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel=connection.createChannel();
        //信道进行交换器类型指定
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间
        long start=System.currentTimeMillis();
        for (int i=0;i<10;i++){
            String message="sjw"+i;
            //basicPublish(exchange,队列名称,属性,参数.getbyte())
            channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        }
        //结束时间
        long end=System.currentTimeMillis();
        //输出所需时间
        System.out.println("进入队列总共耗时:"+(end-start));
        //关闭信道
        channel.close();
        //关闭连接
        connection.close();
    }
}

消费者程序

消费者核心代码

//信道绑定交换器
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//信道交换器绑定,绑定相应的队列,消费者从队列中取出数据进行处理
 //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey
channel.exchangeBind(queuename,EXCHANGE_NAME,"");

consumer.class

public class MessageConsumer1 {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String EXCHANGE_NAME="sjw.exchange";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("sjw.virtual");
        //获取连接
        Connection connection = factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel = connection.createChannel();
        //从信道中寻找队列名称
       String queuename=channel.queueDeclare().getQueue();
        //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //交换器Bind队列,绑定相应的队列,消费者从队列中取出数据进行处理
        //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey
        channel.queueBind(queuename,EXCHANGE_NAME,"");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //信道交给consumer进行内容接收处理
        channel.basicConsume(queuename,consumer);
        while (true) {  //消费者程序运行开着 如果生产者新增了数据会自动获取
            // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[消费者A:]"+"[消息]" + message);
        }

    }
}

消息生产,观察exchange
在这里插入图片描述

打开三个消费者
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

发现三个消费者都在同时处理生产者产生的消息,广播模式测试成功!!!

2.直连模式(direct)

直连模式:任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。只有key匹配上了,这个队列的消费者才能进行消费操作

在这里插入图片描述

生产者程序

核心代码

channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.basicPublish(EXCHANGE_NAME,"sjw-key", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

puducer.class

public class MessageProducerfount {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String EXCHANGE_NAME="sjw.exchange.direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("sjw.virtual");
        //获取连接
        Connection connection=factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel=connection.createChannel();
        //信道进行交换器类型指定
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间
        long start=System.currentTimeMillis();
        for (int i=0;i<10;i++){
            String message="sjw"+i;
            //basicPublish(exchange,队列名称,属性,参数.getbyte())
            channel.basicPublish(EXCHANGE_NAME,"sjw-key", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        }
        //结束时间
        long end=System.currentTimeMillis();
        //输出所需时间
        System.out.println("进入队列总共耗时:"+(end-start));
        //关闭信道
        channel.close();
        //关闭连接
        connection.close();
    }
}

消费者程序

核心代码

channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key");

consumer.class

public class MessageConsumer2 {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String EXCHANGE_NAME="sjw.exchange.direct";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("sjw.virtual");
        //获取连接
        Connection connection = factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel = connection.createChannel();
        //从信道中寻找队列名称
        String queuename=channel.queueDeclare().getQueue();
        //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //信道交换器绑定,绑定相应的队列,消费者从队列中取出数据进行处理
        //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey
        channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //信道交给consumer进行内容接收处理
        channel.basicConsume(queuename,consumer);
        while (true) {  //消费者程序运行开着 如果生产者新增了数据会自动获取
            // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[消费者B:]"+"[消息]" + message);
        }

    }
}

此时启动三个消费者,只有B的与生产者key匹配
在这里插入图片描述
只有消费者B可以接收生产者产生的消息
在这里插入图片描述直连模式成功!!!

3.主题模式(topic)

主题模式:主题模式更像是广播模式和直连模式的结合体.根据自定义的规则分配rountingkey给不同的队列,让对应key的队列进行处理.这种模式常用于RabbitMQ.

例如:我们让模2=0的数字在消费者B出现,其余的在消费者A出现

生产者代码

public class MessageProducerfount {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String EXCHANGE_NAME="sjw.exchange.topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("sjw.virtual");
        //获取连接
        Connection connection=factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel=connection.createChannel();
        //信道进行交换器类型指定
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间
        long start=System.currentTimeMillis();
        for (int i=0;i<10;i++){
            String message="sjw"+i;
            //basicPublish(exchange,队列名称,属性,参数.getbyte())
            if (i%2==0) {
                channel.basicPublish(EXCHANGE_NAME, "sjw-key-B", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            }else {
                channel.basicPublish(EXCHANGE_NAME, "sjw-key-A", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            }
        }
        //结束时间
        long end=System.currentTimeMillis();
        //输出所需时间
        System.out.println("进入队列总共耗时:"+(end-start));
        //关闭信道
        channel.close();
        //关闭连接
        connection.close();
    }
}

消费者代码
消费者A

public class MessageConsumer1 {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String EXCHANGE_NAME="sjw.exchange.topic";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("sjw.virtual");
        //获取连接
        Connection connection = factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel = connection.createChannel();
        //从信道中寻找队列名称
       String queuename=channel.queueDeclare().getQueue();
        //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //交换器Bind队列,绑定相应的队列,消费者从队列中取出数据进行处理
        //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey
        channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key-A");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //信道交给consumer进行内容接收处理
        channel.basicConsume(queuename,consumer);
        while (true) {  //消费者程序运行开着 如果生产者新增了数据会自动获取
            // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[消费者A:]"+"[消息]" + message);
        }

    }
}

消费者B

public class MessageConsumer2 {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String EXCHANGE_NAME="sjw.exchange.topic";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost("sjw.virtual");
        //获取连接
        Connection connection = factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel = connection.createChannel();
        //从信道中寻找队列名称
        String queuename=channel.queueDeclare().getQueue();
        //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //信道交换器绑定,绑定相应的队列,消费者从队列中取出数据进行处理
        //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey
        channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key-B");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //信道交给consumer进行内容接收处理
        channel.basicConsume(queuename,consumer);
        while (true) {  //消费者程序运行开着 如果生产者新增了数据会自动获取
            // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[消费者B:]"+"[消息]" + message);
        }

    }
}