public class Provider { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接工厂对象 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ服务主机地址,默认localhost factory.setHost("localhost"); // 3.设置RabbitMQ服务端口,默认5672 factory.setPort(5672); // 4.设置虚拟主机名字,默认/ factory.setVirtualHost("/demo1"); // 5.设置用户链接名,默认guest // factory.setUsername("guest"); // 6.设置连接密码,默认guest // factory.setPassword("guest"); // 7.建立一个新连接 Connection connection = factory.newConnection(); // 8.建立消息通道 Channel channel = connection.createChannel(); // 9.建立队列 channel.queueDeclare("simple_queue",true,false,false,null); // 10.建立消息 String msg="simple queue demo"; // 11.消息发送 channel.basicPublish("","simple_queue",null,msg.getBytes()); // 12.关闭资源 channel.close(); connection.close(); } }
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接工厂对象 ConnectionFactory factory = new ConnectionFactory(); // 2.设置RabbitMQ服务主机地址,默认localhost factory.setHost("localhost"); // 3.设置RabbitMQ服务端口,默认5672 factory.setPort(5672); // 4.设置虚拟主机名字,默认/ factory.setVirtualHost("/demo1"); // 5.设置用户链接名,默认guest // 6.设置连接密码,默认guest // 7.建立一个新连接 Connection connection = factory.newConnection(); // 8.建立消息通道 Channel channel = connection.createChannel(); // 9.建立队列 channel.queueDeclare("simple_queue",true,false,false,null); // 10.建立消费者,并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由 String routingKey = envelope.getRoutingKey(); // 交换机 String exchange = envelope.getExchange(); // 消息id long deliveryTag = envelope.getDeliveryTag(); // 消息体 String message = new String(body, "UTF-8"); System.out.println("路由:" + routingKey + ",交换机:" + exchange + ",消息id:" + deliveryTag + ",消息体:" + message); super.handleDelivery(consumerTag, envelope, properties, body); } }; // 11.消息监听 channel.basicConsume("simple_queue", true, consumer); // 12.关闭资源(不建议关闭,建议一直监听消息) } }
// 与上面同样,不过就是建立多个消息消费者去监听同一个消息队列,消息分配为轮询式
此模式下呢多出一个概念exchange(交换机)能够在交换机上绑定多个消息队列,而如今消息产生者将消息发送到交换机,由交换机调度给他下面的消息队列,此模式下交换机将会将消息发给全部与他绑定的队列java
// 9.建立队列 channel.queueDeclare("simple_queue1",true,false,false,null); channel.queueDeclare("simple_queue2",true,false,false,null); // 建立交换机:arg0,交换机名称 arg1,交换机类型(广播) channel.exchangeDeclare("demo3Exchange", BuiltinExchangeType.FANOUT); // 将队列绑定到交换机 channel.queueBind("simple_queue1","demo3Exchange",""); channel.queueBind("simple_queue2","demo3Exchange",""); // 10.建立消息 ....
class Consumer1 ... // 11.监听 消息队列simple_queue1 channel.basicConsume("simple_queue1", true, consumer); ... class Consumer2 ... // 11.监听 消息队列simple_queue2 channel.basicConsume("simple_queue2", true, consumer); ...
此模式下多出概念路由,基于上一模式,上一模式交换机会将接收到的消息发给全部绑定了的队列,此模式下接收到的消息会多一个参数Routing,会将消息转发到对应Routing的队列api
// 9.建立队列 channel.queueDeclare("simple_queue1",true,false,false,null); channel.queueDeclare("simple_queue2",true,false,false,null); // 建立交换机:arg0,交换机名称 arg1,交换机类型(路由对应) channel.exchangeDeclare("demo4Exchange", BuiltinExchangeType.DIRECT); // 将队列绑定到交换机 // simple_queue1只会接收到routingKey 为error的消息 channel.queueBind("simple_queue1","demo4Exchange","error"); // simple_queue2会接收到routingKey 为info,warning,error的消息 channel.queueBind("simple_queue2","demo4Exchange","info"); channel.queueBind("simple_queue2","demo4Exchange","warning"); channel.queueBind("simple_queue2","demo4Exchange","error"); // 10.建立消息 // 11.消息发送 for (int i = 0; i < 100; i++) { // 建立消息 String message = "routing_key:" + i; String routingKey = ""; if (i%2 == 0){ // routing_key_queue一、routing_key_queue2 0、二、四、六、8 routingKey = "error"; }else if (i%5 == 0){ // routing_key_queue2:5 routingKey = "info"; }else { // 0、一、5 routingKey = "warning"; } message += "--->" + routingKey; // 消息发送 channel.basicPublish("demo4Exchange", routingKey, null, message.getBytes()); }
// 同上 消息消费者只须要监听对应的消息队列便可
此模式下在上面的基础上将routingKey改成可使用通配符的模式
通配符规则:
多个单词之间以”.”分割
‘#’:匹配一个或多个词
:匹配很少很多刚好1个词
举例:
item.#:可以匹配item.insert.abc 或者 item.insert
item.:只能匹配item.insertide
// 9.建立队列 channel.queueDeclare("simple_queue1",true,false,false,null); channel.queueDeclare("simple_queue2",true,false,false,null); // 建立交换机:arg0,交换机名称 arg1,交换机类型(主题) channel.exchangeDeclare("demo5Exchange", BuiltinExchangeType.TOPIC); // 将队列绑定到交换机 channel.queueBind("simple_queue1","demo5Exchange","#"); channel.queueBind("simple_queue2","demo5Exchange","www.#"); channel.queueBind("simple_queue2","demo5Exchange","*.com"); // 10.建立消息 // 11.消息发送 for (int i = 0; i < 100; i++) { // 建立消息 String message = "routing_key:" + i; String routingKey = ""; if (i%2 == 0){ // routing_key_queue一、routing_key_queue2 0、二、四、六、8 routingKey = "www.baidu.com"; }else if (i%5 == 0){ // routing_key_queue2:5 routingKey = "jd.com"; }else { // 0、一、5 routingKey = "dqdwwfwevweevwe21e13r23dr2gerfdqw.dqefw122e.23f.2f.23.f.2.f2.f.24.ff2wf2qef.2"; } message += "--->" + routingKey; // 消息发送 channel.basicPublish("demo5Exchange", routingKey, null, message.getBytes()); }
// 同上 消息消费者只须要监听对应的消息队列便可