感兴趣的胖友能够体验一哈新的阅读地址:http://www.zhouhong.icu/post/142 (*^▽^*)
| 服务器IP | hostname | 节点说明 | 端口 | 管控台地址 |
| --- | --- | --- | --- | --- |
| 192.168.2.121 | zhouhong121 | rabbitmq master | 5672 | http://192.168.2.121:15672 |
| 192.168.2.122 | zhouhong122 | rabbitmq slave | 5672 | http://192.168.2.122:15672 |
| 192.168.2.123 | zhouhong123 | rabbitmq slave | 5672 | http://192.168.2.123:15672 |
前提条件:修改 121、122、123 三台服务器的 hostname,并且可以使用 hostname 两两之间 ping 通。
vim /etc/hostname
## 修改对应的名字,比如:
zhouhong121
vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.121 zhouhong121
192.168.2.122 zhouhong122
192.168.2.123 zhouhong123
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
链接:https://pan.baidu.com/s/1diapYC19UlDy4G-4lgZWHA 提取码:jf5r 复制这段内容后打开百度网盘手机App,操作更方便哦
1、安装
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
2、启动
systemctl start rabbitmq-server
3、安装web管控台
rabbitmq-plugins enable rabbitmq_management
4、添加用户
sudo rabbitmqctl add_user admin admin
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
5、重启
systemctl restart rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
选择 121、122、123 任意一个节点为 Master(这里选择 121 为 Master),也就是说我们需要把 121 的 Cookie 文件同步到 122、123 节点上去:进入 /var/lib/rabbitmq 目录下,把 /var/lib/rabbitmq/.erlang.cookie 文件的权限修改为 777(原来是 400);然后把 .erlang.cookie 文件 copy 到各个节点下;最后把所有 cookie 文件权限还原为 400 即可。
# 进入目录修改权限;远程 copy 到 122、123 节点
cd /var/lib/rabbitmq/
chmod 777 /var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie 192.168.2.122:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie 192.168.2.123:/var/lib/rabbitmq/
# 每台服务器上都把权限还原为默认值
chmod 400 /var/lib/rabbitmq/.erlang.cookie
rabbitmqctl stop
rabbitmq-server -detached
# 注意:执行这一步之前需要先配置好 /etc/hosts,各节点之间必须能够通过 hostname 相互寻址
# 在 122 节点上执行以下操作
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@zhouhong121
rabbitmqctl start_app
# 同样在 123 节点上执行以下操作(若希望 123 也作为 RAM 节点,同样加上 --ram 参数)
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@zhouhong121
rabbitmqctl start_app
# 如需移除集群中的某个节点,在其他任意节点上执行(节点名替换为要移除的节点,例如 122 或 123)
# rabbitmqctl forget_cluster_node rabbit@zhouhong122
rabbitmqctl set_cluster_name rabbitmq_cluster1
rabbitmqctl cluster_status
如图:121 为 disc(磁盘)节点,122、123 为 RAM(内存)节点
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态一致。至此 RabbitMQ 高可用集群就已经搭建好了,我们可以重启服务,查看其队列是否在从节点同步。
<!-- Maven build descriptor for the rabbit-producer Spring Boot module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.zhouhong</groupId>
  <artifactId>rabbit-producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <!-- Inherit dependency and plugin management from the Spring Boot parent POM. -->
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- springboot rabbitmq(amqp) -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  </dependencies>
</project>
## Producer application configuration.
server.servlet.context-path=/
server.port=8011

## Addresses of the mirrored-queue cluster nodes
spring.rabbitmq.addresses=192.168.2.121,192.168.2.122,192.168.2.123
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
## Default virtual host
spring.rabbitmq.virtual-host=/
## Connection timeout
spring.rabbitmq.connection-timeout=15000

## Enable publisher confirms (reliable delivery)
spring.rabbitmq.publisher-confirms=true

## Return-message mode; note it must be used together with mandatory
## spring.rabbitmq.publisher-returns=true
## spring.rabbitmq.template.mandatory=true

spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
package com.zhouhong.rabbit.producer.component;

import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Publishes messages to the {@code exchange-1} exchange through the injected
 * {@link RabbitTemplate}, registering a confirm callback before each send.
 */
@Component
public class RabbbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Callback invoked with the broker's confirmation result for a published message.
     */
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData the unique identifier attached to the published message (may be null)
         * @param ack             whether the broker acknowledged the message
         * @param cause           failure details when the message was not acknowledged
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // Previously an empty stub: a negative confirm was silently dropped.
            if (!ack) {
                String id = (correlationData == null) ? null : correlationData.getId();
                System.err.println("message not confirmed by broker, correlationId=" + id + ", cause=" + cause);
            }
        }
    };

    /**
     * Sends a message to the broker.
     *
     * @param message    the message payload
     * @param properties additional attributes, sent as message headers
     * @throws Exception if building or publishing the message fails
     */
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders headers = new MessageHeaders(properties);
        Message<?> msg = MessageBuilder.createMessage(message, headers);

        // Confirm mode is in use, so the callback must be registered before publishing.
        rabbitTemplate.setConfirmCallback(confirmCallback);

        // Unique business id used to correlate the confirm with this message.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        MessagePostProcessor mpp = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(
                    org.springframework.amqp.core.Message amqpMessage) throws AmqpException {
                System.out.println("post todo: " + amqpMessage);
                return amqpMessage;
            }
        };

        // Fix: the post-processor was created but never handed to the template.
        rabbitTemplate.convertAndSend("exchange-1", "rabbitmq.*", msg, mpp, correlationData);
    }
}
## Consumer application configuration.
server.servlet.context-path=/
server.port=8012

## Addresses of the mirrored-queue cluster nodes
spring.rabbitmq.addresses=192.168.2.121,192.168.2.122,192.168.2.123
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
## Default virtual host
spring.rabbitmq.virtual-host=/
## Connection timeout
spring.rabbitmq.connection-timeout=15000

## After a message is consumed successfully the consumer must acknowledge it manually (ACK); the default is auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## Listener concurrency (initial / maximum)
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
## Consume messages one at a time
spring.rabbitmq.listener.simple.prefetch=1

## Prefer not to hard-code configuration in code; keep it in this file instead
## and reference it in code with ${}, e.g. ${spring.rabbitmq.listener.order.exchange.name}
## Exchange name
## spring.rabbitmq.listener.order.exchange.name=order-exchange
## Whether the exchange is durable
## spring.rabbitmq.listener.order.exchange.durable=true
## Exchange type
## spring.rabbitmq.listener.order.exchange.type=topic
## Routing rule
## spring.rabbitmq.listener.order.exchange.key=rabbitmq.*

## Fix: this is the consumer module; the name was copied from the producer config.
spring.application.name=rabbit-consumer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
package com.zhouhong.rabbit.consumer.component; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class RabbbitReceive { /** * 组合使用监听 * @param message * @param channel * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue-1", durable = "true"), exchange = @Exchange(name = "exchange-1", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"), key = "rabbitmq.*" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { //一、收到消息之后进行业务端消费处理 System.err.println("======================"); System.err.println("消息消费:" + message.getPayload()); //二、处理成功以后获取deliveryTay 而且进行手工的ACK操做,由于咱们配置文件里面配置的是手工签收 Long deliveryTay = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTay, false); } }
package com.zhouhong.rabbit.producer.test;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.zhouhong.rabbit.producer.component.RabbbitSender;

/**
 * Spring Boot test that publishes one message through {@link RabbbitSender}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private RabbbitSender rabbbitSender;

    /**
     * Sends a single test message with two extra attributes, then sleeps
     * for ten seconds before the test method returns.
     *
     * @throws Exception if sending fails or the sleep is interrupted
     */
    @Test
    public void testSender() throws Exception {
        final Map<String, Object> attributes = new HashMap<>();
        attributes.put("key1", "你好呀,RabbitMQ!!");
        attributes.put("key2", "你好呀,Kafka!!");

        rabbbitSender.send("rabbitmq-test", attributes);

        final long waitMillis = 10000;
        Thread.sleep(waitMillis);
    }
}
创建了一个我们代码里面指定的交换机 exchange-1,并且绑定了我们指定的队列 queue-1,路由规则为 rabbitmq.*
我们发现会有一条未消费的消息。