新建一个maven项目,在pom.xml文件加入如下依赖html
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>新建一个P1类
package com.rabbitMQ.test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @create 2019/11/20-11:23 */ public class P1 { public static void main(String[] args) throws IOException, TimeoutException { //消息队列名字 String queueName="queue"; //实例链接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //设置地址 connectionFactory.setHost("192.168.128.233"); //设置端口 connectionFactory.setPort(5672); //设置用户名 connectionFactory.setUsername("mowen"); //设置密码 connectionFactory.setPassword("123456"); //获取链接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //声明队列。 //参数1:队列名 //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在) //参数3:独占队列(建立者能够使用的私有队列,断开后自动删除) //参数4:当全部消费者客户端链接断开时是否自动删除队列 //参数5:队列的其余参数 channel.queueDeclare(queueName,true,false,false,null); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本发布消息 // 第一个参数为交换机名称、 // 第二个参数为队列映射的路由key、 // 第三个参数为消息的其余属性、 // 第四个参数为发送信息的主体 channel.basicPublish("",queueName,null,msg.getBytes()); } channel.close(); connection.close(); } }
运行后再浏览器进入RabbitMQ的控制台,切换到queue看到java
新建一个C1类浏览器
package com.rabbitMQ.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @create 2019/11/20-13:12 */ public class C1 { public static void main(String[] args) throws IOException, TimeoutException { //消息队列名字 String queueName="queue"; //实例链接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //设置地址 connectionFactory.setHost("192.168.128.233"); //设置端口 connectionFactory.setPort(5672); //设置用户名 connectionFactory.setUsername("mowen"); //设置密码 connectionFactory.setPassword("123456"); //获取链接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); // 建立一个消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 消费收到消息的时候调用的回调 System.out.println("C3接收到:" + new String(body)); } }; //把消费着绑定到指定队列 //第一个是队列名 //第二个是 是否自动确认 //第三个是消费者 channel.basicConsume(queueName,true,consumer); } }
运行后输出为服务器
消费者通常都不会关闭,会一直等待队列消息,能够手动关闭程序。maven
channel.basicConsume(queueName,true,consumer);中的true为收到消息后自动确认,改成false取消自动确认。ide
在handleDelivery方法最后面用性能
// 手动确认
// 确认收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
来收到手动确认消息。消费者能够有多个而且能够同时消费一个队列;ui
当有多个消费者同时消费同一个队列时,收到的消息是平均分配的(消费者没收到以前已经确认每一个消费者受到的消息),3d
但当其中一个消费者性能差的话,会影响其余的消费者,由于还要等它收完消息,这样会拖累其余消费者。code
能够设置channel 的basicQos方法
//设置最多接受消息数量 // 设置了这个参数以后要吧自动确认关掉 channel.basicQos(1);
扇形交换机是基本的交换机类型,会把收到的消息以广播的形式发送到绑定的队列里,由于不须要通过条件筛选,因此它的速度最快。
在生产者项目新建一个fanout类
package com.rabbitMQ.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @date 2019/11/20-11:23 */ public class fanout { public static void main(String[] args) throws IOException, TimeoutException { //交换机名字 String exchangeName="fanout"; //交换机名字类型 String exchangeType="fanout"; //消息队列名字 String queueName1="fanout.queue1"; String queueName2="fanout.queue2"; String queueName3="fanout.queue3"; //实例链接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //设置地址 connectionFactory.setHost("192.168.128.233"); //设置端口 connectionFactory.setPort(5672); //设置用户名 connectionFactory.setUsername("mowen"); //设置密码 connectionFactory.setPassword("123456"); //获取链接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //声明队列。 //参数1:队列名 //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在) //参数3:独占队列(建立者能够使用的私有队列,断开后自动删除) //参数4:当全部消费者客户端链接断开时是否自动删除队列 //参数5:队列的其余参数 channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); channel.queueDeclare(queueName3,true,false,false,null); //声明交换机 channel.exchangeDeclare(exchangeName,exchangeType); //队列绑定到交换机 channel.queueBind(queueName1,exchangeName,""); channel.queueBind(queueName2,exchangeName,""); channel.queueBind(queueName3,exchangeName,""); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本发布消息 // 第一个参数为交换机名称、 // 第二个参数为队列映射的路由key、 // 第三个参数为消息的其余属性、 // 第四个参数为发送信息的主体 channel.basicPublish(exchangeName,"",null,msg.getBytes()); } channel.close(); connection.close(); } }
运行后在RabbitMQ网页管理后台的queue会看到
切换到Exchanges会看到一个
就是咱们声明的交换机,点击会看到咱们绑定的队列
直连交换机会带路由功能,队列经过routing_key与直连交换机绑定,发送消息须要指定routing_key,交换机收到消息时,交换机会根据routing_key发送到指定队列里,一样的routing_key能够支持多个队列。
在生产者项目新建direct类
package com.rabbitMQ.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @date 2019/11/20-11:23 */ public class direct { public static void main(String[] args) throws IOException, TimeoutException { String exchangeName="direct"; String exchangeType="direct"; //消息队列名字 String queueName1="direct.queue1"; String queueName2="direct.queue2"; String queueName3="direct.queue3"; //实例链接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //设置地址 connectionFactory.setHost("192.168.128.233"); //设置端口 connectionFactory.setPort(5672); //设置用户名 connectionFactory.setUsername("mowen"); //设置密码 connectionFactory.setPassword("123456"); //获取链接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //声明队列。 //参数1:队列名 //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在) //参数3:独占队列(建立者能够使用的私有队列,断开后自动删除) //参数4:当全部消费者客户端链接断开时是否自动删除队列 //参数5:队列的其余参数 channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); channel.queueDeclare(queueName3,true,false,false,null); //声明交换机 channel.exchangeDeclare(exchangeName,exchangeType); //队列绑定到交换机并指定rouing_key channel.queueBind(queueName1,exchangeName,"key1"); channel.queueBind(queueName2,exchangeName,"key2"); channel.queueBind(queueName3,exchangeName,"key1"); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本发布消息 // 第一个参数为交换机名称、 // 第二个参数为队列映射的路由key、 // 第三个参数为消息的其余属性、 // 第四个参数为发送信息的主体 channel.basicPublish(exchangeName,"key1",null,msg.getBytes()); } channel.close(); connection.close(); } }
运行后到后台的queue会看到
切换到Exchanges会看到
点击进去
主题交换机的routing_key能够有必定的规则,交换机和队列的routing_key须要采用*.#.*…..的格式
每一个部分用.分开
*表明一个单词(不是字符)
#表明任意数量(0或n个)单词
在生产者项目新进topic类
package com.rabbitMQ.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @date 2019/11/20-11:23 */ public class topic { public static void main(String[] args) throws IOException, TimeoutException { String exchangeName="topic"; String exchangeType="topic"; //消息队列名字 String queueName1="topic.queue1"; String queueName2="topic.queue2"; String queueName3="topic.queue3"; //实例链接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //设置地址 connectionFactory.setHost("192.168.128.233"); //设置端口 connectionFactory.setPort(5672); //设置用户名 connectionFactory.setUsername("mowen"); //设置密码 connectionFactory.setPassword("123456"); //获取链接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //声明队列。 //参数1:队列名 //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在) //参数3:独占队列(建立者能够使用的私有队列,断开后自动删除) //参数4:当全部消费者客户端链接断开时是否自动删除队列 //参数5:队列的其余参数 channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); channel.queueDeclare(queueName3,true,false,false,null); //声明交换机 channel.exchangeDeclare(exchangeName,exchangeType); //队列绑定到交换机并指定rouing_key channel.queueBind(queueName1,exchangeName,"com.aaa.*"); channel.queueBind(queueName2,exchangeName,"com.*.topic"); channel.queueBind(queueName3,exchangeName,"com.bbb.*"); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本发布消息 // 第一个参数为交换机名称、 // 第二个参数为队列映射的路由key、 // 第三个参数为消息的其余属性、 // 第四个参数为发送信息的主体 channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes()); } channel.close(); connection.close(); } }
运行后,到后台queue会看到
切换到Exchanges会看到
点击进入会看到