1. 消息:指的是在应用之间传送的数据,好比json字符串、纯文本字符串等java
2. 消息队列中间件:指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下进行进程之间的通讯。如今经常使用的消息中间件有RabbitMQ、ActiveMQ、Kafka等shell
3. 消息队列中间件的消息传递模式:json
4. 消息中间件的做用:vim
参考在Centos中RabbitMQ的安装步骤centos
一、首先安装erlang ,下载erlang的安装包到centos上,bash
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
而且进行安装 rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
查看是够安装成功多线程
[root@localhost Desktop]# erl并发
Erlang/OTP 19 [erts-8.0.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]异步
Eshell V8.0.3 (abort with ^G)
输入halt().退出erl
二、安装rabbitMQ
第一种在线下载,先下载async
rpm:wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
下载完成后安装:
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
三、开放端口
能够选择直接关闭防火墙,执行命令
systemctl stop firewalld.service
或者
vim /etc/sysconfig/iptables
#添加一下内容
#RabbitMQ
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 25672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 4369 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5671 -j ACCEPT
##RabbitMQ
四、启动rabbit
service rabbitmq-service start
或者
/sbin/service rabbitmq-server start
五、访问
游览器输入下列地址,便可进入RabbitMQ的管理界面:
http://localhost:15672/
1. 建立Maven工厂,导入RabbitMQ的Java客户端的相关jar包:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency>
2. 添加RabbitMQ的管理员帐号:在CentOS中启动RabbitMQ后,执行指令,添加一个root用户,而且密码为123456
rabbitmqctl add_user root 123456
设置帐户权限,开放全部权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
设置帐户为管理员角色
rabbitmqctl set_user_tags root administrator
3. 消息产生者客户端代码:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class RabbitMQProducer { private static final String IP="192.168.10.128"; private static final String USER="root"; private static final String PASSWORD="123456"; private static final int PORT=5672; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=null; Channel channel=null; try { String exchange_demo="exchange_demo";//声明一个交换器名称 String queue_demo="queue_demo";//声明一个队列名称 String route_demo="route_demo";//声明一个路由键,用于绑定交换器和队列 ConnectionFactory fac=new ConnectionFactory();//获取一个rabbitMQ链接池,并设置相关参数 fac.setHost(IP); fac.setPassword(PASSWORD); fac.setUsername(USER); fac.setPort(PORT); //从链接池中获取一个rabbitMQ链接 connection=fac.newConnection(); channel=connection.createChannel();//建立一个频道 channel.exchangeDeclare(exchange_demo, "direct",false,false,null);//建立一个type为direct,持久化的、非自动删除的交换器 channel.queueDeclare(queue_demo, true, false, false, null);//建立一个持久化、非排他的、非自动删除的交换器 channel.queueBind(queue_demo, exchange_demo, route_demo);//将交换器和队列经过路由键绑定 //发送一条消息 String message="Hello World"; channel.basicPublish(exchange_demo, route_demo, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); } finally { //关闭资源 if(connection!=null){ if(channel!=null){ channel.close();//能够不用关闭,当connection关闭后,channel也会自动关闭 } connection.close(); } } } }
4. 消息消费者客户端:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitMQConsumer { private static final String IP="192.168.10.128"; private static final String USER="root"; private static final String PASSWORD="123456"; private static final int PORT=5672; private static final String QUEUE_NAME="queue_demo"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=null; Address[] address={new Address(IP,PORT)}; try { String queue_demo="queue_demo";//声明一个队列名称 ConnectionFactory fac=new ConnectionFactory();//获取一个rabbitMQ链接池,并设置相关参数 fac.setPassword(PASSWORD); fac.setUsername(USER); //从链接池中获取一个rabbitMQ链接 connection=fac.newConnection(address); final Channel channel=connection.createChannel();//建立一个频道 channel.basicQos(64);//设置客户端最多接收未被ack的消息个数 Consumer con=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("get message:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue_demo, con); if(channel!=null){ channel.close(); } } finally { //关闭资源 if(connection!=null){ connection.close(); } } } }