消息: 在应用之间传递的数据c++
消息中间件:Message Queue Middleware,简称MQ,是指利用高效可靠的消息机制进行与平台无关的数据交流,并基于数通讯机制来进行分布式系统的集成。经过提供的消息传递和消息排队模型,它能够在分布式环境下拓张进程间的通讯.vim
传递的模式: 1.点对点(P2P,Point-to-Point)安全
2.发布订阅模式(Pub/Sub)app
解耦:在项目的启动之初来预测未来会碰到什么需求是极其困难的。消息中间件在处理过程当中间插入一个隐含的,基于数据的接口层,两边的处理过程都要实现这一个接口,这容许你独立的拓展或修改两边的处理过程,只要确保他们遵循一样的接口约束便可。异步
冗余(存储):有些状况下,处理数据的过程会失败。消息中间件能够把数据进行持久化直到他们已经被彻底处理,经过这一方式规避了数据丢失的风险。在把一个消息从消息中间件中删除以前,须要你的处理系统明确地指出这个消息被处理完成,从而确保你地数据被安全地保存知道你使用完毕。分布式
扩展性:消息中间件解耦了应用地处理过程,因此提供了消息入队和处理地效率是很容易地,只须要另外增长处理过程便可,不须要改变代码,也不须要调节参数。ide
削峰:在访问量剧增地状况下,应用仍然须要继续发挥做用,可是这个月地突发流量的状况的不常见。.net
可恢复性:当系统一部分的组件失效时,不会影响到整个系统。消息中间件下降了进程间的耦合度。线程
顺序保证:在大所属使用场景下,数据处理顺序很重要,大部分消息中间件支持必定程度上的顺序性。unix
缓冲:在任何重要的系统中,都会存在须要不一样处理时间的元素。消息中间件经过一个缓冲层来帮助任务最高效率的执行,写入消息中间件的处理会尽量快速。
异步通讯:在不少时候应用不想也不须要当即处理消息。消息中间件提供了异步通讯机制,容许应用把一些消息放入消息中间件中,但并不当即处理它,在以后须要的时候再慢慢处理。
由于RabbitMQ采用Erlang语言编写的,因此须要配置erlang的环境,配置以下:
1.安装gcc和openssl模块
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
2.安装
yum -y install ncurses-devel
3.指定编译后程序的路径
./configure --prefix=/usr/erlang
4.编译安装
make
make install
5.配置环境变量
配置好以后使用source /etc/profile将该配置文件生效,生效以后输入erl回车能够看到:
1.这里我下载的时rabbitmq-server-generic-unix-3.7.9.tar.xz,使用xz -d rabbitmq-server-generic-unix-3.7.9.tar.xz,tar -xvf rabbitmq-server-generic-unix-3.7.9.tar解压。
2.配置环境变量
vim /etc/profile
3.使用守护线程的模式启动rabbitmq
rabbitmq-server -detached
4.添加用户 rabbitmqctl add_user root root123 rabbitmqctl set_permissions -p / root "." "." ".*" rabbitmqctl set_user_tags root administrator
5.虽然已经启动了rabbitmq,可是后台管理尚未打开,须要使用下面的命令打开后台管理。
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
此时仍是有问题,须要关闭防火墙,systemctl stop firewalld.service
此时访问http://192.168.123.124:15672 登陆后能够看到以下的界面:
到此为止咱们就已经对rabbitmq有了基本认识,也能作些基本安装配置。
简单的例子:
1.导入依赖:
<dependencies> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies>
消息生产者:
public class DemoProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUNTING_KEY = "routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.124.129"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("root"); factory.setPassword("root@123"); //建立连接 Connection connection = factory.newConnection(); //建立信道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); //建立一个持久化,非排他的,非自动删除的的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //将交换器与队列经过路由键绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUNTING_KEY); //发送一条持久化的消息:hello world; String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, ROUNTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //关闭资源 channel.close(); connection.close(); } }
消息消费者:
public class ConsumerDemo { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUNTING_KEY = "routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.124.129"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[]{ new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("root"); factory.setPassword("root@123"); //这里的连接方式和生产者的连接连接方式有所不一样,须要区别对待 Connection connection = factory.newConnection(addresses); //建立信道 final Channel channel = connection.createChannel(); //设置客户端最多接收为被ack的消息个数 channel.basicQos(64); Consumer consumer = new DefaultConsumer(channel) { [@Override](https://my.oschina.net/u/1162528) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recv message:" + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); } }
执行结果: