RabbitMq的基本认识和配置(一)

RabbitMq的基本认识和配置

什么是消息中间件

  • 消息: 在应用之间传递的数据c++

  • 消息中间件:Message Queue Middleware,简称MQ,是指利用高效可靠的消息机制进行与平台无关的数据交流,并基于数通讯机制来进行分布式系统的集成。经过提供的消息传递和消息排队模型,它能够在分布式环境下拓张进程间的通讯.vim

  • 传递的模式: 1.点对点(P2P,Point-to-Point)安全

2.发布订阅模式(Pub/Sub)app

  • 消息中间件适用的方向: 适用于可靠的数据传输的分布式环境。可以在不一样的平台下通讯,屏蔽各个平台以及协议之间的特性,实现应用程序的连接,而且在任什么时候刻能够将消息进行传送或者存储转发,这比远程过程调用更加进步。

消息中间件的做用

  • 解耦:在项目的启动之初来预测未来会碰到什么需求是极其困难的。消息中间件在处理过程当中间插入一个隐含的,基于数据的接口层,两边的处理过程都要实现这一个接口,这容许你独立的拓展或修改两边的处理过程,只要确保他们遵循一样的接口约束便可。异步

  • 冗余(存储):有些状况下,处理数据的过程会失败。消息中间件能够把数据进行持久化直到他们已经被彻底处理,经过这一方式规避了数据丢失的风险。在把一个消息从消息中间件中删除以前,须要你的处理系统明确地指出这个消息被处理完成,从而确保你地数据被安全地保存知道你使用完毕。分布式

  • 扩展性:消息中间件解耦了应用地处理过程,因此提供了消息入队和处理地效率是很容易地,只须要另外增长处理过程便可,不须要改变代码,也不须要调节参数。ide

  • 削峰:在访问量剧增地状况下,应用仍然须要继续发挥做用,可是这个月地突发流量的状况的不常见。.net

  • 可恢复性:当系统一部分的组件失效时,不会影响到整个系统。消息中间件下降了进程间的耦合度。线程

  • 顺序保证:在大所属使用场景下,数据处理顺序很重要,大部分消息中间件支持必定程度上的顺序性。unix

  • 缓冲:在任何重要的系统中,都会存在须要不一样处理时间的元素。消息中间件经过一个缓冲层来帮助任务最高效率的执行,写入消息中间件的处理会尽量快速。

  • 异步通讯:在不少时候应用不想也不须要当即处理消息。消息中间件提供了异步通讯机制,容许应用把一些消息放入消息中间件中,但并不当即处理它,在以后须要的时候再慢慢处理。

Rabbitmq安装

安装erlang

由于RabbitMQ采用Erlang语言编写的,因此须要配置erlang的环境,配置以下:

1.安装gcc和openssl模块

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

2.安装

yum -y install ncurses-devel

3.指定编译后程序的路径

./configure --prefix=/usr/erlang

4.编译安装

make

make install

5.配置环境变量

配置好以后使用source /etc/profile将该配置文件生效,生效以后输入erl回车能够看到:

安装完成erlang的环境以后,须要安装的就是rabiitmq了。

1.这里我下载的时rabbitmq-server-generic-unix-3.7.9.tar.xz,使用xz -d rabbitmq-server-generic-unix-3.7.9.tar.xz,tar -xvf rabbitmq-server-generic-unix-3.7.9.tar解压。

2.配置环境变量

vim /etc/profile

3.使用守护线程的模式启动rabbitmq

rabbitmq-server -detached

4.添加用户 rabbitmqctl add_user root root123 rabbitmqctl set_permissions -p / root "." "." ".*" rabbitmqctl set_user_tags root administrator

5.虽然已经启动了rabbitmq,可是后台管理尚未打开,须要使用下面的命令打开后台管理。

rabbitmqctl start_app

rabbitmq-plugins enable rabbitmq_management

此时仍是有问题,须要关闭防火墙,systemctl stop firewalld.service

此时访问http://192.168.123.124:15672 登陆后能够看到以下的界面:

到此为止咱们就已经对rabbitmq有了基本认识,也能作些基本安装配置。

简单的例子:

1.导入依赖:

<dependencies>
		<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.6.0</version>
		</dependency>
	</dependencies>

消息生产者:

public class DemoProducer {
		private static final String EXCHANGE_NAME = "exchange_demo";
		private static final String ROUNTING_KEY = "routingkey_demo";
		private static final String QUEUE_NAME = "queue_demo";
		private static final String IP_ADDRESS = "192.168.124.129";
		private static final int PORT = 5672;

		public static void main(String[] args) throws IOException, TimeoutException {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(IP_ADDRESS);
			factory.setPort(PORT);
			factory.setUsername("root");
			factory.setPassword("root@123");
			//建立连接
			Connection connection = factory.newConnection();
			//建立信道
			Channel channel = connection.createChannel();
			channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
			//建立一个持久化,非排他的,非自动删除的的队列
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			//将交换器与队列经过路由键绑定
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUNTING_KEY);
			//发送一条持久化的消息:hello world;
			String message = "hello world";
			channel.basicPublish(EXCHANGE_NAME, ROUNTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
			//关闭资源
			channel.close();
			connection.close();
		}
	}

消息消费者:

public class ConsumerDemo {
		private static final String EXCHANGE_NAME = "exchange_demo";
		private static final String ROUNTING_KEY = "routingkey_demo";
		private static final String QUEUE_NAME = "queue_demo";
		private static final String IP_ADDRESS = "192.168.124.129";
		private static final int PORT = 5672;

		public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
			Address[] addresses = new Address[]{
					new Address(IP_ADDRESS, PORT)
			};
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(IP_ADDRESS);
			factory.setPort(PORT);
			factory.setUsername("root");
			factory.setPassword("root@123");
			//这里的连接方式和生产者的连接连接方式有所不一样,须要区别对待
			Connection connection = factory.newConnection(addresses);
			//建立信道
			final Channel channel = connection.createChannel();
			//设置客户端最多接收为被ack的消息个数
			channel.basicQos(64);
			Consumer consumer = new DefaultConsumer(channel) {
				[@Override](https://my.oschina.net/u/1162528)
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
					System.out.println("recv message:" + new String(body));
					try {
						TimeUnit.SECONDS.sleep(1);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			};
			channel.basicConsume(QUEUE_NAME, consumer);
			TimeUnit.SECONDS.sleep(5);
			channel.close();
			connection.close();
		}
	}

执行结果:

相关文章
相关标签/搜索