RabbitMQ实战:运行和管理RabbitMQ

上一篇 介绍了AMQP消息通讯,包括队列、交换器和绑定,经过虚拟主机还能够隔离数据和权限,消息持久化和发送方确认模式确保了消息不丢失。 本篇主要介绍如何运行和管理RabbitMQ,在介绍以前,会有个DEMO演示消息发送和接收,一方面对AMQP的元素有更直观的认识,一方面为后面介绍监控作数据来源。 经过介绍,你会了解到:spring

消息发送和接收简单实现 服务器管理-启动和中止节点 权限配置 使用统计数组

消息发送和接收简单实现 该Demo主要用于收集日志,消息发送者是各个应用子系统,消息接收者是日志收集服务,使用RabbitMQ能够很容易实现。 基于Spring Boot框架实现,主要类的做用以下:服务器

LogRabbitConfig:建立队列、交换器、绑定等初始化操做; Sender:消息发送者; AllReceiver:全部级别日志接收者,接收全部级别的日志; ErrorReceiver:错误级别日志接受者,只接收错误级别的日志; LogSenderTest:测试用例类;数据结构

消息模型以下:app

配置 首先,配置spring boot和rabbitmq依赖: org.springframework.boot spring-boot-starter-parent 2.0.1.RELEASE org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-amqp 而后在application.properties文件中配置rabbitmq地址: spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/ LogRabbitConfig实现框架

使用Spring的@Configuration定义配置类,可替换xml配置文件,被注解的类内部包含有一个或多个被@Bean注解的方法,用于构建bean定义,初始化Spring容器。spring-boot

@Configuration public class LogRabbitConfig { final static String QUEUE_LOG_ERROR = "log.error"; final static String QUEUE_LOG_ALL = "log.all";post

//建立log.error队列
@Bean
public Queue logError() {
    return new Queue(QUEUE_LOG_ERROR);
}
//建立log.all队列
@Bean
public Queue logAll() {
    return new Queue(QUEUE_LOG_ALL);
}
//建立exchange,命名为log
@Bean
TopicExchange exchange() {
    return new TopicExchange("log");
}
//绑定log.error队列到exchange,routingkey为log.error
@Bean
Binding bindingExchangeError(Queue logError, TopicExchange exchange) {
    return BindingBuilder.bind(logError).to(exchange).with("log.error");
}
//绑定log.all队列到exchange,routingkey为log.#
@Bean
Binding bindingExchangeAll(Queue logAll, TopicExchange exchange) {
    return BindingBuilder.bind(logAll).to(exchange).with("log.#");
}
复制代码

} Sender实现 各个子系统向rabbitmq服务器发送消息: @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate;测试

public void send() {
    //向mq服务端发送消息,exchange为log,routingkey为log.error
    String context = "error log";
    this.rabbitTemplate.convertAndSend("log", "log.error", context);

    //向mq服务端发送消息,exchange为log,routingkey为log.info
    context = "info log";
    System.out.println("send msg : " + context);
    this.rabbitTemplate.convertAndSend("log", "log.info", context);

    //向mq服务端发送消息,exchange为log,routingkey为log.warn
    context = "warn log";
    System.out.println("send msg : " + context);
    this.rabbitTemplate.convertAndSend("log", "log.warn", context);
}
复制代码

} AllReceiver和ErrorReceiver实现 从rabbitmq服务器接收消息。 AllReceiver从服务器的log.all队列获取消息,由于它绑定的routingkey为"log.#",因此,会收到全部级别的日志: @Component @RabbitListener(queues = "log.all") public class AllReceiver { @RabbitHandler public void process(String context) { System.out.println("receive log : " + context); } } ErrorReceiver从服务器的log.error队列获取消息,由于它绑定的routingkey为"log.error",因此,只会收到error级别的日志: @Component @RabbitListener(queues = "log.error") public class ErrorReceiver { @RabbitHandler public void process(String context) { System.out.println("receive error : " + context); } } LogSenderTest测试用例 测试用例很简单,就是调用Sender发送消息,观察消息的接收状况。 @RunWith(SpringRunner.class) @SpringBootTest @SpringBootApplication public class LogSenderTest { @Autowired Sender sender;ui

@Test
public void sendLog() {
    sender.send();
}
复制代码

} 运行日志以下:

能够看到,error收到了2次,说明exchange同时分发给了log.all和log.error队列,其余级别的日志分发给了log.all队列。 服务器管理-启动和中止节点 RabbitMQ是用Erlang编写的,Erlang天生就能让应用程序无需知道对方是否在同一台机器上便可相互通讯,这让集群和可靠的消息路由变得简单。 理解节点和Erlang应用程序 和Java有JVM虚拟机相似,Erlang也有虚拟机,虚拟机的每一个实例称之为「节点」,不一样的是,多个Erlang应用程序能够运行在同一个节点之上,若是应用程序崩溃了,Erlang节点会自动尝试自动重启应用程序。 节点的操做: 后台启动节点:./rabbitmq-server -detached 中止节点:./rabbitmqctl stop 仅中止rabbit应用程序:./rabbitmqctl stop_app 配置文件 配置文件的格式本质上是原始的Erlang数据结构,是一个包含了嵌套哈希表的数组,以下: [ [mnesia , [{dump_log_write_threshold , 1000}]], [rabbit , [{vm_memory_high_wateremark , 0.4}]] ] 上面配置了2个应用,每一个应用会有本身的哈希表来配置选项:

mnesia:是rabbitmq用来存储交换器和队列元数据的; rabbit:是rabbitmq特定的配置选项;

每一个应用若是有多个选项,用逗号隔开。 权限配置 RabbitMQ权限系统中,单个用户能够跨越多个vhost进行受权,并且能够对读、写、配置分别受权。 首先建立一个用户dongqingqing,密码为123456: ./rabbitmqctl add_user dongqingqing 123456 授予dongqingqing用户权限,能够读取全部队列和交换器,只可写log.格式的队列和交换器,没法建立或删除队列和交换器 ./rabbitmqctl set_permissions dongqingqing "." "log.*" "" set_permissions 后面的参数分别为用户名、读权限、写权限、配置权限。 其余详细用法可查看文档。 使用统计 查看数据统计 可经过rabbitmqctl命令查看数据统计信息,好比队列和消息数目、交换器和绑定等。 查看全部队列,包含上面demo定义的log.all和log.error:

查看全部交换器,包含上面demo定义的log

另外,rabbitmq提供了管理界面插件,更方便的查看各类统计,能够经过下面的命令开启: sudo ./rabbitmq-plugins enable rabbitmq_management

查看日志 能够在文件系统中查看日志,启动rabbitmq后,会显示日志的路径:

另外,能够经过AMQP获取实时日志信息,有一个amq.rabbitmq.log的topic交换器,监听对应的队列便可。 下一篇将介绍消息通讯模式和最佳实践,感谢你们持续关注。

做者:情情说 连接:https://juejin.im/post/5ac78b9c51882548fe4a5ed6 来源:掘金 著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。

相关文章
相关标签/搜索