刚刚用了,记录下来,之后忘了,方便可以快速想起来。java
首先说明,因为RabbitMQ服务端非JAVA,C++语言,固然也就看不懂,因此本文的理解都是过于主观的。ubuntu
推荐最好的安装方式:去官网,去官网,去官网,重要的事情说三遍。tomcat
我通常的操做流程是:用google右上角翻译网页,而后看个大概意思,而后再显示原网页,一个单词单词的看。ide
仍是总结一下Ubuntu,RabbitMQ安装步骤(依次执行下面四条命令就ok了):函数
1,echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list 2,wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - 3,sudo apt-get update 4,sudo apt-get install rabbitmq-server
服务端配置,基本上不须要配置就能知足大多数需求。官网如是说,且相信他一次 ui
2.1 界面管理插件的安装google
先说一下ubuntu目录下面的日志目录,与脚本目录,通常这是咱们最关心的目录:spa
1,日志目录: /var/log/robbitmq-server 能够经过/etc/logrotate.d/rabbitmq-server进行配置
2,脚本目录:/usr/lib/rabbitmq/bin/插件
而后进入到脚本目录也就是(cd /usr/lib/rabbitmq/bin/),执行以下命令翻译
rabbitmq-plugins enable rabbitmq_management
这样管理插件算是装好了。
2.2 用户管理
经过http://localhost:15672登陆会发现(默认用户guest,密码也是guest),认证失败(登陆失败去日志文件查找缘由)。
而后天然须要受权了,给一个受权的命令demo,详情自行脑补:
#username就是用户名,能够随便取,pwd就是你要设置的密码 rabbitmqctl add_user username pwd #administrator为用户的角色,与tomcat那种管理员配置有点像 rabbitmqctl set_user_tags username administrator #授予权限 /表明vhost主机根目录,后面的*j就是读写之类的权限 rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
受权以后,讲道理就能登陆了,能够看到下面这样的界面:
先体验一下整个消息投递过程:
3.1 RabbitMQ的核心:
核心官网有介绍,说的connecnton,channel之类的,到底怎么样,who care?
整体来看,咱们关注业务实现是:1)消息怎么投递的。2)消费者怎么消费消息。3)消息是不是可靠投递。4)消息投递方式。5)消息的生命周期。6)消息队列生命周期
3.2 消息是怎么投递的?(记住一点,生产者消息投递都是面向交换机的)
RabbitMQ 是面向交换机投递消息的。交换机可能绑定有许多队列,交换机如何将消息投递给这些队列呢?
首先说一下面向交换机的设计的优点:1)这明显借助了数据链路层那个交换机的设计思想。除了层级分明之外,还能从分提升链路利用率(可能有点抽像)。
2)从代码层面来看:若是没有交换机,你至少得维护一个十分庞大的路由表,而后从路由表正确投递消息,有了交互机,这里路
由表就会被拆分到多个交换机里面,效果没必要多说。
3)而后就是高度的解耦,不一样的交换机可有不一样的路由规则,要是没有交换机。。。。。。
在RabbitMQ,交换机有4种投递方式,就是枚举类BuiltinExchangeType的4个枚举变量:
DIRECT:会将全部消息先取消息的ROUTE_KEY,而后投递到与ROUTE_KEY绑定的队列里面(if(msg.routekey.equals(queue.routekey)))。
FANOUT:此种模式下,根本不检查消息的ROUTE_KEY,直接投送到交换机所拥有的全部队列里面。
TOPIC,HEADERS自行看一下官网怎么说的,不想码字了^_^||
总结起来就一个函数就把消息发出去了:channel.basicPublish(excange_name,route_key,false,bs,"test".getBytes());能够去官网查一下这个API
3.3 消费者怎么消费消息(记住一点,消费者消费消息是面向消息队列的,这与生成者有点不同)
还不是就是TCP长链接心跳的那些事,就是这么一个API:channel.basicConsume(QUEUE_AUTODELETE, true, consumer);consumer是Consumer类的一个实例,
你直接去处理回调接口就ok了
3.4 消息传递是否可靠
很明显是可靠的,除非你将消息队列,声明成非持久模式,这事你又重启了机器。这会丢失消息的。还有就是他有应答机制,你能够经过设置消费者消费消息的模式,
去手动应答。channel.basicConsume(?,autoACk,?)的autoAck参数设置
3.5 消息的生命周期
一旦受到消费者应答,标识消息已被消费,则消息被回收掉。
3.6 队列生命周期
channel.queueDeclare(QUEUE_NAME,false,false,true,null);
第二个参数设置为true,会将消息持久化到磁盘,第四个参数设置为true表示没有消息而且没有链接则删除改队列,详情能够查一下API
4.1 生产者代码:
自行导入相关依赖包或相关依赖
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("username"); factory.setPort(5672);//注意这里的端口与管理插件的端口不同 factory.setPassword("pwd"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个dirent模式的交换机 channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true); //声明一个非持久化自动删除的队列 channel.queueDeclare("queue_name",false,false,true,null);//若是该队列不在被使用就删除他 zhe //将绑定到改交换机 channel.queueBind("queue_name","exchange_name","route_key"); //声明一个消息头部 Map<String,Object> header=new HashMap<>(); AMQP.BasicProperties.Builder b= new AMQP.BasicProperties.Builder(); header.put("charset","utf-8"); b.headers(header); AMQP.BasicProperties bp=b.build(); //将消息发出去 channel.basicPublish("exchange_name","route_key",false,bp,"test3".getBytes());
4.2 消费者代码
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("username"); factory.setPort(5672);//注意这里的端口与管理插件的端口不同 factory.setPassword("pwd"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个dirent模式的交换机 channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true); //声明一个非持久化自动删除的队列 channel.queueDeclare("queue_name",false,false,true,null);//若是该队列不在被使用就删除他 zhe //将绑定到改交换机 channel.queueBind("queue_name","exchange_name","route_key"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume("queue_name", true, consumer);