交换器(exchange)的最大作用一方面在于接收生产者的信息,另一方面在于发送消息到不同的队列,RabbitMQ中交换器被分为三类:fanout,topic,director
广播模式:生产的每一条消息,由所有消费者进行处理操作
图片参考自网络
消费者程序
生产者交换器核心代码
//信道绑定交换器 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
poducer.class
public class MessageProducerfount { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.142"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String EXCHANGE_NAME="sjw.exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost("sjw.virtual"); //获取连接 Connection connection=factory.newConnection(); //获取信道,可以有多个信道 Channel channel=connection.createChannel(); //信道进行交换器类型指定 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间 long start=System.currentTimeMillis(); for (int i=0;i<10;i++){ String message="sjw"+i; //basicPublish(exchange,队列名称,属性,参数.getbyte()) channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); } //结束时间 long end=System.currentTimeMillis(); //输出所需时间 System.out.println("进入队列总共耗时:"+(end-start)); //关闭信道 channel.close(); //关闭连接 connection.close(); } }
消费者程序
消费者核心代码
//信道绑定交换器 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //信道交换器绑定,绑定相应的队列,消费者从队列中取出数据进行处理 //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey channel.exchangeBind(queuename,EXCHANGE_NAME,"");
consumer.class
public class MessageConsumer1 { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.142"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String EXCHANGE_NAME="sjw.exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost("sjw.virtual"); //获取连接 Connection connection = factory.newConnection(); //获取信道,可以有多个信道 Channel channel = connection.createChannel(); //从信道中寻找队列名称 String queuename=channel.queueDeclare().getQueue(); //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //交换器Bind队列,绑定相应的队列,消费者从队列中取出数据进行处理 //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey channel.queueBind(queuename,EXCHANGE_NAME,""); QueueingConsumer consumer = new QueueingConsumer(channel); //信道交给consumer进行内容接收处理 channel.basicConsume(queuename,consumer); while (true) { //消费者程序运行开着 如果生产者新增了数据会自动获取 // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消费者A:]"+"[消息]" + message); } } }
消息生产,观察exchange
打开三个消费者
发现三个消费者都在同时处理生产者产生的消息,广播模式测试成功!!!
直连模式:任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。只有key匹配上了,这个队列的消费者才能进行消费操作
生产者程序
核心代码
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.basicPublish(EXCHANGE_NAME,"sjw-key", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
puducer.class
public class MessageProducerfount { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.142"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String EXCHANGE_NAME="sjw.exchange.direct"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost("sjw.virtual"); //获取连接 Connection connection=factory.newConnection(); //获取信道,可以有多个信道 Channel channel=connection.createChannel(); //信道进行交换器类型指定 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间 long start=System.currentTimeMillis(); for (int i=0;i<10;i++){ String message="sjw"+i; //basicPublish(exchange,队列名称,属性,参数.getbyte()) channel.basicPublish(EXCHANGE_NAME,"sjw-key", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); } //结束时间 long end=System.currentTimeMillis(); //输出所需时间 System.out.println("进入队列总共耗时:"+(end-start)); //关闭信道 channel.close(); //关闭连接 connection.close(); } }
消费者程序
核心代码
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key");
consumer.class
public class MessageConsumer2 { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.142"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String EXCHANGE_NAME="sjw.exchange.direct"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost("sjw.virtual"); //获取连接 Connection connection = factory.newConnection(); //获取信道,可以有多个信道 Channel channel = connection.createChannel(); //从信道中寻找队列名称 String queuename=channel.queueDeclare().getQueue(); //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //信道交换器绑定,绑定相应的队列,消费者从队列中取出数据进行处理 //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key"); QueueingConsumer consumer = new QueueingConsumer(channel); //信道交给consumer进行内容接收处理 channel.basicConsume(queuename,consumer); while (true) { //消费者程序运行开着 如果生产者新增了数据会自动获取 // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消费者B:]"+"[消息]" + message); } } }
此时启动三个消费者,只有B的与生产者key匹配
只有消费者B可以接收生产者产生的消息
直连模式成功!!!
主题模式:主题模式更像是广播模式和直连模式的结合体.根据自定义的规则分配rountingkey给不同的队列,让对应key的队列进行处理.这种模式常用于RabbitMQ.
例如:我们让模2=0的数字在消费者B出现,其余的在消费者A出现
生产者代码
public class MessageProducerfount { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.142"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String EXCHANGE_NAME="sjw.exchange.topic"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost("sjw.virtual"); //获取连接 Connection connection=factory.newConnection(); //获取信道,可以有多个信道 Channel channel=connection.createChannel(); //信道进行交换器类型指定 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间 long start=System.currentTimeMillis(); for (int i=0;i<10;i++){ String message="sjw"+i; //basicPublish(exchange,队列名称,属性,参数.getbyte()) if (i%2==0) { channel.basicPublish(EXCHANGE_NAME, "sjw-key-B", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); }else { channel.basicPublish(EXCHANGE_NAME, "sjw-key-A", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); } } //结束时间 long end=System.currentTimeMillis(); //输出所需时间 System.out.println("进入队列总共耗时:"+(end-start)); //关闭信道 channel.close(); //关闭连接 connection.close(); } }
消费者代码
消费者A
public class MessageConsumer1 { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.142"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String EXCHANGE_NAME="sjw.exchange.topic"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost("sjw.virtual"); //获取连接 Connection connection = factory.newConnection(); //获取信道,可以有多个信道 Channel channel = connection.createChannel(); //从信道中寻找队列名称 String queuename=channel.queueDeclare().getQueue(); //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //交换器Bind队列,绑定相应的队列,消费者从队列中取出数据进行处理 //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key-A"); QueueingConsumer consumer = new QueueingConsumer(channel); //信道交给consumer进行内容接收处理 channel.basicConsume(queuename,consumer); while (true) { //消费者程序运行开着 如果生产者新增了数据会自动获取 // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消费者A:]"+"[消息]" + message); } } }
消费者B
public class MessageConsumer2 { //RabbitMQ服务所在地址 public final static String HOST="192.168.74.142"; //RabbitMQ端口 public final static int PORT=5672; //RabbitMQ登陆用户名 public final static String USERNAME="sjw"; //RabbitMQ登陆密码 public final static String PASSWORD="123"; //队列名称 public final static String EXCHANGE_NAME="sjw.exchange.topic"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost("sjw.virtual"); //获取连接 Connection connection = factory.newConnection(); //获取信道,可以有多个信道 Channel channel = connection.createChannel(); //从信道中寻找队列名称 String queuename=channel.queueDeclare().getQueue(); //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //信道交换器绑定,绑定相应的队列,消费者从队列中取出数据进行处理 //第一个参数是队列名称,第二个是交换器名称,第三个rountingkey channel.queueBind(queuename,EXCHANGE_NAME,"sjw-key-B"); QueueingConsumer consumer = new QueueingConsumer(channel); //信道交给consumer进行内容接收处理 channel.basicConsume(queuename,consumer); while (true) { //消费者程序运行开着 如果生产者新增了数据会自动获取 // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消费者B:]"+"[消息]" + message); } } }