最近由于工做缘由使用到RabbitMQ,以前也接触过其余的mq消息中间件,从实际使用感受来看,却不太同样,正好趁着周末,能够好好看一下RabbitMQ的相关知识点;但愿能够经过一些学习,能够搞清楚如下几点java
<!-- more -->git
相关博文,欢迎查看:github
在开始以前,先得搭建基本的环境,由于我的主要是mac进行的开发,全部写了一篇mac上如何安装rabbitmq的教程,能够经过 《mac下安装和测试rabbitmq》 查看服务器
下面简单说一下Linux系统下,能够如何安装ide
Centos 系统:学习
# 安装erlang rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm yum install erlang # 安装RabbitMQ wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
启动和查看的命令测试
# 完成后启动服务: service rabbitmq-server start # 能够查看服务状态: service rabbitmq-server status
rabbitmq-plugins enable rabbitmq_management
, 默认的端口号为15672直接使用amqp-client客户端作基本的数据读写,先不考虑Spring容器的场景,咱们能够怎样进行塞数据,而后又怎样能够从里面获取数据;ui
在实际使用以前,有必要了解一下RabbitMQ的几个基本概念,即什么是Queue,Exchange,Binding,关于这些基本概念,能够参考博文:3d
首先是创建链接,通常须要设置服务器的IP,端口号,用户名密码之类的,公共代码以下code
public class RabbitUtil { public static ConnectionFactory getConnectionFactory() { //建立链接工程,下面给出的是默认的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); return factory; } }
要使用,基本的就须要一个消息投递和一个消息消费两方,线看消息生产者的通常写法
public class MsgProducer { public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //建立链接 Connection connection = factory.newConnection(); //建立消息通道 Channel channel = connection.createChannel(); // 声明exchange中的消息为可持久化,不自动删除 channel.exchangeDeclare(exchange, exchangeType, true, false, null); // 发布消息 channel.basicPublish(exchange, toutingKey, null, message.getBytes()); channel.close(); connection.close(); } }
针对上面的代码,结合RabbitMQ的基本概念进行分析
疑问:
结合上面的代码和分析,大胆的预测下消费者的流程
下面给出一个mq推数据的消费过程
public class MsgConsumer { public static void consumerMsg(String exchange, String queue, String routingKey) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //建立链接 Connection connection = factory.newConnection(); //建立消息信道 final Channel channel = connection.createChannel(); //消息队列 channel.queueDeclare(queue, true, false, false, null); //绑定队列到交换机 channel.queueBind(queue, exchange, routingKey); System.out.println("[*] Waiting for message. To exist press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [x] Received '" + message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 取消自动ack channel.basicConsume(queue, false, consumer); } }
直接在前面的基础上进行测试,咱们定义一个新的exchange名为direct.exchange
,而且制定ExchangeType为直接路由方式 (先无论这种写法的合理性)
public class DirectProducer { private static final String EXCHANGE_NAME = "direct.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { DirectProducer directProducer = new DirectProducer(); String[] routingKey = new String[]{"aaa", "bbb"}; String msg = "hello >>> "; for (int i = 0; i < 30; i++) { directProducer.publishMsg(routingKey[i % 2], msg + i); } System.out.println("----over-------"); } }
上面的代码执行一遍以后,看控制台会发现新增了一个Exchange
一样的咱们写一下对应的消费者,一个用来消费aaa,一个消费bbb
public class DirectConsumer { private static final String exchangeName = "direct.exchange"; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer.consumerMsg(exchangeName, queueName, routingKey); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectConsumer consumer = new DirectConsumer(); String[] routingKey = new String[]{"aaa", "bbb"}; String[] queueNames = new String[]{"qa", "qb"}; for (int i = 0; i < 2; i++) { consumer.msgConsumer(queueNames[i], routingKey[i]); } Thread.sleep(1000 * 60 * 10); } }
执行上面的代码以后,就会多两个Queue,且增长了Exchange到Queue的绑定
当上面两个代码配合起来使用时,就能够看到对于消费者而言,qa一直消费的是偶数,qb一直消费的是奇数,一次输出以下:
[qa] Waiting for message. To exist press CTRL+C [qb] Waiting for message. To exist press CTRL+C [qa] Received 'hello >>> 0 [qb] Received 'hello >>> 1 [qa] Received 'hello >>> 2 [qb] Received 'hello >>> 3 [qa] Received 'hello >>> 4 ...
有了上面的case以后,这个的实现和测试就比较简单了
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String[] routingKey = new String[]{"aaa", "bbb"}; String msg = "hello >>> "; for (int i = 0; i < 30; i++) { directProducer.publishMsg(routingKey[i % 2], msg + i); } System.out.println("----over-------"); } }
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String[] routingKey = new String[]{"aaa", "bbb"}; String msg = "hello >>> "; for (int i = 0; i < 30; i++) { directProducer.publishMsg(routingKey[i % 2], msg + i); } System.out.println("----over-------"); } }
这个的输出就比较有意思了,fa,fb两个队列均可以接收到发布的消息,并且单独的执行一次上面的投递数据以后,发现fa/fb两个队列的数据都是30条
而后消费的结果以下
[qa] Waiting for message. To exist press CTRL+C [qb] Waiting for message. To exist press CTRL+C [qa] Received 'hello >>> 0 [qb] Received 'hello >>> 0 [qa] Received 'hello >>> 1 [qb] Received 'hello >>> 1 [qb] Received 'hello >>> 2 [qa] Received 'hello >>> 2 [qa] Received 'hello >>> 3 [qb] Received 'hello >>> 3 [qb] Received 'hello >>> 4 [qa] Received 'hello >>> 4 ...
代码和上面差很少,就不重复拷贝了,接下来卡另外几个问题
在上面的基础使用中,会有几个疑问以下:
以上内容,留待下一篇进行讲解
一灰灰的我的博客,记录全部学习和工做中的博文,欢迎你们前去逛逛
尽信书则不如,已上内容,纯属一家之言,因我的能力有限,不免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激