了解消息队列中间件——RabbitMQ

1. 了解消息队列中间件

    1. 消息:指的是在应用之间传送的数据,好比json字符串、纯文本字符串等java

    2. 消息队列中间件:指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下进行进程之间的通讯。如今经常使用的消息中间件有RabbitMQ、ActiveMQ、Kafka等shell

    3. 消息队列中间件的消息传递模式:json

  • P2P(point to point):即点对点模式,该模式基于队列,消息产生者生成消息并发送到队列中保存,消息消费者从消息队列中接收消息进行处理(和多线程间中的生产者/消费者模式相同)。
  • Pub/Sub模式(publish/subscribe):即发布/订阅模式,该模式中会定义一个内容节点(topic),被称为主题,主题就至关于消息队列,是消息传递的一个中介,消息发布者将消息发布到主题中,而消息订阅者在订阅这个主题后就能够从该主题中接收到发布者发布的消息。用于一对多广播消息

    4. 消息中间件的做用:vim

  • 解耦:将消息发送者和消息接收者之间解耦
  • 持久化存储:在某些状况下,处理数据的过程可能会失败致使数据丢失,而消息中间件就能够实现将消息数据持久化存储到本地磁盘,直到这些消息数据被处理完成,就能够避免数据丢失的状况。
  • 扩展性:消息发送者和消息接收者之间的解耦,提供了良好的扩展性
  • 缓冲削峰:大多数状况下,消息发送确定要比消息接收者的处理要快的多,而消息中间件就提供了缓冲层的做用,在访问量剧增的状况下,对于服务端会产生极大压力,而这样的状况不多出现,不可能为这类峰值状况提供持续的资源,若是没有消息中间件,颇有可能会由于超负荷请求致使系统崩溃,而在加入消息中间件做为缓冲层后,请求会被写入消息中间件中等待处理,而消息接收者则会从消息中间件中一个个取出消息进行处理,不会由于忽然的巨量请求致使崩溃,应用仍然能正常运行
  • 可恢复性:当系统中的部分组件失效时(好比断电、程序异常终止),由于消息中间件的存在,不会丢失过多数据,若是一个处理消息的进程挂掉,进入消息中间件的消息仍然能够在系统恢复后进行处理
  • 顺序保证:消息中间件支持必定程度上数据处理的顺序性
  • 异步通讯:消息中间件提供了异步处理机制

2. 在CentOS中安装部署单机RabbitMQ

    参考在Centos中RabbitMQ的安装步骤centos

一、首先安装erlang ,下载erlang的安装包到centos上,bash

wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
而且进行安装 rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
查看是够安装成功多线程

[root@localhost Desktop]# erl并发

Erlang/OTP 19 [erts-8.0.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]异步

Eshell V8.0.3  (abort with ^G)

输入halt().退出erl
二、安装rabbitMQ 
第一种在线下载,先下载async

rpm:wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
下载完成后安装:

yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm 

三、开放端口

能够选择直接关闭防火墙,执行命令 

systemctl stop firewalld.service

或者

vim /etc/sysconfig/iptables

#添加一下内容
#RabbitMQ  
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 25672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 4369 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5671 -j ACCEPT
##RabbitMQ 
四、启动rabbit

service rabbitmq-service start
或者

/sbin/service rabbitmq-server start

五、访问 
游览器输入下列地址,便可进入RabbitMQ的管理界面:

http://localhost:15672/
 

3. RabbitMQ的Java客户端简单使用

    1. 建立Maven工厂,导入RabbitMQ的Java客户端的相关jar包:

<dependency>
  		<groupId>com.rabbitmq</groupId>
  		<artifactId>amqp-client</artifactId>
  		<version>5.0.0</version>
  	</dependency>

    2. 添加RabbitMQ的管理员帐号:在CentOS中启动RabbitMQ后,执行指令,添加一个root用户,而且密码为123456

rabbitmqctl add_user root 123456

设置帐户权限,开放全部权限

rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

设置帐户为管理员角色

rabbitmqctl set_user_tags root administrator

    3. 消息产生者客户端代码:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class RabbitMQProducer {
	private static final String IP="192.168.10.128";
	private static final String USER="root";
	private static final String PASSWORD="123456";
	private static final int PORT=5672;
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=null;
		Channel channel=null;
		try {
			String exchange_demo="exchange_demo";//声明一个交换器名称
			String queue_demo="queue_demo";//声明一个队列名称
			String route_demo="route_demo";//声明一个路由键,用于绑定交换器和队列
			ConnectionFactory fac=new ConnectionFactory();//获取一个rabbitMQ链接池,并设置相关参数
			fac.setHost(IP);
			fac.setPassword(PASSWORD);
			fac.setUsername(USER);
			fac.setPort(PORT);
			//从链接池中获取一个rabbitMQ链接
			connection=fac.newConnection();
			channel=connection.createChannel();//建立一个频道
			channel.exchangeDeclare(exchange_demo, "direct",false,false,null);//建立一个type为direct,持久化的、非自动删除的交换器
			channel.queueDeclare(queue_demo, true, false, false, null);//建立一个持久化、非排他的、非自动删除的交换器
			channel.queueBind(queue_demo, exchange_demo, route_demo);//将交换器和队列经过路由键绑定
			//发送一条消息
			String message="Hello World";
			channel.basicPublish(exchange_demo, route_demo, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
		} finally {
			//关闭资源
			if(connection!=null){
				if(channel!=null){
					channel.close();//能够不用关闭,当connection关闭后,channel也会自动关闭
				}
				connection.close();
			}
		}
	}
}

    4. 消息消费者客户端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQConsumer {
	private static final String IP="192.168.10.128";
	private static final String USER="root";
	private static final String PASSWORD="123456";
	private static final int PORT=5672;
	private static final String QUEUE_NAME="queue_demo";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=null;
		Address[] address={new Address(IP,PORT)};
		try {
			String queue_demo="queue_demo";//声明一个队列名称
			ConnectionFactory fac=new ConnectionFactory();//获取一个rabbitMQ链接池,并设置相关参数
			fac.setPassword(PASSWORD);
			fac.setUsername(USER);
			//从链接池中获取一个rabbitMQ链接
			connection=fac.newConnection(address);
			final Channel channel=connection.createChannel();//建立一个频道
			channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
			Consumer con=new DefaultConsumer(channel){
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
						byte[] body) throws IOException {
					System.out.println("get message:"+new String(body,"utf-8"));
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			};
			channel.basicConsume(queue_demo, con);
			if(channel!=null){
				channel.close();
			}
		} finally {
			//关闭资源
			if(connection!=null){
				
				connection.close();
			}
		}
	}
}
相关文章
相关标签/搜索