引言java
最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供本身和你们一起学习!json
下面是各个成员的做用图解app
引入依赖ide
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
先来看看Exchange中都有哪些属性工具
下面这个类用于建立一个与RabbitMQ的Connection(链接),该Connection用于建立Channel(信道),Channel是消息读写的通道,也就是咱们的操做都会在Channel的基础之上进行学习
2.1先使用最简单的参数构建Exchange
exchangeDeclare(String exchange, String type)大数据
进入RabbitMQ可视化界面能够看到,RabbitMQ已经为咱们建立了exchange.0,类型为directui
具体释意spa
name 名称
type 类型
Features 特征
Message rate in 消息速率输入
Message rate out 消息速率输出
2.2接下来是三个参数,也就是加上了是否持久化,同时保留先前两个参数的exchange.0,以前咱们已经建立了exchange.0,那么咱们再建立一次会怎样code
exchangeDeclare(String exchange, String type, boolean durable)
运行成功,并无报错,由于只要你设置的的设置是同样的,那么就不会报错,若是设置的不同,那么就会报错,后面会进行验证
这里咱们发现exchange.2多了一个D标识,这个D是durable也就是持久化,而exchange.0没有持久化,也就是默认非持久化
接下来验证这个持久化有什么做用
关闭rabbitmq
rabbitmqctl stop_app
启动rabbitmq
rabbitmqctl start_app
从新进入可视化界面,Exchange就只剩下持久化的了
2.3接下来是五个参数的
多了两个参数,autoDelete和arguments
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
下面建立了两个Exchange
exchange.3自动删除为false
exchange.4自动删除为true
因为这里是没有绑定Queue的,那么exchange.4将在建立后就被删除掉?
执行上面的代码
exchange.4还活的好好的,这是由于咱们必须在绑定Queue以后再失去绑定才会被删除,不然为何不直接抛异常,接下来进行验证
下面直接经过可视化工具建立一个名称为queue.4的Queue
英文释义
Name 名称
Features 特征
Status 状态
Ready 是否准备好
Unacked 未确认
Total 总计
incoming 进来的
deliver 传送
get 获得
ack 确认
2.5讲解完Exchange的参数,再来看Queue的参数,就会发现只有一个exclusive未讲
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments
exclusive:是否排他,若是未true,则只在第一次建立它的Connection中有效,当Connection关闭,该Queue也会被删除
在执行完下面代码,查看可视化界面,发现queue中并无exclusive.queue,由于在connection关闭后,该queue也会自动删除
建立实例
package com.tiandy.illegal.util.mq; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.*; import com.tiandy.illegal.bo.CLS_ManageService; import com.tiandy.illegal.bo.CLS_ManageServiceImpl; import com.tiandy.illegal.util.CLS_ILLEGAL_Error; import com.tiandy.illegal.vo.CLS_VO_Message; import com.tiandy.illegal.vo.CLS_VO_Record; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.ResourceBundle; public class RabbitMQSend { //rabbitmq链接 public static Connection connection = null; //rabbitmq通道 public static Channel channel = null; //链接状态标识 public static boolean connectStatus = false; // 配置 static ResourceBundle resourceBundle = ResourceBundle.getBundle("mq/artemisConfig"); // 交换机 exchangeTemp private static String rabbitmq_exchange = resourceBundle.getString("rabbitmq_exchange"); // 队列名 queue_vbs_vehicle_record private static String rabbitmq_queue = resourceBundle.getString("rabbitmq_queue"); // service CLS_ManageService cls_manageService = new CLS_ManageServiceImpl(); static ConnectionFactory factory = null; public void initialize() { try { //链接工厂 if (null == factory) { factory = new ConnectionFactory(); factory= RabbitMQUtil.getRabbitMQConnectionFactory(); // 关闭通道与链接 closeConnection(); connection = factory.newConnection(); channel = connection.createChannel(); // 声明交换机 // channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.DIRECT ,true); connectStatus = true; } } catch (Exception e) { connectStatus = false; e.printStackTrace(); // log.error("RabbitMQSend method initialize:" + e.getMessage(), e); } } //关闭链接 public void closeConnection() { try { if (channel != null) { if (channel.isOpen()) { channel.close(); channel = null; } } } catch (Exception e) { //log.error("RabbitMQSend closeChannel error " + e); e.printStackTrace(); } try { if (connection != null) { if (connection.isOpen()) { connection.close(); connection = null; } } } catch (Exception e) { // log.error("RabbitMQSend closeConnection error " + e); e.printStackTrace(); } } /** * 监听消息队列,获取数据 */ public void queueDeclareExchange() { //声明交换机 try { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-max-length", 100000); // 设置最大存储消息数 // 声明交换机 (交换机参数) channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.FANOUT, true); // 消息持久化 (队列参数) channel.queueDeclare(rabbitmq_queue, true, false, false, args); // 交换机与队列绑定 channel.queueBind(rabbitmq_queue, rabbitmq_exchange, ""); // 消费者限制 //channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { int inRecord=0; // 插入记录数量 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接收到的消息 String msg = new String(body, "UTF-8"); // 判断数据是否容许接入 int check = checkMessage(msg); if (check == CLS_ILLEGAL_Error.ERROR_OK) { // 消息转换至VO CLS_VO_Message msgVo = cls_manageService.getMessageVo(msg); // 判断数据,分开处理白车牌数据与其余数据,每次新增一条 int count = cls_manageService.decideData(msgVo); if(count>0){ inRecord+=count; System.out.println(" 已消费消息:"+envelope.getDeliveryTag()+" 插入记录数:" + inRecord); } } // 单条消息确认(第几条,是否多条) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 设置消息手动确认 (队列名,是否自动确认,consumer) channel.basicConsume(rabbitmq_queue, false, consumer); } catch (IOException e) { e.printStackTrace(); } } /** * 方法说明:监测接收信息 * * @param message * @return @修改人及日期: @修改描述: @其余: */ public int checkMessage(String message) { // TODO 监测数据格式及是否容许接入 int check = 0; CLS_VO_Message vo_Message = null; try { vo_Message = JSONObject.parseObject(message, CLS_VO_Message.class); } catch (Exception e) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getStorage_id() == null || "".equals(vo_Message.getStorage_id())) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getCap_pic() == null || vo_Message.getCap_pic().size() == 0) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getTotal_info() == null) { return CLS_ILLEGAL_Error.ERROR_PARAM; } CLS_VO_Record total_info = vo_Message.getTotal_info(); if (total_info.getTollgateID() == null || "".equals(total_info.getTollgateID())) { return CLS_ILLEGAL_Error.ERROR_PARAM; } return check; } }
至此,简单的参数讲解和应用就总结完了!