RabbitMQ 提供了 6 种消息队列模型
public class RabbitmqConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { // 链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 服务器地址 connectionFactory.setHost("localhost"); //服务器端口 connectionFactory.setPort(5672); // rabbitmq用户名 connectionFactory.setUsername("guest"); // rabbitmq密码 connectionFactory.setPassword("guest"); return connectionFactory.newConnection(); } }
public class Sender { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取rabbitmq 服务器链接 Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立rabbitmq的队列 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 将消息发布到队列 String message = "hello, world"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 关闭 channel.close(); connection.close(); } }
public class Receive { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); } }; // 绑定通道和消费者 channel.basicConsume(QUEUE_NAME, false, consumer); } private static void oldApi() throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 从链接中建立通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume(QUEUE_NAME, false, consumer); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[Rec]:"+ message); } } }
简单模型:生产者与消费者是一一对应的,一个生产者只能对应一个消费者;如果需要多个消费者,就必须建立多个队列,耦合性强。
2. Work Queues
Simple 队列中生产者和消费者是一一对应的。实际开发中,生产者发送消息是毫不费力的,而消费者通常需要处理业务,花费的时间较长,如果使用 Simple 队列就会造成消息的积压。
public class Send { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立channal Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发布信息 for (int i = 0; i < 50; i++) { String message = "hello "+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); try { Thread.sleep(i*100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
public class Rece1 { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Send { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立channal Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* 每一个消费者发送确认消息以前,消息队列再也不发送下一个消息到消费者,一次只处理一个消息 */ int prefectCount = 1; channel.basicQos(prefectCount); //发布信息 for (int i = 0; i < 50; i++) { String message = "hello "+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } channel.close(); connection.close(); } }
public class Rece1 { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 完成以后,手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
public class Rece2 { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
boolean ack = false 表示手动应答模式:如果一个消费者挂掉,未应答的消息就会交付给其他消费者。RabbitMQ 支持消息应答:消费者发送一个消息应答,告诉 RabbitMQ 这个消息已经处理完了,可以删除了,然后 RabbitMQ 才会删除内存中的消息。
此时消息应答(手动确认)模式是打开的,即 autoAck = false。
但在这种方式下,如果 RabbitMQ 挂掉,所有消息都会丢失。
boolean durable = false; channel.queueDeclare(QUEUE_NAME, false, false, false, null); 将 durable 设置为 true 可以让队列持久化;但如果在代码运行前 QUEUE_NAME 队列已经存在,而且不是持久化的,就会报错,因为 RabbitMQ 不允许用不同的参数重新定义一个已经存在的队列。
public class Send { public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分发 // 发送消息 String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); } }
public class Receive { public static final String QUEUE_NAME = "test_queue_email"; public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false,null); // 绑定交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
public class Receive2 { public static final String QUEUE_NAME = "test_queue_sms"; public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false,null); // 绑定交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[2] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.basicQos(1); String message = "hello direct"; // 发送消息 String routingKey = "warning"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("send " + message); channel.close(); connection.close(); } }
public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] Recv "+ message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动应答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] Recv "+ message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动应答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
/**
 * Topic producer: publishes one message to the topic exchange
 * {@link #EXCHANGE_NAME} with routing key "goods.update".
 */
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    /**
     * Declares the topic exchange, publishes the message and closes the channel
     * and connection.
     *
     * @param args unused
     * @throws IOException      if declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = "goods.update";
            String message = "商品。。。。";
            // The payload is non-ASCII, so encode it explicitly as UTF-8: the consumers
            // decode with "UTF-8", while the no-arg getBytes() would use the platform
            // default charset.
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

            channel.close();
        } finally {
            // Always close the connection, even when declaring or publishing throws.
            connection.close();
        }
    }
}
public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[2] Recv "+ message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动应答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] Recv "+ message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动应答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 关闭自动应答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
public class TxSend { private static final String QUEUE_NAME = "test_queue_tx"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发送数据 String message = "send tx message"; try{ channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); int tt = 1/0; channel.txCommit(); }catch (Exception e){ channel.txRollback(); System.out.println("发送消息失败"); } channel.close(); connection.close(); } }
/**
 * Consumer for the transaction example: prints every message delivered from
 * {@link #QUEUE_NAME} (automatic acknowledgement).
 */
public class TxRecv {
    private static final String QUEUE_NAME = "test_queue_tx";

    /**
     * Declares the queue and registers a consumer with autoAck enabled.
     *
     * @param args unused
     * @throws IOException      if declaring the queue or registering the consumer fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode as UTF-8 like the other consumers in this file, rather than
                // with the platform default charset.
                String message = new String(body, "UTF-8");
                System.out.println("[recv] message: " + message);
            }
        });
    }
}
public class Send { private static final String QUEUE_NAME = "test_queue_confirm_1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 开启confirm模式,一个队列只能有一种模式 channel.confirmSelect(); String message = "send confirm..."; channel.basicPublish("", QUEUE_NAME, null , message.getBytes()); if(!channel.waitForConfirms()){ System.out.println("confirm send failed"); }else{ System.out.println("confirm send ok "); } // 关闭信道和链接 channel.close(); connection.close(); }
批量模式
public class Send2 { private static final String QUEUE_NAME = "test_queue_confirm_1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 开启confirm模式,一个队列只能有一种模式 channel.confirmSelect(); String message = "send confirm..."; for (int i = 0; i < 10; i++) { channel.basicPublish("", QUEUE_NAME, null , message.getBytes()); } if(!channel.waitForConfirms()){ System.out.println("confirm send failed"); }else{ System.out.println("confirm send ok "); } // 关闭信道和链接 channel.close(); connection.close(); } }