rabbitmq简单使用

rabbitmq提供了6中消息队列模型服务器

  1. 简单模式

  • 链接rabbitmq
public class RabbitmqConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        // 链接工厂
        ConnectionFactory connectionFactory = new 	ConnectionFactory();
        // 服务器地址
        connectionFactory.setHost("localhost");
        //服务器端口
        connectionFactory.setPort(5672);
        // rabbitmq用户名
        connectionFactory.setUsername("guest");
        // rabbitmq密码
        connectionFactory.setPassword("guest");
        return connectionFactory.newConnection();
    }
}
  • 生产者
public class Sender {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取rabbitmq 服务器链接
        Connection connection = RabbitmqConnectionUtil.getConnection();

        // 建立rabbitmq的队列
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 将消息发布到队列
        String message = "hello, world";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        // 关闭
        channel.close();
        connection.close();
    }
}
  • 消费者
public class Receive {
    private static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();

        // 建立通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
            }
        };
        // 绑定通道和消费者
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    private static void oldApi() throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        // 从链接中建立通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[Rec]:"+ message);
        }
    }
}

简单模型:生产者与消费者是一一对应的,一个生产者只能有一个消费者,若是多个的话,必须建立多个,耦合性强
2. Work Queues
Simple队列是生产者和消费者一一对应的,实际开发中,生产者发送消息是绝不费力的,消费者通常须要处理业务,花费的时间较长,若是使用Simple会形成消息的挤压异步

  • 生产者
public class Send {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        
        // 建立channal
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        //发布信息
        for (int i = 0; i < 50; i++) {
            String message = "hello "+i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            try {
                Thread.sleep(i*100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
        channel.close();
        connection.close();
    }
}
  • 消费者一
public class Rece1 {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
  • 消费者二 和消费者一代码同样 消费1和消费2处理的消息是同样的,轮训去消费消费队列里面的消息, 轮训分发(Round-Robin)
    消费者1:都是奇数
    消费者2:都是偶数
    无论消费端谁忙谁闲,都不会多个一个消息,任务消息你一个我一个 公平分发:多劳多得
  1. 公平分发, 多劳多得
  • 生产者
public class Send {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        
        // 建立channal
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /*
         每一个消费者发送确认消息以前,消息队列再也不发送下一个消息到消费者,一次只处理一个消息
         */
        int prefectCount = 1;
        channel.basicQos(prefectCount);

        //发布信息
        for (int i = 0; i < 50; i++) {
            String message = "hello "+i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        }
        channel.close();
        connection.close(); 
    }
}
  • 消费者一
public class Rece1 {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // 完成以后,手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
  • 消费者二
public class Rece2 {
    public static final String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] reced " + message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // 手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
  1. 消费应答与消息持久化
  • 消息应答 boolean ack = false;
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    boolean ack = true: 自动应答,一旦rabbitmq将消息分发给消费者,就会从内存中删除,这种状况下,若是杀死正在执行的消费者,就会丢失正在处理的消息

boolean ack = false 手动模式,若是一个消费者挂掉,就会交付给其余消费者,rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理掉了,你能够删除了,而后rabbitmq就删除内存中的消息
消息应答模式是打开的,false
若是这种方式 rabbitmq挂的话,整个消息都会丢失ide

  • 消息持久化
boolean durable = false;
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

durable=true,设置为true,若是在代码运行前QUEUE_NAME队列已经存在,而且不是持久化的,rabbitmq不容许从新定义一个已经存在的队列
  1. 订阅模式

    解读:
  • 一个生产者,多个消费者
  • 每一个消费者都有本身的队列
  • 生产者没有直接将消息发送给队列,而是到了交换机exchange
  • 每一个队列都绑定到交换机上
  • 生产者发送消息 通过交换机 到达队列就能实现一个消息被多个消费者消费
  • 生产者
public class Send {
    public static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  // 分发

        // 发送消息
        String message = "hello world";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        channel.close();
        connection.close();

    }
}
  • 消费者一
public class Receive {
    public static final String QUEUE_NAME = "test_queue_email";
    public static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // 绑定交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // 手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
  • 消费者二
public class Receive2 {
    public static final String QUEUE_NAME = "test_queue_sms";
    public static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // 绑定交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[2] reced " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("done");
                }
                // 手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
  1. Direct 模式
  • 生产者
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.basicQos(1);
        String message = "hello direct";

        // 发送消息
        String routingKey = "warning";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("send " +  message);
        channel.close();
        connection.close();
    }
}
  • 消费者一
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] Recv "+ message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 手动应答
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
    }
}
  • 消费者二
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_2";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] Recv "+ message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 手动应答
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
    }
}
  1. Topic 主题模式

  • 生产者
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = "goods.update";
        String message = "商品。。。。";

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

        channel.close();
        connection.close();
    }
}
  • 消费者一
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_2";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[2] Recv "+ message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 手动应答
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
    }
}
  • 消费者二
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[1] Recv "+ message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 手动应答
                    System.out.println("done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 关闭自动应答
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);

    }
}
  1. 消息确认机制
    生产者将消息发送出去以后,消息到底有没有到达rabbitmq服务器 默认状况不知道。有两种方式
  • AMQP协议实现了事务机制
  • Confirm模式
    事务机制
    txSelect、txCommit、txRollback
    txSelect:用户将当前channel设置transation模式
    txCommit:用于提交事务
    txRollback:回滚事务
  • 生产者
public class TxSend {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 发送数据
        String message  = "send  tx message";
        try{
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            int tt = 1/0;
            channel.txCommit();
        }catch (Exception e){
            channel.txRollback();
            System.out.println("发送消息失败");
        }
        channel.close();
        connection.close();
    }
}
  • 消费者
public class TxRecv {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("[recv] message: " + message);
            }
        });
    }
}
  1. Confirm模式
    实现原理 生产者将信道设置为Confirm模式,一旦进入confirm模式,全部在该信道上面发送的消息都被指派一个惟一的id,一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者,使得生产者知道消息被发送到目的队列了,若是消息和队列是可持久的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中 driver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序号以前全部的消息都已经获得了处理。存在两种方式:
    (1)、普通 发送一条
    (2)、批量发送
    (3)、异步发送confirm模式:提供一个回调方法
    普通模式
public class Send {
    private static final String QUEUE_NAME = "test_queue_confirm_1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 开启confirm模式,一个队列只能有一种模式
        channel.confirmSelect();

        String message = "send confirm...";
        channel.basicPublish("", QUEUE_NAME, null , message.getBytes());

        if(!channel.waitForConfirms()){
            System.out.println("confirm send failed");
        }else{
            System.out.println("confirm send ok ");
        }

        // 关闭信道和链接
        channel.close();
        connection.close();
    }

批量模式code

public class Send2 {
    private static final String QUEUE_NAME = "test_queue_confirm_1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 开启confirm模式,一个队列只能有一种模式
        channel.confirmSelect();

        String message = "send confirm...";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null , message.getBytes());
        }

        if(!channel.waitForConfirms()){
            System.out.println("confirm send failed");
        }else{
            System.out.println("confirm send ok ");
        }
        // 关闭信道和链接
        channel.close();
        connection.close();
    }
}
相关文章
相关标签/搜索