RabbitMQ最核心的交换机和队列Exchange、Queue详解

引言java

    最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供本身和你们一起学习!json

1.先来介绍RabbitMQ中的成员

  • Producer(生产者): 将消息发送到Exchange
  • Exchange(交换器):将从生产者接收到的消息路由到Queue
  • Queue(队列):存放供消费者消费的消息
  • BindingKey(绑定键):创建Exchange与Queue之间的关系(我的看做是一种规则,也就是Exchange将什么样的消息路由到Queue)
  • RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue
  • Consumer(消费者):从Queue中获取消息

下面是各个成员的做用图解app

 

 

 

 

 

 

引入依赖ide

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

 

2.先来介绍Exchange

这里将着重于介绍Exchange和Queue的各个参数解释

先来看看Exchange中都有哪些属性工具

  • exchange:名称
  • type:类型
  • durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
  • autoDelete:是否自动删除,若是没有与之绑定的Queue,直接删除
  • internal:是否内置的,若是为true,只能经过Exchange到Exchange
  • arguments:结构化参数

 

 

 下面这个类用于建立一个与RabbitMQ的Connection(链接),该Connection用于建立Channel(信道),Channel是消息读写的通道,也就是咱们的操做都会在Channel的基础之上进行学习

 

 

 2.1先使用最简单的参数构建Exchange
exchangeDeclare(String exchange, String type)大数据

 

 

 进入RabbitMQ可视化界面能够看到,RabbitMQ已经为咱们建立了exchange.0,类型为directui

 

 

具体释意spa

  name                      名称
  type                        类型
  Features                 特征
  Message rate in       消息速率输入
  Message rate out     消息速率输出

2.2接下来是三个参数,也就是加上了是否持久化,同时保留先前两个参数的exchange.0,以前咱们已经建立了exchange.0,那么咱们再建立一次会怎样code

  exchangeDeclare(String exchange, String type, boolean durable)

 

运行成功,并无报错,由于只要你设置的的设置是同样的,那么就不会报错,若是设置的不同,那么就会报错,后面会进行验证

这里咱们发现exchange.2多了一个D标识,这个D是durable也就是持久化,而exchange.0没有持久化,也就是默认非持久化

 

 

 接下来验证这个持久化有什么做用
关闭rabbitmq
rabbitmqctl stop_app
启动rabbitmq
rabbitmqctl start_app
从新进入可视化界面,Exchange就只剩下持久化的了

 

 

 2.3接下来是五个参数的

多了两个参数,autoDelete和arguments
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)

下面建立了两个Exchange
exchange.3自动删除为false
exchange.4自动删除为true
因为这里是没有绑定Queue的,那么exchange.4将在建立后就被删除掉?

 

 

 执行上面的代码

 

exchange.4还活的好好的,这是由于咱们必须在绑定Queue以后再失去绑定才会被删除,不然为何不直接抛异常,接下来进行验证
下面直接经过可视化工具建立一个名称为queue.4的Queue

 

 

 

 

 英文释义

Name         名称
Features     特征
Status        状态
Ready        是否准备好
Unacked     未确认
Total           总计
incoming     进来的
deliver        传送
get             获得
ack             确认

2.5讲解完Exchange的参数,再来看Queue的参数,就会发现只有一个exclusive未讲
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments

exclusive:是否排他,若是未true,则只在第一次建立它的Connection中有效,当Connection关闭,该Queue也会被删除

在执行完下面代码,查看可视化界面,发现queue中并无exclusive.queue,由于在connection关闭后,该queue也会自动删除

建立实例

package com.tiandy.illegal.util.mq;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.*;
import com.tiandy.illegal.bo.CLS_ManageService;
import com.tiandy.illegal.bo.CLS_ManageServiceImpl;
import com.tiandy.illegal.util.CLS_ILLEGAL_Error;
import com.tiandy.illegal.vo.CLS_VO_Message;
import com.tiandy.illegal.vo.CLS_VO_Record;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;

public class RabbitMQSend {
    //rabbitmq链接
    public static Connection connection = null;
    //rabbitmq通道
    public static Channel channel = null;
    //链接状态标识
    public static boolean connectStatus = false;
    // 配置
    static ResourceBundle resourceBundle = ResourceBundle.getBundle("mq/artemisConfig");
    // 交换机  exchangeTemp
    private static String rabbitmq_exchange = resourceBundle.getString("rabbitmq_exchange");
    // 队列名  queue_vbs_vehicle_record
    private static String rabbitmq_queue = resourceBundle.getString("rabbitmq_queue");
    // service
    CLS_ManageService cls_manageService = new CLS_ManageServiceImpl();
    static ConnectionFactory factory = null;

    public void initialize() {
        try {
            //链接工厂
            if (null == factory) {
                factory = new ConnectionFactory();
                factory= RabbitMQUtil.getRabbitMQConnectionFactory();
                // 关闭通道与链接
                closeConnection();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // 声明交换机
                // channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.DIRECT ,true);
                connectStatus = true;

            }
        } catch (Exception e) {
            connectStatus = false;
            e.printStackTrace();
           // log.error("RabbitMQSend method initialize:" + e.getMessage(), e);
        }
    }

    //关闭链接
    public void closeConnection() {
        try {
            if (channel != null) {
                if (channel.isOpen()) {
                    channel.close();
                    channel = null;
                }
            }
        } catch (Exception e) {
            //log.error("RabbitMQSend closeChannel  error  " + e);
            e.printStackTrace();
        }
        try {
            if (connection != null) {
                if (connection.isOpen()) {
                    connection.close();
                    connection = null;
                }
            }
        } catch (Exception e) {
           // log.error("RabbitMQSend closeConnection  error  " + e);
            e.printStackTrace();
        }
    }
/**
     * 监听消息队列,获取数据
     */
    public void queueDeclareExchange() {
        //声明交换机
        try {
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-max-length", 100000); // 设置最大存储消息数
            // 声明交换机  (交换机参数)
            channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.FANOUT, true);
            // 消息持久化  (队列参数)
            channel.queueDeclare(rabbitmq_queue, true, false, false, args);
            // 交换机与队列绑定
            channel.queueBind(rabbitmq_queue, rabbitmq_exchange, "");
            // 消费者限制
            //channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                int inRecord=0; // 插入记录数量
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //接收到的消息
                    String msg = new String(body, "UTF-8");
                    // 判断数据是否容许接入
                    int check = checkMessage(msg);
                    if (check == CLS_ILLEGAL_Error.ERROR_OK) {
                        // 消息转换至VO
                        CLS_VO_Message msgVo = cls_manageService.getMessageVo(msg);
                        // 判断数据,分开处理白车牌数据与其余数据,每次新增一条
                        int count = cls_manageService.decideData(msgVo);
                        if(count>0){
                            inRecord+=count;
                            System.out.println("  已消费消息:"+envelope.getDeliveryTag()+"  插入记录数:" + inRecord);
                        }
                    }
                    // 单条消息确认(第几条,是否多条)
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            // 设置消息手动确认 (队列名,是否自动确认,consumer)
            channel.basicConsume(rabbitmq_queue, false, consumer);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * 方法说明:监测接收信息
     *
     * @param message
     * @return @修改人及日期: @修改描述: @其余:
     */
    public int checkMessage(String message) {
        // TODO 监测数据格式及是否容许接入
        int check = 0;
        CLS_VO_Message vo_Message = null;
        try {
            vo_Message = JSONObject.parseObject(message, CLS_VO_Message.class);
        } catch (Exception e) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        if (vo_Message.getStorage_id() == null || "".equals(vo_Message.getStorage_id())) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        if (vo_Message.getCap_pic() == null || vo_Message.getCap_pic().size() == 0) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        if (vo_Message.getTotal_info() == null) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        CLS_VO_Record total_info = vo_Message.getTotal_info();
        if (total_info.getTollgateID() == null || "".equals(total_info.getTollgateID())) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        return check;
    }

}

 至此,简单的参数讲解和应用就总结完了!

相关文章
相关标签/搜索