#RabbitMQ 监控(三)html
验证RabbitMQ健康运行只是确保消息通讯架构可靠性的一部分,同时,你也须要确保消息通讯结构配置没有遭受意外修改,从而避免应用消息丢失。
RabbitMQ Management HTTP API提供了一个方法容许你查看任何vhost上的任何队列:/api/queues/<vhost>/<queue>。你不只能够查看配置详情,还能够查看队列的数据统计,例如队列消耗的内存,或者队列的平均消息吞吐量。使用curl测试一下该API,这里的/%2F仍是表明默认的vhost(/)。java
curl -u guest:guest http://127.0.0.1:15672/api/queues/%2F/springrabbitexercise response { "consumer_details": [ { "channel_details": { "peer_host": "127.0.0.1", "peer_port": 62679, "connection_name": "127.0.0.1:62679 -> 127.0.0.1:5672", "user": "guest", "number": 2, "node": "rabbit@localhost", "name": "127.0.0.1:62679 -> 127.0.0.1:5672 (2)" }, "arguments": [], "prefetch_count": 1, "ack_required": true, "exclusive": false, "consumer_tag": "amq.ctag-YImeU8Fm_VahDpxv8EAw2Q", "queue": { "vhost": "/", "name": "springrabbitexercise" } } ], "messages_details": { "rate": 7357 }, "messages": 232517, "messages_unacknowledged_details": { "rate": 0.2 }, "messages_unacknowledged": 5, "messages_ready_details": { "rate": 7356.8 }, "messages_ready": 232512, "reductions_details": { "rate": 1861021.8 }, "reductions": 58754154, ... "auto_delete": false, "durable": true, "vhost": "/", "name": "springrabbitexercise", "message_bytes_persistent": 2220250, "message_bytes_ram": 2220250, "message_bytes_unacknowledged": 40, "message_bytes_ready": 2220210, "message_bytes": 2220250, "messages_persistent": 232517, "messages_unacknowledged_ram": 5, "messages_ready_ram": 232512, "messages_ram": 232517, "garbage_collection": { "minor_gcs": 0, "fullsweep_after": 65535, "min_heap_size": 233, "min_bin_vheap_size": 46422, "max_heap_size": 0 }, "state": "running" }
为了方便阅读,去掉了部分返回值,可是仍是能够看到队列的不少信息。例如能够看到一个consumer的信息、消息占用的内存、队列的durable、auto_delete属性等。利用这些配置信息,新的健康监控程序能够经过API方法的输出来轻松监控队列的属性,并在发生变动时通知你。
就像以前编写健康检测程序那样,除了服务器、端口、vhost、用户名和密码以外,还须要知道:node
* 队列的名称,以便监控其配置git
* 该队列是否将durable和auto_delete选项打开github
###清单3.1 检测队列配置spring
完整代码在个人github,下面代码中的@Data和@Slf4j都是插件lombok中的注解,想要了解的可自行百度。api
1.定义查看队列信息的接口 RMQResource.java服务器
@Path("api") @Consumes({MediaType.APPLICATION_JSON}) @Produces({MediaType.APPLICATION_JSON}) public interface RMQResource { /** * Return a queue`s info * * @param vhost * @param name * @return {@link QueueInfo} */ @GET @Path("queues/{vhost}/{name}") Response getQueueInfo(@PathParam("vhost") String vhost, @PathParam("name") String name); }
2.定义查看队列接口的返回值 QueueInfo.javamybatis
@Data public class QueueInfo { private ConsumerDetails[] consumer_details; /** * unknown class */ @JsonIgnore private Object[] incoming; /** * unknown class */ @JsonIgnore private Object[] deliveries; /** * unknown class */ @JsonIgnore private Object arguments; private Boolean exclusive; //... private Boolean auto_delete; private Boolean durable; private String vhost; private String name; /** * unknown class */ @JsonIgnore private Object head_message_timestamp; /** * unknown class */ @JsonIgnore private Object recoverable_slaves; private Long memory; private Double consumer_utilisation; private Integer consumers; /** * unknown class */ @JsonIgnore private Object exclusive_consumer_tag; /** * unknown class */ @JsonIgnore private Object policy; @JsonFormat(pattern = "yyyy-MM-dd hh:mm:ss") private Date idle_since; }
3.检测队列配置 QueueConfigCheck.java架构
/** * 检测队列配置 */ @Slf4j public class QueueConfigCheck { private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class); public static void checkQueueConfig(String vhost, CheckQueue queue) { RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig(); String host = config.getHost(); Response response = null; try { response = rmqResource.getQueueInfo(vhost, queue.getQueue_name()); } catch (Exception e) { log.error("UNKNOWN: Could not connect to {}, cause {}", host, e.getMessage()); ExitUtil.exit(ExitType.UNKNOWN.getValue()); } if (response == null || response.getStatus() == 404) { log.error("CRITICAL: Queue {} does not exist.", queue.getQueue_name()); ExitUtil.exit(ExitType.CRITICAL.getValue()); } else if (response.getStatus() > 299) { log.error("UNKNOWN: Unexpected API error : {}", response); ExitUtil.exit(ExitType.UNKNOWN.getValue()); } else { QueueInfo info = response.readEntity(QueueInfo.class); if (!info.getAuto_delete().equals(queue.getAuto_delete())) { log.warn("WARN: Queue {} - auto_delete flag is NOT {}", queue.getQueue_name(), info.getAuto_delete()); ExitUtil.exit(ExitType.WARN.getValue()); } if (!info.getDurable().equals(queue.getDurable())) { log.warn("WARN: Queue {} - durable flag is NOT {}", queue.getQueue_name(), info.getDurable()); ExitUtil.exit(ExitType.WARN.getValue()); } } log.info("OK: Queue {} configured correctly.", queue.getQueue_name()); ExitUtil.exit(ExitType.OK.getValue()); } }
4.检测队列配置的方法参数 CheckQueue.java @Data public class CheckQueue {
private final String queue_name; private final Boolean auto_delete; private final Boolean durable; public CheckQueue(String queue_name, Boolean auto_delete, Boolean durable) { this.queue_name = queue_name; this.auto_delete = auto_delete; this.durable = durable; } }
5.运行检测程序
@Test public void testQueueConfig() { String queue_name = "springrabbitexercise"; Boolean auto_delete = false; Boolean durable = true; String vhost = "/"; CheckQueue queue = new CheckQueue(queue_name, auto_delete, durable); QueueConfigCheck.checkQueueConfig(vhost, queue); }
能够看到监控正常运行:
11:38:23.286 [main] INFO com.lanxiang.rabbitmqmonitor.check.QueueConfigCheck - OK: Queue springrabbitexercise configured correctly.
11:38:23.289 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK
这段RabbitMQ队列检测的程序有一处修改,若是健康检测程序没法链接到API服务器的话,会返回EXIT_UNKNOWN。前一章的API ping健康检测要么成功要么失败,故障代码之间没有区别,可是队列检测API方法在失败时经过HTTP状态码提供了更多信息。若是HTTP状态码是404就表明尝试验证的队列不存在,检测失败并返回EXIT_CRITICAL。对于其余大于299的HTTP状态码,退出代码为EXIT_UNKNOWN。
在获取到RabbitMQ API的response以后,使用JSON进行解码,而且把获得的durable和auto_delete参数与指望的参数进行比较,若是参数和预期不相符的话,返回EXIT_WARNING或者EXIT_CRITICAL状态码。若是队列全部的配置都正确的话,那么就正确退出。
在了解咱们对RabbitMQ作监控的原理以后,能够根据RabbitMQ Management HTTP API定制更多的监控,例如:
* /api/nodes,能够获取集群中每一个节点的数据
* /api/queues/<vhost>/<queue>,能够获取队列的详细状况,例如消息处理的速率、积压的消息数量等。
除此以外还有许多其余API,咱们要作的就是根据自身的业务逻辑和这些API来设计合理的监控脚本。RabbitMQ监控系列就到此结束啦,仍是很惋惜没有实战的机会吧,由于最近在工做变更期间,看了一下RabbitMQ实战这本书,兴起想写一下博客试试。 毕业快一年了,想养成写博客的习惯。正好最近也在工做变更中,能有闲暇时间尝试一下,博客写的比较水,多多包涵。